__init__.py 5.8 KB

1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586878889909192939495
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect
  4. from common.security import valid_access_token,valid_websocket_token
  5. from fastapi.responses import JSONResponse
  6. from sqlalchemy.orm import Session
  7. from sqlalchemy.sql import func
  8. from common.auth_user import *
  9. from sqlalchemy import text
  10. from pydantic import BaseModel
  11. from common.BigDataCenterAPI import *
  12. from database import get_db
  13. from typing import List
  14. from models import *
  15. from utils import *
  16. from utils.spatial import *
  17. from utils.ry_system_util import *
  18. from utils.video_util import *
  19. import json
  20. import traceback
  21. router = APIRouter()
  22. #union all (select `jhhk`.`video_code` AS `id`,`jhhk`.`name` AS `name`,Null AS `dataType`,`jhhk`.`area` AS `area`,`jhhk`.`longitude` AS `longitude`,`jhhk`.`latitude` AS `latitude`,NULL AS `address`,null as dict_label from (select `t1`.`name` AS `name`,(case when (`t1`.`area` like '%高州%') then '高州市' when (`t1`.`area` like '%信宜%') then '信宜市' when (`t1`.`area` like '%化州%') then '化州市' when (`t1`.`area` like '%茂南%') then '茂南区' when (`t1`.`area` like '%电白%') then '电白区' else '直辖市' end) AS `area`,`t2`.`longitude` AS `longitude`,`t2`.`latitude` AS `latitude`,`t1`.`status` AS `status`,`t1`.`video_code` AS `video_code` from ((select `tp_video_log`.`id` AS `id`,`tp_video_log`.`name` AS `name`,`tp_video_log`.`area` AS `area`,`tp_video_log`.`ip` AS `ip`,`tp_video_log`.`status` AS `status`,`tp_video_log`.`status_lifetime` AS `status_lifetime`,`tp_video_log`.`record_status` AS `record_status`,`tp_video_log`.`inspection_datetime` AS `inspection_datetime`,`tp_video_log`.`video_code_int` AS `video_code_int`,`tp_video_log`.`video_code` AS `video_code` from `tp_video_log` where (`tp_video_log`.`video_code` not in (SELECT id from point_data ))) `t1` left join `tp_video_base` `t2` on((`t1`.`video_code` = `t2`.`indexcode`)))) `jhhk` where id like '%{keyword}%' or `name` like '%{keyword}%')A
  23. def get_add_data(keyword:str ,db :Session):
  24. sql = text("""SELECT * FROM point_data where dataType in (2,3,4,5,6,7,8,9,11,12,13,14,18,19,20,24,26,35,36,37,38,39) and ( address like '%{keyword}%' or `name` like '%{keyword}%')""".replace("{keyword}",keyword))
  25. return [{"id":i.id,"name":i.name,"dataType":i.dataType,"area":i.area,"longitude":i.longitude,"latitude":i.latitude,"address":i.address} for i in db.execute(sql).all()]
  26. def get_video_data(keyword:str ,db :Session):
  27. sql = text("""SELECT T1.*,T2.dict_label FROM point_data T1 LEFT JOIN (SELECT * FROM `sys_dict_data` WHERE `dict_type`='point_type') T2 on T1.dataType=T2.dict_value where T1.dataType in (30,31,32,33,34) and ( T1.id like '%{keyword}%' or T1.`name` like '%{keyword}%') """.replace("{keyword}",keyword))
  28. result = []
  29. for i in db.execute(sql).all():
  30. tag = []
  31. tag_lable = []
  32. for info in get_video_tag_list(db, i.id):
  33. tag_info = get_dict_data_info(db, info.dict_type, info.dict_value)
  34. if tag_info:
  35. if tag_info.dict_label not in tag_lable:
  36. tag.append({"id": info.id,
  37. "video_code": i.id,
  38. "dict_type": info.dict_type,
  39. "dict_value": info.dict_value,
  40. "dict_label": tag_info.dict_label,
  41. "dict_code": tag_info.dict_code})
  42. tag_lable.append(tag_info.dict_label)
  43. result.append({"id": i.id, "name": i.name, "dataType": i.dataType, "area": i.area, "longitude": i.longitude,
  44. "latitude": i.latitude, "address": i.address, "tag_info":tag}) #"dict_label": i.dict_label,
  45. return result
  46. def get_point_data(keyword:str ,db :Session):
  47. sql = text("""SELECT T1.*,T2.dict_label FROM point_data T1 LEFT JOIN (SELECT * FROM `sys_dict_data` WHERE `dict_type`='point_type') T2 on T1.dataType=T2.dict_value where (T2.dict_label like '%{keyword}%' or T1.`name` like '%{keyword}%')""".replace("{keyword}",keyword))
  48. return [{"id":i.id,"name":i.name,"dataType":i.dataType,"area":i.area,"longitude":i.longitude,"latitude":i.latitude,"address":i.address,"dict_label":i.dict_label} for i in db.execute(sql).all()]
  49. def contains_special_characters(input_string, special_characters=";|&|$|#|'|@| "):
  50. """
  51. 判断字符串是否包含特殊符号。
  52. :param input_string: 需要检查的字符串
  53. :param special_characters: 特殊符号的字符串,多个符号用竖线 '|' 分隔
  54. :return: 如果包含特殊符号返回 True,否则返回 False
  55. """
  56. # 创建正则表达式模式
  57. pattern = re.compile('[' + re.escape(special_characters) + ']')
  58. # 搜索字符串中的特殊符号
  59. if pattern.search(input_string):
  60. return True
  61. return False
  62. @router.get("/info")
  63. async def get_pattern_list(
  64. keyword: str = Query(None, description='预案名称'),
  65. # page: int = Query(1, gt=0, description='页码'),
  66. # pageSize: int = Query(5, gt=0, description='每页条目数量'),
  67. db: Session = Depends(get_db)
  68. ):
  69. try:
  70. if contains_special_characters(keyword):
  71. return JSONResponse(status_code=411, content={
  72. 'code': 411,
  73. 'msg': f'参数keyword含特殊符号:;、&、$、#、\'、\\t、@、空格等'
  74. })
  75. return {
  76. "code": 200,
  77. "msg": "查询成功",
  78. "data":{
  79. "add_data":get_add_data(keyword,db),
  80. "video_data":get_video_data(keyword,db),
  81. "point_data":get_point_data(keyword,db)
  82. }
  83. }
  84. except Exception as e:
  85. traceback.print_exc()
  86. raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")