__init__.py 5.7 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect
  4. from common.security import valid_access_token,valid_websocket_token
  5. from fastapi.responses import JSONResponse
  6. from sqlalchemy.orm import Session
  7. from sqlalchemy.sql import func
  8. from common.auth_user import *
  9. from sqlalchemy import text
  10. from pydantic import BaseModel
  11. from common.BigDataCenterAPI import *
  12. from database import get_db
  13. from typing import List
  14. from models import *
  15. from utils import *
  16. from utils.spatial import *
  17. from utils.ry_system_util import *
  18. from utils.video_util import *
  19. import json
  20. import traceback
  21. router = APIRouter()
  22. #union all (select `jhhk`.`video_code` AS `id`,`jhhk`.`name` AS `name`,Null AS `dataType`,`jhhk`.`area` AS `area`,`jhhk`.`longitude` AS `longitude`,`jhhk`.`latitude` AS `latitude`,NULL AS `address`,null as dict_label from (select `t1`.`name` AS `name`,(case when (`t1`.`area` like '%高州%') then '高州市' when (`t1`.`area` like '%信宜%') then '信宜市' when (`t1`.`area` like '%化州%') then '化州市' when (`t1`.`area` like '%茂南%') then '茂南区' when (`t1`.`area` like '%电白%') then '电白区' else '直辖市' end) AS `area`,`t2`.`longitude` AS `longitude`,`t2`.`latitude` AS `latitude`,`t1`.`status` AS `status`,`t1`.`video_code` AS `video_code` from ((select `tp_video_log`.`id` AS `id`,`tp_video_log`.`name` AS `name`,`tp_video_log`.`area` AS `area`,`tp_video_log`.`ip` AS `ip`,`tp_video_log`.`status` AS `status`,`tp_video_log`.`status_lifetime` AS `status_lifetime`,`tp_video_log`.`record_status` AS `record_status`,`tp_video_log`.`inspection_datetime` AS `inspection_datetime`,`tp_video_log`.`video_code_int` AS `video_code_int`,`tp_video_log`.`video_code` AS `video_code` from `tp_video_log` where (`tp_video_log`.`video_code` not in (SELECT id from point_data ))) `t1` left join `tp_video_base` `t2` on((`t1`.`video_code` = `t2`.`indexcode`)))) `jhhk` where id like '%{keyword}%' or `name` like '%{keyword}%')A
  23. def get_add_data(keyword:str ,db :Session):
  24. sql = text("""SELECT * FROM point_data where dataType in (2,3,4,5,6,7,8,9,11,12,13,14,18,19,20,24,26,35,36,37,38,39) and ( address like '%{keyword}%' or `name` like '%{keyword}%')""".replace("{keyword}",keyword))
  25. return [{"id":i.id,"name":i.name,"dataType":i.dataType,"area":i.area,"longitude":i.longitude,"latitude":i.latitude,"address":i.address} for i in db.execute(sql).all()]
  26. def get_video_data(keyword:str ,db :Session):
  27. sql = text("""SELECT T1.*,T2.dict_label FROM point_data T1 LEFT JOIN (SELECT * FROM `sys_dict_data` WHERE `dict_type`='point_type') T2 on T1.dataType=T2.dict_value where T1.dataType in (30,31,32,33,34) and ( T1.id like '%{keyword}%' or T1.`name` like '%{keyword}%') """.replace("{keyword}",keyword))
  28. result = []
  29. for i in db.execute(sql).all():
  30. tag = []
  31. tag_lable = []
  32. for info in get_video_tag_list(db, i.id):
  33. tag_info = get_dict_data_info(db, info.dict_type, info.dict_value)
  34. if tag_info.dict_label not in tag_lable:
  35. tag.append({"id": info.id,
  36. "video_code": i.id,
  37. "dict_type": info.dict_type,
  38. "dict_value": info.dict_value,
  39. "dict_label": tag_info.dict_label,
  40. "dict_code": tag_info.dict_code})
  41. tag_lable.append(tag_info.dict_label)
  42. result.append({"id": i.id, "name": i.name, "dataType": i.dataType, "area": i.area, "longitude": i.longitude,
  43. "latitude": i.latitude, "address": i.address, "tag_info":tag}) #"dict_label": i.dict_label,
  44. return result
  45. def get_point_data(keyword:str ,db :Session):
  46. sql = text("""SELECT T1.*,T2.dict_label FROM point_data T1 LEFT JOIN (SELECT * FROM `sys_dict_data` WHERE `dict_type`='point_type') T2 on T1.dataType=T2.dict_value where (T2.dict_label like '%{keyword}%' or T1.`name` like '%{keyword}%')""".replace("{keyword}",keyword))
  47. return [{"id":i.id,"name":i.name,"dataType":i.dataType,"area":i.area,"longitude":i.longitude,"latitude":i.latitude,"address":i.address,"dict_label":i.dict_label} for i in db.execute(sql).all()]
  48. def contains_special_characters(input_string, special_characters=";|&|$|#|'|@| "):
  49. """
  50. 判断字符串是否包含特殊符号。
  51. :param input_string: 需要检查的字符串
  52. :param special_characters: 特殊符号的字符串,多个符号用竖线 '|' 分隔
  53. :return: 如果包含特殊符号返回 True,否则返回 False
  54. """
  55. # 创建正则表达式模式
  56. pattern = re.compile('[' + re.escape(special_characters) + ']')
  57. # 搜索字符串中的特殊符号
  58. if pattern.search(input_string):
  59. return True
  60. return False
  61. @router.get("/info")
  62. async def get_pattern_list(
  63. keyword: str = Query(None, description='预案名称'),
  64. # page: int = Query(1, gt=0, description='页码'),
  65. # pageSize: int = Query(5, gt=0, description='每页条目数量'),
  66. db: Session = Depends(get_db)
  67. ):
  68. try:
  69. if contains_special_characters(keyword):
  70. return JSONResponse(status_code=411, content={
  71. 'code': 411,
  72. 'msg': f'参数keyword含特殊符号:;、&、$、#、\'、\\t、@、空格等'
  73. })
  74. return {
  75. "code": 200,
  76. "msg": "查询成功",
  77. "data":{
  78. "add_data":get_add_data(keyword,db),
  79. "video_data":get_video_data(keyword,db),
  80. "point_data":get_point_data(keyword,db)
  81. }
  82. }
  83. except Exception as e:
  84. traceback.print_exc()
  85. raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")