#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Video API routes: per-user video listing, videos near waterlogged roads,
the forest-fire video tree, and video tag management."""
from fastapi import APIRouter, Request, Depends, Query, HTTPException
from database import get_db
from sqlalchemy.orm import Session
from sqlalchemy import case, or_
from sqlalchemy import text
from utils import *
from utils.ry_system_util import *
from utils.video_util import *
from common.security import valid_access_token
from fastapi.responses import JSONResponse
import traceback
from datetime import datetime

router = APIRouter()


def _serialize_dict_data(info):
    """Return the dictionary-entry fields exposed by the tag listing routes."""
    return {
        "dict_type": info.dict_type,
        "dict_value": info.dict_value,
        "dict_label": info.dict_label,
        "dict_code": info.dict_code,
    }


def _video_type_dict_query(db):
    """Return a query over non-deleted SysDictData rows of type 'video_type'."""
    query = db.query(SysDictData)
    query = query.filter(SysDictData.del_flag != '2')
    return query.filter(SysDictData.dict_type == 'video_type')


def _paginated_dict_response(query, page, pageSize):
    """Count *query*, fetch one page (newest first) and build the response."""
    total_items = query.count()
    ordered = query.order_by(SysDictData.create_time.desc())
    rows = ordered.offset((page - 1) * pageSize).limit(pageSize).all()
    return {
        "code": 200,
        "msg": "成功",
        "data": [_serialize_dict_data(info) for info in rows],
        "total": total_items,
        "page": page,
        "pageSize": pageSize,
        "totalPages": (total_items + pageSize - 1) // pageSize
    }


def _find_branch(children, label):
    """Return the non-leaf child node labelled *label*, or None."""
    for node in children:
        # Leaf (camera) entries have no "children" list and must never be
        # descended into, even if a camera name equals an area segment.
        if node["label"] == label and not node.get("isLeaf"):
            return node
    return None


@router.get('/get_video_list_by_user')
async def get_video_url_by_id(
    longitude: float = Query(None, description='经度'),
    latitude: float = Query(None, description='纬度'),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量')
):
    """List videos page by page, the caller's own videos first.

    Rows come from tp_video_log joined to tp_video_base and are restricted
    to those with a known longitude.  Videos returned by
    ``user_id_get_user_videoIds`` for the caller sort first; when both
    ``longitude`` and ``latitude`` are supplied, rows are then ordered by
    spherical distance from that point.

    Returns a dict with ``rows``, ``total``, ``page``, ``pageSize`` and
    ``totalPages``.
    """
    params = {}
    order_terms = []

    user_video_codes = [
        item.video_code_int for item in user_id_get_user_videoIds(db, user_id)
    ]
    if user_video_codes:
        when_clauses = []
        for idx, code in enumerate(user_video_codes):
            # Bound as strings: the previous SQL compared against quoted
            # literals, so this keeps the same comparison semantics.
            params[f"vid_{idx}"] = str(code)
            when_clauses.append(f"WHEN :vid_{idx} THEN 0")
        order_terms.append(
            f"CASE T2.video_code_int {' '.join(when_clauses)} ELSE 1 END"
        )

    if longitude is not None and latitude is not None:
        params["user_point"] = f"POINT({longitude} {latitude})"
        distance_expr = (
            "ST_Distance_Sphere("
            "ST_GeomFromText(CONCAT('POINT(', T1.longitude, ' ', T1.latitude, ')')), "
            "ST_PointFromText(:user_point))"
        )
        order_terms.append("distance")
    else:
        distance_expr = "0"

    # Always-present tie-breaker: keeps paging stable and guarantees the
    # ORDER BY clause is never empty (previously a SQL syntax error when no
    # coordinates were given and the user had no videos).
    order_terms.append("T2.video_code_int")

    from_sql = (
        "FROM tp_video_base T1 "
        "RIGHT JOIN tp_video_log T2 on T1.indexcode=T2.video_code_int "
        "where T1.longitude is not NULL"
    )
    total_items = db.execute(text(f"select count(*) {from_sql}")).first()[0]

    # page and pageSize are integers validated by FastAPI (gt=0).
    offset = pageSize * (page - 1)
    page_sql = text(
        "SELECT T1.indexcode,T2.`name`,T1.longitude,T1.latitude, "
        f"{distance_expr} AS distance,T2.area,T2.ip,T2.`status`,"
        "T2.status_lifetime,T2.record_status,T2.inspection_datetime,"
        "T2.video_code_int,T2.video_code "
        f"{from_sql} ORDER BY {', '.join(order_terms)} "
        f"limit {int(offset)}, {int(pageSize)}"
    )
    videos = db.execute(page_sql, params).all()

    owned_codes = set(user_video_codes)
    rows = [
        {
            "name": video.name,
            "invideoIds": video.video_code_int in owned_codes,
            "area": video.area,
            "ip": video.ip,
            "status": video.status,
            "status_lifetime": video.status_lifetime,
            "record_status": video.record_status,
            "inspection_datetime": video.inspection_datetime,
            "video_code_int": video.video_code_int,
            "video_code": video.video_code,
            "longitude": video.longitude,
            "latitude": video.latitude
        }
        for video in videos
    ]
    return {
        "code": 200,
        "msg": "操作成功",
        "rows": rows,
        "total": total_items,
        "page": page,
        "pageSize": pageSize,
        "totalPages": (total_items + pageSize - 1) // pageSize
    }


@router.get('/get_waterlogged_all_video_info')
async def get_waterlogged_all_video_info(
    radius: int = Query(None),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    user_id=Depends(valid_access_token)
):
    """List 'ON' videos within ``radius`` of any waterlogged road point.

    ``radius`` defaults to 500 and is compared against MySQL's
    ``ST_Distance_Sphere`` result.  Pagination is done in SQL only, and the
    total is counted without the LIMIT so ``totalPages`` reflects all matches.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        if radius is None:
            radius = 500

        roads = db.execute(text("SELECT lng, lat from mid_waterlogged_roads")).all()

        params = {"radius": radius}
        near_clauses = []
        for idx, road in enumerate(roads):
            if road.lng is None or road.lat is None:
                # A road without coordinates cannot be turned into valid WKT.
                continue
            params[f"pt_{idx}"] = f"POINT({road.lng} {road.lat})"
            near_clauses.append(
                "ST_Distance_Sphere("
                "ST_GeomFromText(CONCAT('POINT(', longitude, ' ', latitude, ')')),"
                f"ST_GeomFromText(:pt_{idx})) <= :radius"
            )

        if not near_clauses:
            # No reference points: nothing can match, and an empty "()"
            # condition would be a SQL syntax error.
            return {
                "code": 200,
                "msg": "成功",
                "data": {"list": []},
                "total": 0,
                "page": page,
                "pageSize": pageSize,
                "totalPages": 0
            }

        where_sql = (
            "longitude is not null and latitude is not null "
            f"and ({' or '.join(near_clauses)}) and `status`='ON'"
        )
        total_items = db.execute(
            text(f"SELECT COUNT(*) FROM tp_video_base where {where_sql}"), params
        ).first()[0]

        offset = pageSize * (page - 1)
        matches = db.execute(
            text(
                "SELECT indexcode,`name`,longitude,latitude FROM tp_video_base "
                f"where {where_sql} ORDER BY indexcode "
                f"limit {int(offset)}, {int(pageSize)}"
            ),
            params,
        ).all()

        return {
            "code": 200,
            "msg": "成功",
            "data": {
                "list": [
                    {
                        "indexcode": info.indexcode,
                        "name": info.name,
                        "longitude": info.longitude,
                        "latitude": info.latitude
                    }
                    for info in matches
                ]
            },
            "total": total_items,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total_items + pageSize - 1) // pageSize
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/get_video_forest_fire_list')
async def get_video_forest_fire_list(
    radius: int = Query(None),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return videos tagged with dict_value '4' as a tree keyed by area path.

    Each video's ``area`` is split on '/' into nested nodes of the form
    ``{"label", "children", "online", "total"}``; the video itself is added
    as a leaf under the deepest node.  ``online`` counts videos whose status
    is '在线'.  ``radius`` is accepted but not used.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        # NOTE(review): this selects TpVideoTag.id but matches it against
        # TpVideoLog.video_code; TpVideoTag.video_code may have been intended.
        # Behaviour is left unchanged - confirm against the data.
        video_code_list = [
            item[0]
            for item in db.query(TpVideoTag.id).filter(TpVideoTag.dict_value == '4').all()
        ]
        video_list = db.query(TpVideoLog).filter(TpVideoLog.video_code.in_(video_code_list)).all()

        root = {"label": "Root", "children": [], "online": 0, "total": 0}
        for video_info in video_list:
            is_online = video_info.status == '在线'
            # A video without an area hangs directly under the root.
            levels = video_info.area.split('/') if video_info.area else []

            current_node = root
            current_node['total'] += 1
            if is_online:
                current_node['online'] += 1

            for level in levels:
                branch = _find_branch(current_node["children"], level)
                if branch is None:
                    branch = {"label": level, "children": [], "online": 0, "total": 0}
                    current_node["children"].append(branch)
                current_node = branch
                current_node['total'] += 1
                if is_online:
                    current_node['online'] += 1

            current_node['children'].append({
                "label": video_info.name,
                "status": video_info.status,
                "video_code": video_info.video_code,
                "isLeaf": True
            })

        return {
            "code": 200,
            "msg": "成功",
            "data": root['children'],
            'online': root['online'],
            'total': root['total']
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/get_video_tag_info')
async def get_video_tag_info(
    video_code: str = Query(None),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return the tags of ``video_code``, one entry per distinct label.

    Tags whose dictionary entry cannot be resolved are skipped.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        tags = []
        seen_labels = set()
        for info in get_video_tag_list(db, video_code):
            tag_info = get_dict_data_info(db, info.dict_type, info.dict_value)
            if tag_info is None:
                # The lookup can return None (add_video_tag checks for it);
                # such a tag has no label to show.
                continue
            if tag_info.dict_label in seen_labels:
                continue
            seen_labels.add(tag_info.dict_label)
            tags.append({
                "id": info.id,
                "video_code": video_code,
                "dict_type": info.dict_type,
                "dict_value": info.dict_value,
                "dict_label": tag_info.dict_label,
                "dict_code": tag_info.dict_code
            })
        return {
            "code": 200,
            "msg": "成功",
            "data": tags
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post("/add_video_tag")
async def add_video_tag(
    user_id=Depends(valid_access_token),
    body=Depends(remove_xss_json),
    db: Session = Depends(get_db)
):
    """Attach an existing dictionary tag to a video.

    Reads ``dict_type``, ``dict_value`` and ``video_code`` from ``body``.
    Returns a 404 JSON response when the dictionary entry does not exist.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        tag_info = get_dict_data_info(db, body['dict_type'], body['dict_value'])
        if tag_info is None:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "标签不存在"})

        db.add(TpVideoTag(
            id=new_guid(),
            video_code=body['video_code'],
            dict_value=body['dict_value'],
            dict_type=body['dict_type'],
            create_dept=user_id
        ))
        db.commit()
        return {"code": 200, "msg": "新增成功", "data": None}
    except Exception as e:
        # Discard the failed transaction so the session is left clean.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


# NOTE(review): this handler reuses the name ``add_video_tag`` and therefore
# rebinds the module-level name.  Both routes stay registered because the
# decorator runs at definition time; the name is kept to avoid changing the
# module interface.
@router.post("/add_video_tag_label")
async def add_video_tag(
    user_id=Depends(valid_access_token),
    body=Depends(remove_xss_json),
    db: Session = Depends(get_db)
):
    """Attach a tag to a video by label, creating the dictionary entry if needed.

    Reads ``dict_label``, ``dict_type`` and ``video_code`` from ``body``.
    When no entry with that label exists, a SysDictData row with a freshly
    generated ``dict_value`` is added in the same transaction.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        dict_label = body['dict_label']
        dict_info = dict_label_get_dict_data_info(db, body['dict_type'], dict_label)
        if dict_info is None:
            dict_info = SysDictData(
                dict_label=dict_label,
                dict_value=new_guid(),
                dict_type=body['dict_type'],
                create_by=user_id
            )
            db.add(dict_info)

        db.add(TpVideoTag(
            id=new_guid(),
            video_code=body['video_code'],
            dict_value=dict_info.dict_value,
            dict_type=body['dict_type'],
            create_dept=user_id
        ))
        db.commit()
        return {"code": 200, "msg": "新增成功", "data": None}
    except Exception as e:
        # Discard the failed transaction so the session is left clean.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


# NOTE(review): unlike the other routes this one declares no
# valid_access_token dependency - confirm whether that is intentional.
@router.delete("/delete_video_tag/{video_tag_id}")
async def delete_video_tag(
    video_tag_id: str,
    db: Session = Depends(get_db)
):
    """Soft-delete a video tag by setting its ``del_flag`` to '2'.

    Returns a 404 JSON response when no non-deleted tag has that id.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        # Look the tag up, ignoring rows that are already soft-deleted.
        video_tag = (
            db.query(TpVideoTag)
            .filter(TpVideoTag.id == video_tag_id)
            .filter(TpVideoTag.del_flag != '2')
            .first()
        )
        if not video_tag:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "标签不存在"})

        video_tag.del_flag = '2'
        db.commit()
        return {"code": 200, "msg": "删除成功"}
    except Exception as e:
        # Discard the failed transaction so the session is left clean.
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


# NOTE(review): this handler reuses the name ``get_video_tag_info`` and
# rebinds the module-level name; kept to avoid changing the module interface.
@router.get('/get_recently_video_tag_info')
async def get_video_tag_info(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(3, gt=0, description='每页条目数量'),
    user_id=Depends(valid_access_token)
):
    """Return non-deleted 'video_type' dictionary entries, newest first, paged.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        return _paginated_dict_response(_video_type_dict_query(db), page, pageSize)
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/get_lx_hy_video_tag_info')
async def get_lx_hy_video_tag_info(
    dict_value: str = Query(None),
    type: str = Query(None),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(3, gt=0, description='每页条目数量'),
    user_id=Depends(valid_access_token)
):
    """Return 'video_type' entries filtered by their ``remark`` field, paged.

    With a non-empty ``dict_value`` the remark must contain
    ``"<dict_value>;"``.  Otherwise ``type`` selects a category dictionary
    ('lx' -> 'video_tag_type', 'hy' -> 'video_tag_industry') and the remark
    must contain ``"<value>;"`` for at least one value of that dictionary;
    any other ``type`` yields an empty ``data`` list.

    Raises:
        HTTPException: status 500 on any unexpected error.
    """
    try:
        query = _video_type_dict_query(db)

        if dict_value is None or dict_value == '':
            category_types = {'lx': 'video_tag_type', 'hy': 'video_tag_industry'}
            category_type = category_types.get(type)
            if category_type is None:
                return {
                    "code": 200,
                    "msg": "成功",
                    "data": []
                }

            categories = (
                db.query(SysDictData)
                .filter(SysDictData.del_flag != '2')
                .filter(SysDictData.dict_type == category_type)
                .all()
            )
            remark_matches = [
                SysDictData.remark.like(f'%{item.dict_value};%') for item in categories
            ]
            # An empty OR adds no criteria, so it is simply not applied.
            if remark_matches:
                query = query.filter(or_(*remark_matches))
        else:
            query = query.filter(SysDictData.remark.like(f'%{dict_value};%'))

        return _paginated_dict_response(query, page, pageSize)
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")