123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from sqlalchemy.sql import func
- from sqlalchemy.future import select
- from common.auth_user import *
- from pydantic import BaseModel
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- from utils.ry_system_util import *
- from utils.video_util import *
- from collections import defaultdict
- import traceback
- from concurrent.futures import ThreadPoolExecutor, as_completed
- from multiprocessing import Pool, cpu_count
- import json
- import time
- import math
- router = APIRouter()
- @router.get('/video/{video_code}')
- async def get_video_url_by_id(
- video_code:str,
- db: Session = Depends(get_db),
- body=Depends(remove_xss_json),
- user_id=Depends(valid_access_token)
- ):
- # 大屏左下角视频及更多视频
- try:
- query = db.query(TPVideoInfo)
- query = query.filter(TPVideoInfo.gbIndexCode==video_code)
- videoIds = user_id_get_user_videoIds(db, user_id)
- video_list = [i.video_code_int for i in videoIds]
- video = query.first()
- tag_list =get_video_tag_list(db,video.gbIndexCode)
- tag = []
- tag_lable = []
- for info in tag_list:
- tag_info = get_dict_data_info(db, info.dict_type, info.dict_value)
- if tag_info:
- if tag_info.dict_label not in tag_lable and tag_info.dict_label!='全量视频':
- tag.append({"id": info.id,
- "video_code": video.gbIndexCode,
- "dict_type": info.dict_type,
- "dict_value": info.dict_value,
- "dict_label": tag_info.dict_label,
- "dict_code": tag_info.dict_code})
- tag_lable.append(tag_info.dict_label)
- data={
- "name":video.name,
- "isUserVideos":video.gbIndexCode in video_list,
- "video_code": video.gbIndexCode,
- "isTag" : len(tag_list)>0,
- "tag" : tag,
- "tagLabels" : "、".join(tag_lable),
- "status":video.status,
- "longitude":video.longitude,
- "latitude":video.latitude,
- "statusName":video.statusName,
- "regionPath":video.regionPath,
- "installPlace":video.installPlace,
- "cameraTypeName":video.cameraTypeName,
- "cameraType":video.cameraType
- }
- return {
- "code": 200,
- "msg": "操作成功",
- "data": data}
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
- @router.get("/videos")
- async def get_videos(
- zoom_level: float = Query(..., description="Zoom level for clustering"),
- latitude_min: float = Query(..., description="Minimum latitude"),
- latitude_max: float = Query(..., description="Maximum latitude"),
- longitude_min: float = Query(..., description="Minimum longitude"),
- longitude_max: float = Query(..., description="Maximum longitude"),
- dict_value: str = Query(None),
- db: Session = Depends(get_db)
- ):
- try:
- # 根据缩放级别动态调整分组粒度
- distance_threshold = 1000 / (2 ** zoom_level) # 例如:每缩放一级,距离阈值减半
- que = True
- print(time.time())
- if dict_value:
- tag_info = get_dict_data_info(db, 'video_type', dict_value)
- if tag_info:
- if tag_info.dict_label!='全量视频':
- videolist = [i.video_code for i in tag_get_video_tag_list(db,dict_value)]
- que =TPVideoInfo.gbIndexCode.in_(videolist)
- # 查询分组
- print("1",time.time())
- query = (
- select(
- TPVideoInfo.cameraIndexCode,
- TPVideoInfo.gbIndexCode,
- TPVideoInfo.pixel,
- TPVideoInfo.cameraType,
- TPVideoInfo.cameraTypeName,
- TPVideoInfo.installPlace,
- TPVideoInfo.status,
- TPVideoInfo.statusName,
- TPVideoInfo.latitude,
- TPVideoInfo.longitude,
- TPVideoInfo.name,
- TPVideoInfo.unitIndexCode,
- func.ST_AsText(TPVideoInfo.location).label("location")
- )
- .select_from(TPVideoInfo).where(
- and_(
- TPVideoInfo.latitude >= latitude_min,
- TPVideoInfo.latitude <= latitude_max,
- TPVideoInfo.longitude >= longitude_min,
- TPVideoInfo.longitude <= longitude_max,
- TPVideoInfo.longitude>0,
- TPVideoInfo.latitude>0,que
- )
- )
- .order_by(TPVideoInfo.cameraIndexCode)
- )
- result = db.execute(query)
- print("2",time.time())
- videos = result.fetchall()
- print("3",time.time())
- # 动态分组逻辑
- # groups = {}
- groups = group_videos(videos, distance_threshold)
- # for video in videos:
- # grouped = False
- # for group_id, group in list(groups.items()):
- # for v in group["videos"]:
- # distance = calculate_distance(video, v)
- # if distance < distance_threshold:
- # groups[group_id]["videos"].append(video)
- # groups[group_id]["count"] += 1
- # grouped = True
- # break
- # if grouped:
- # break
- # if not grouped:
- # group_id = video.cameraIndexCode
- # groups[group_id] = {"count": 1, "videos": [video]}
- print("4",time.time())
- return {"code": 200,
- "msg": "操作成功",
- "data": groups}
- except Exception as e:
- # 处理异常
- traceback.print_exc()
- raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
- def calculate_grid_size(distance_threshold):
- # 假设地球半径为6371公里,将距离阈值转换为经纬度的差值
- # 这里假设纬度变化对距离的影响较小,仅根据经度计算网格大小
- earth_radius = 6371 # 地球半径,单位为公里
- grid_size = distance_threshold / earth_radius
- return grid_size
- def get_grid_key(latitude, longitude, grid_size):
- # 根据经纬度和网格大小计算网格键
- return (math.floor(latitude / grid_size), math.floor(longitude / grid_size))
- def calculate_distance(video1, video2):
- # 使用 Haversine 公式计算两点之间的距离
- from math import radians, sin, cos, sqrt, atan2
- R = 6371 # 地球半径(公里)
- lat1, lon1 = radians(video1.latitude), radians(video1.longitude)
- lat2, lon2 = radians(video2.latitude), radians(video2.longitude)
- dlat = lat2 - lat1
- dlon = lon2 - lon1
- a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
- c = 2 * atan2(sqrt(a), sqrt(1 - a))
- return R * c
- def group_videos(videos, distance_threshold):
- grid_size = calculate_grid_size(distance_threshold)
- grid = defaultdict(lambda:{"count":0}) #,"list":[]
- groups = []
- for video in videos:
- grid_key = get_grid_key(video.latitude, video.longitude, grid_size)
- grid['%s-%s'%grid_key]['count']+=1
- grid['%s-%s'%grid_key]['latitude'] = (grid_key[0] + 0.5) * grid_size
- grid['%s-%s'%grid_key]['longitude'] = (grid_key[1] + 0.5) * grid_size
- # grid['%s-%s'%grid_key]['list'].append(video)
- if grid['%s-%s'%grid_key]['count']>1:
- grid['%s-%s' % grid_key]['type'] ='2'
- else:grid['%s-%s'%grid_key]['type'] ='1'
- groups = list(grid.values())
- # for group_id, group in list(grid.items()):
- # groups.append(group)
- # #使用多线程计算距离
- # def process_video(video, grid_videos, groups, distance_threshold):
- # grouped = False
- # for group_id, group in list(groups.items()):
- # for v in group["videos"]:
- # if calculate_distance(video, v) < distance_threshold:
- # groups[group_id]["videos"].append(video)
- # groups[group_id]["count"] += 1
- # grouped = True
- # break
- # if grouped:
- # break
- # if not grouped:
- # group_id = video.cameraIndexCode
- # groups[group_id] = {"count": 1, "videos": [video]}
- #
- # with ThreadPoolExecutor() as executor:
- # futures = []
- # for grid_key, grid_videos in grid.items():
- # for video in grid_videos:
- # futures.append(executor.submit(process_video, video, grid_videos, groups, distance_threshold))
- # for future in as_completed(futures):
- # future.result() # 确保所有任务完成
- # for grid_key, grid_videos in grid.items():
- # for video in grid_videos:
- # grouped = False
- # for group_id, group in list(groups.items()):
- # for v in group["videos"]:
- # if calculate_distance(video, v) < distance_threshold:
- # groups[group_id]["videos"].append(video)
- # groups[group_id]["count"] += 1
- # grouped = True
- # break
- # if grouped:
- # break
- # if not grouped:
- # group_id = video.cameraIndexCode
- # groups[group_id] = {"count": 1, "videos": [video]}
- # 使用多进程处理每个网格中的视频
- # with Pool(processes=cpu_count()) as pool:
- # for grid_key, grid_videos in grid.items():
- # # 初始化局部分组
- # partial_groups = pool.starmap(process_video, [(video, grid_videos, groups.copy(), distance_threshold) for video in grid_videos])
- # # 合并局部分组结果
- # for partial_group in partial_groups:
- # for group_id, group in partial_group.items():
- # if group_id in groups:
- # groups[group_id]["videos"].extend(group["videos"])
- # groups[group_id]["count"] += group["count"]
- # else:
- # groups[group_id] = group
- # return groups
- return groups
- def process_video(video, grid_videos, groups, distance_threshold):
- grouped = False
- for group_id, group in list(groups.items()):
- for v in group["videos"]:
- if calculate_distance(video, v) < distance_threshold:
- group["videos"].append(video)
- group["count"] += 1
- grouped = True
- break
- if grouped:
- break
- if not grouped:
- group_id = video.cameraIndexCode
- groups[group_id] = {"count": 1, "videos": [video]}
- return groups
|