#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect from common.security import valid_access_token,valid_websocket_token from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from sqlalchemy.sql import func from common.auth_user import * from sqlalchemy import text from pydantic import BaseModel from common.BigDataCenterAPI import * from database import get_db from typing import List from models import * from utils import * from utils.spatial import * from utils.ry_system_util import * from utils.video_util import * import json import traceback router = APIRouter() #union all (select `jhhk`.`video_code` AS `id`,`jhhk`.`name` AS `name`,Null AS `dataType`,`jhhk`.`area` AS `area`,`jhhk`.`longitude` AS `longitude`,`jhhk`.`latitude` AS `latitude`,NULL AS `address`,null as dict_label from (select `t1`.`name` AS `name`,(case when (`t1`.`area` like '%高州%') then '高州市' when (`t1`.`area` like '%信宜%') then '信宜市' when (`t1`.`area` like '%化州%') then '化州市' when (`t1`.`area` like '%茂南%') then '茂南区' when (`t1`.`area` like '%电白%') then '电白区' else '直辖市' end) AS `area`,`t2`.`longitude` AS `longitude`,`t2`.`latitude` AS `latitude`,`t1`.`status` AS `status`,`t1`.`video_code` AS `video_code` from ((select `tp_video_log`.`id` AS `id`,`tp_video_log`.`name` AS `name`,`tp_video_log`.`area` AS `area`,`tp_video_log`.`ip` AS `ip`,`tp_video_log`.`status` AS `status`,`tp_video_log`.`status_lifetime` AS `status_lifetime`,`tp_video_log`.`record_status` AS `record_status`,`tp_video_log`.`inspection_datetime` AS `inspection_datetime`,`tp_video_log`.`video_code_int` AS `video_code_int`,`tp_video_log`.`video_code` AS `video_code` from `tp_video_log` where (`tp_video_log`.`video_code` not in (SELECT id from point_data ))) `t1` left join `tp_video_base` `t2` on((`t1`.`video_code` = `t2`.`indexcode`)))) `jhhk` where id like '%{keyword}%' or `name` like '%{keyword}%')A def get_add_data(keyword:str ,db :Session): sql = text("""SELECT * FROM point_data where dataType in (2,3,4,5,6,7,8,9,11,12,13,14,18,19,20,24,26,35,36,37,38,39) and ( address like '%{keyword}%' or `name` like '%{keyword}%')""".replace("{keyword}",keyword)) return [{"id":i.id,"name":i.name,"dataType":i.dataType,"area":i.area,"longitude":i.longitude,"latitude":i.latitude,"address":i.address,"type":"2"} for i in db.execute(sql).all()] def get_video_data(keyword:str ,db :Session): sql = text("""SELECT T1.*,T2.dict_label FROM point_data T1 LEFT JOIN (SELECT * FROM `sys_dict_data` WHERE `dict_type`='point_type') T2 on T1.dataType=T2.dict_value where T1.dataType in (30,31,32,33,34) and ( T1.id like '%{keyword}%' or T1.`name` like '%{keyword}%') """.replace("{keyword}",keyword)) result = [] for i in db.execute(sql).all(): tag = [] tag_lable = [] for info in get_video_tag_list(db, i.id): tag_info = get_dict_data_info(db, info.dict_type, info.dict_value) if tag_info: if tag_info.dict_label not in tag_lable: tag.append({"id": info.id, "video_code": i.id, "dict_type": info.dict_type, "dict_value": info.dict_value, "dict_label": tag_info.dict_label, "dict_code": tag_info.dict_code}) tag_lable.append(tag_info.dict_label) result.append({"id": i.id, "name": i.name, "dataType": i.dataType, "area": i.area, "longitude": i.longitude, "latitude": i.latitude, "address": i.address, "tag_info":tag}) #"dict_label": i.dict_label, return result def get_point_data(keyword:str ,db :Session): sql = text("""SELECT T1.*,T2.dict_label FROM point_data T1 LEFT JOIN (SELECT * FROM `sys_dict_data` WHERE `dict_type`='point_type') T2 on T1.dataType=T2.dict_value where (T2.dict_label like '%{keyword}%' or T1.`name` like '%{keyword}%')""".replace("{keyword}",keyword)) return [{"id":i.id,"name":i.name,"dataType":i.dataType,"area":i.area,"longitude":i.longitude,"latitude":i.latitude,"address":i.address,"dict_label":i.dict_label} for i in db.execute(sql).all()] def contains_special_characters(input_string, special_characters=";|&|$|#|'|@| "): """ 判断字符串是否包含特殊符号。 :param input_string: 需要检查的字符串 :param special_characters: 特殊符号的字符串,多个符号用竖线 '|' 分隔 :return: 如果包含特殊符号返回 True,否则返回 False """ # 创建正则表达式模式 pattern = re.compile('[' + re.escape(special_characters) + ']') # 搜索字符串中的特殊符号 if pattern.search(input_string): return True return False @router.get("/info") async def get_pattern_list( keyword: str = Query(None, description='预案名称'), # page: int = Query(1, gt=0, description='页码'), # pageSize: int = Query(5, gt=0, description='每页条目数量'), db: Session = Depends(get_db) ): try: if contains_special_characters(keyword): return JSONResponse(status_code=411, content={ 'code': 411, 'msg': f'参数keyword含特殊符号:;、&、$、#、\'、\\t、@、空格等' }) return { "code": 200, "msg": "查询成功", "data":{ "add_data":get_add_data(keyword,db), "video_data":get_video_data(keyword,db), "point_data":get_point_data(keyword,db) } } except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")