#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Endpoint that returns the cameras inside a map viewport, grouped by proximity.

The grouping radius shrinks as the zoom level grows, so zooming in splits
groups apart and zooming out merges nearby cameras together.
"""
# NOTE: the import order is kept exactly as it was. Several of these are
# wildcard imports, so reordering them could change which module ends up
# providing a given name.
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from sqlalchemy.sql import func
from sqlalchemy.future import select
from common.auth_user import *
from pydantic import BaseModel
from database import get_db
from typing import List
from models import *
from utils import *
from utils.ry_system_util import *
from utils.video_util import *
from collections import defaultdict
import logging
import traceback
import json
import time
import math

# Private name so it cannot collide with anything the wildcard imports expose.
_log = logging.getLogger(__name__)

router = APIRouter()


@router.get("/videos")
async def get_videos(
    zoom_level: float = Query(..., description="Zoom level for clustering"),
    latitude_min: float = Query(..., description="Minimum latitude"),
    latitude_max: float = Query(..., description="Maximum latitude"),
    longitude_min: float = Query(..., description="Minimum longitude"),
    longitude_max: float = Query(..., description="Maximum longitude"),
    dict_value: str = Query(None),
    db: Session = Depends(get_db),
):
    """Return the cameras inside the requested bounding box, grouped by distance.

    Args:
        zoom_level: Map zoom level; the grouping threshold is
            ``1000 / 2 ** zoom_level``.
        latitude_min: Lower latitude bound (inclusive).
        latitude_max: Upper latitude bound (inclusive).
        longitude_min: Lower longitude bound (inclusive).
        longitude_max: Upper longitude bound (inclusive).
        dict_value: Optional ``video_type`` dictionary value. When it resolves
            to an entry whose label is not '全量视频', only cameras whose
            ``gbIndexCode`` is in that tag's video list are returned.
        db: Database session supplied by ``get_db``.

    Returns:
        ``{"code": 200, "msg": ..., "data": groups}`` where ``groups`` is the
        mapping produced by ``group_videos``.

    Raises:
        HTTPException: Status 500 carrying the error text when anything
            unexpected fails. An ``HTTPException`` raised further down the
            call chain is passed through unchanged.
    """
    try:
        # Grouping granularity follows the zoom level: each additional zoom
        # level halves the distance threshold.
        distance_threshold = 1000 / (2 ** zoom_level)

        # Bounding box, plus exclusion of rows whose coordinates are not
        # strictly positive.
        conditions = [
            TPVideoInfo.latitude >= latitude_min,
            TPVideoInfo.latitude <= latitude_max,
            TPVideoInfo.longitude >= longitude_min,
            TPVideoInfo.longitude <= longitude_max,
            TPVideoInfo.longitude > 0,
            TPVideoInfo.latitude > 0,
        ]

        # Optional tag filter; the '全量视频' label means "no restriction".
        if dict_value:
            tag_info = get_dict_data_info(db, 'video_type', dict_value)
            if tag_info and tag_info.dict_label != '全量视频':
                video_codes = [
                    tag.video_code
                    for tag in tag_get_video_tag_list(db, dict_value)
                ]
                conditions.append(TPVideoInfo.gbIndexCode.in_(video_codes))

        query = (
            select(
                TPVideoInfo.cameraIndexCode,
                TPVideoInfo.gbIndexCode,
                TPVideoInfo.pixel,
                TPVideoInfo.cameraType,
                TPVideoInfo.cameraTypeName,
                TPVideoInfo.installPlace,
                TPVideoInfo.status,
                TPVideoInfo.statusName,
                TPVideoInfo.latitude,
                TPVideoInfo.longitude,
                TPVideoInfo.name,
                TPVideoInfo.unitIndexCode,
                func.ST_AsText(TPVideoInfo.location).label("location"),
            )
            .select_from(TPVideoInfo)
            .where(and_(*conditions))
            .order_by(TPVideoInfo.cameraIndexCode)
        )

        started = time.perf_counter()
        videos = db.execute(query).fetchall()
        _log.debug(
            "get_videos: fetched %d rows in %.3fs",
            len(videos), time.perf_counter() - started,
        )

        # Distance-based grouping of the fetched rows.
        started = time.perf_counter()
        groups = group_videos(videos, distance_threshold)
        _log.debug(
            "get_videos: built %d groups in %.3fs",
            len(groups), time.perf_counter() - started,
        )

        # NOTE(review): the groups hold the row objects exactly as returned by
        # fetchall(); confirm the framework serialises them as intended.
        return {"code": 200, "msg": "操作成功", "data": groups}
    except HTTPException:
        # Do not mask a deliberate HTTP error from a helper as a 500.
        raise
    except Exception as e:
        _log.exception("get_videos failed")
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e),
        ) from e


def calculate_grid_size(distance_threshold):
    """Return the grid cell size used to bucket cameras before grouping.

    The result is ``distance_threshold`` divided by an Earth radius of 6371
    (kilometres).

    NOTE(review): that quotient is an angle in radians, while the coordinates
    it is applied to in ``get_grid_key`` are treated as degrees elsewhere in
    this module (``calculate_distance`` converts them with ``radians``). The
    value is left as it was because ``group_videos`` only uses the grid to
    decide processing order; changing it could change which group a camera
    joins. Confirm the intended unit before relying on the cell size.
    """
    earth_radius = 6371  # Earth radius in kilometres
    grid_size = distance_threshold / earth_radius
    return grid_size


def get_grid_key(latitude, longitude, grid_size):
    """Return the ``(row, column)`` grid cell containing the given coordinates."""
    return (math.floor(latitude / grid_size), math.floor(longitude / grid_size))


def calculate_distance(video1, video2):
    """Return the haversine distance in kilometres between two cameras.

    Both arguments must expose ``latitude`` and ``longitude`` attributes,
    which are converted from degrees to radians before use.
    """
    R = 6371  # Earth radius in kilometres
    lat1, lon1 = math.radians(video1.latitude), math.radians(video1.longitude)
    lat2, lon2 = math.radians(video2.latitude), math.radians(video2.longitude)
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = (
        math.sin(dlat / 2) ** 2
        + math.cos(lat1) * math.cos(lat2) * math.sin(dlon / 2) ** 2
    )
    # Rounding can push `a` marginally outside [0, 1] (e.g. near-antipodal
    # points), which would make sqrt(1 - a) raise a math domain error.
    a = min(1.0, max(0.0, a))
    c = 2 * math.atan2(math.sqrt(a), math.sqrt(1 - a))
    return R * c


def group_videos(videos, distance_threshold):
    """Group cameras so that each one joins the first group it is close to.

    Cameras are first bucketed into grid cells and then visited cell by cell.
    A camera joins the first existing group that already contains a member
    closer than ``distance_threshold`` (as measured by ``calculate_distance``);
    otherwise it starts a new group keyed by its ``cameraIndexCode``.

    Args:
        videos: Iterable of objects with ``latitude``, ``longitude`` and
            ``cameraIndexCode`` attributes.
        distance_threshold: Maximum distance, in the unit returned by
            ``calculate_distance``, for joining an existing group.

    Returns:
        Dict mapping the founding camera's ``cameraIndexCode`` to
        ``{"count": <member count>, "videos": [<members>]}``.
    """
    grid_size = calculate_grid_size(distance_threshold)

    cells = defaultdict(list)
    for video in videos:
        cell_key = get_grid_key(video.latitude, video.longitude, grid_size)
        cells[cell_key].append(video)

    groups = {}
    # The grid only fixes the order in which cameras are visited; every
    # camera is still compared against all existing groups.
    for cell_videos in cells.values():
        for video in cell_videos:
            for group in groups.values():
                is_near = any(
                    calculate_distance(video, member) < distance_threshold
                    for member in group["videos"]
                )
                if is_near:
                    group["videos"].append(video)
                    group["count"] += 1
                    break
            else:
                # No existing group is close enough: start a new one.
                groups[video.cameraIndexCode] = {"count": 1, "videos": [video]}
    return groups