#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Video endpoints: single-camera details and grid-clustered map markers."""

# NOTE: the import order below is kept exactly as in the original file because
# several of these are star imports, so a later import may shadow an earlier one.
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from sqlalchemy.sql import func
from sqlalchemy.future import select
from common.auth_user import *
from pydantic import BaseModel
from database import get_db
from typing import List
from models import *
from utils import *
from utils.ry_system_util import *
from utils.video_util import *
from collections import defaultdict
import traceback
from concurrent.futures import ThreadPoolExecutor, as_completed
from multiprocessing import Pool, cpu_count
import json
import time
import math
import logging

router = APIRouter()
logger = logging.getLogger(__name__)

# Dictionary label of the catch-all tag; it is neither listed on a video nor
# used as a filter.
_ALL_VIDEOS_LABEL = '全量视频'

# Earth radius in kilometres, shared by the grid and haversine helpers.
_EARTH_RADIUS_KM = 6371


@router.get('/video/{video_code}')
async def get_video_url_by_id(
    video_code: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return the details and tags of the video whose gbIndexCode is *video_code*.

    Used by the large-screen "bottom-left video" and "more videos" widgets.

    Raises:
        HTTPException: 404 when no video matches *video_code*; 500 on any
            other unexpected error.
    """
    try:
        video = (
            db.query(TPVideoInfo)
            .filter(TPVideoInfo.gbIndexCode == video_code)
            .first()
        )
        if video is None:
            # Previously this fell through to an AttributeError and a 500.
            raise HTTPException(
                status_code=status.HTTP_404_NOT_FOUND,
                detail="Video not found",
            )

        user_videos = user_id_get_user_videoIds(db, user_id)
        user_video_codes = [item.video_code_int for item in user_videos]

        tag_list = get_video_tag_list(db, video.gbIndexCode)
        tags = []
        tag_labels = []
        for info in tag_list:
            tag_info = get_dict_data_info(db, info.dict_type, info.dict_value)
            if not tag_info:
                continue
            label = tag_info.dict_label
            # Skip the catch-all tag and labels that were already emitted.
            if label == _ALL_VIDEOS_LABEL or label in tag_labels:
                continue
            tags.append({
                "id": info.id,
                "video_code": video.gbIndexCode,
                "dict_type": info.dict_type,
                "dict_value": info.dict_value,
                "dict_label": label,
                "dict_code": tag_info.dict_code,
            })
            tag_labels.append(label)

        data = {
            "name": video.name,
            "isUserVideos": video.gbIndexCode in user_video_codes,
            "video_code": video.gbIndexCode,
            "isTag": len(tag_list) > 0,
            "tag": tags,
            "tagLabels": "、".join(tag_labels),
            "status": video.status,
            "longitude": video.longitude,
            "latitude": video.latitude,
            "statusName": video.statusName,
            "regionPath": video.regionPath,
            "installPlace": video.installPlace,
            "cameraTypeName": video.cameraTypeName,
            "cameraType": video.cameraType,
        }
        return {"code": 200, "msg": "操作成功", "data": data}
    except HTTPException:
        # Deliberate HTTP errors (e.g. the 404 above) must not become a 500.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )


@router.get("/videos")
async def get_videos(
    zoom_level: float = Query(..., description="Zoom level for clustering"),
    latitude_min: float = Query(..., description="Minimum latitude"),
    latitude_max: float = Query(..., description="Maximum latitude"),
    longitude_min: float = Query(..., description="Minimum longitude"),
    longitude_max: float = Query(..., description="Maximum longitude"),
    dict_value: str = Query(None),
    db: Session = Depends(get_db)
):
    """Return videos inside a bounding box, clustered into grid cells.

    The cell size shrinks as *zoom_level* grows. When *dict_value* names a
    ``video_type`` tag other than the catch-all one, only videos carrying that
    tag are considered.

    Raises:
        HTTPException: 500 on any unexpected error.
    """
    try:
        started = time.perf_counter()

        # Each additional zoom level halves the distance threshold.
        distance_threshold = 1000 / (2 ** zoom_level)

        conditions = [
            TPVideoInfo.latitude >= latitude_min,
            TPVideoInfo.latitude <= latitude_max,
            TPVideoInfo.longitude >= longitude_min,
            TPVideoInfo.longitude <= longitude_max,
            TPVideoInfo.longitude > 0,
            TPVideoInfo.latitude > 0,
        ]
        if dict_value:
            tag_info = get_dict_data_info(db, 'video_type', dict_value)
            if tag_info and tag_info.dict_label != _ALL_VIDEOS_LABEL:
                tagged_codes = [
                    item.video_code
                    for item in tag_get_video_tag_list(db, dict_value)
                ]
                conditions.append(TPVideoInfo.gbIndexCode.in_(tagged_codes))

        query = (
            select(
                TPVideoInfo.cameraIndexCode,
                TPVideoInfo.gbIndexCode,
                TPVideoInfo.pixel,
                TPVideoInfo.cameraType,
                TPVideoInfo.cameraTypeName,
                TPVideoInfo.installPlace,
                TPVideoInfo.status,
                TPVideoInfo.statusName,
                TPVideoInfo.latitude,
                TPVideoInfo.longitude,
                TPVideoInfo.name,
                TPVideoInfo.unitIndexCode,
                func.ST_AsText(TPVideoInfo.location).label("location"),
            )
            .select_from(TPVideoInfo)
            .where(and_(*conditions))
            .order_by(TPVideoInfo.cameraIndexCode)
        )
        videos = db.execute(query).fetchall()
        fetched = time.perf_counter()

        groups = group_videos(videos, distance_threshold)
        logger.debug(
            "get_videos: %d rows fetched in %.3fs, grouped into %d cells in %.3fs",
            len(videos),
            fetched - started,
            len(groups),
            time.perf_counter() - fetched,
        )

        return {"code": 200, "msg": "操作成功", "data": groups}
    except HTTPException:
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        )


def calculate_grid_size(distance_threshold):
    """Convert a distance threshold (km) into the grid cell size.

    The result is ``distance_threshold / earth_radius``.

    NOTE(review): that quotient is an angle in radians, yet get_grid_key()
    divides latitude/longitude by it directly. If those coordinates are in
    degrees (TODO confirm), the cells are ~57x smaller than the threshold
    suggests. Left unchanged because the map front end may be tuned to it.
    """
    return distance_threshold / _EARTH_RADIUS_KM


def get_grid_key(latitude, longitude, grid_size):
    """Return the ``(row, column)`` index of the grid cell containing a point."""
    return (
        math.floor(latitude / grid_size),
        math.floor(longitude / grid_size),
    )


def calculate_distance(video1, video2):
    """Return the haversine distance in km between two videos.

    Both arguments must expose ``latitude`` and ``longitude`` attributes;
    they are passed through ``math.radians``, i.e. treated as degrees.
    """
    lat1 = math.radians(video1.latitude)
    lon1 = math.radians(video1.longitude)
    lat2 = math.radians(video2.latitude)
    lon2 = math.radians(video2.longitude)

    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    )
    # Rounding can push `a` marginally outside [0, 1] (near-antipodal points),
    # which would make sqrt(1 - a) raise ValueError.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return _EARTH_RADIUS_KM * c


def group_videos(videos, distance_threshold):
    """Bucket *videos* into grid cells and return one summary dict per cell.

    Each summary has the keys ``count`` (videos in the cell), ``latitude`` /
    ``longitude`` (centre of the cell) and ``type`` (``'1'`` for a single
    video, ``'2'`` for a cluster). Cells are returned in the order in which
    they were first encountered.
    """
    grid_size = calculate_grid_size(distance_threshold)
    cells = {}
    for video in videos:
        key = get_grid_key(video.latitude, video.longitude, grid_size)
        cell = cells.get(key)
        if cell is None:
            row, col = key
            cells[key] = {
                "count": 1,
                "latitude": (row + 0.5) * grid_size,
                "longitude": (col + 0.5) * grid_size,
                "type": '1',
            }
        else:
            cell["count"] += 1
            cell["type"] = '2'
    return list(cells.values())


def process_video(video, grid_videos, groups, distance_threshold):
    """Add *video* to the first group with a member closer than the threshold.

    When no existing group qualifies, a new group keyed by the video's
    ``cameraIndexCode`` is created. *groups* is mutated in place and also
    returned. *grid_videos* is unused and kept for interface compatibility.
    """
    for group in groups.values():
        for member in group["videos"]:
            if calculate_distance(video, member) < distance_threshold:
                group["videos"].append(video)
                group["count"] += 1
                return groups

    groups[video.cameraIndexCode] = {"count": 1, "videos": [video]}
    return groups