Forráskód Böngészése

250402-1代码。

baoyubo 3 hónapja
szülő
commit
31370248ec

+ 6 - 1
routers/api/rainfall/__init__.py

@@ -18,9 +18,14 @@ import json
 import traceback
 from jobs.rainfall_conditions_job import get_stcd_data
 from datetime import datetime,timedelta
-
+from .rain_pits import router as rain_pits_router
+from .dzzh import router as dzzh_router
+from .chemical_company import router as chemical_company_router
 
 router = APIRouter()
+router.include_router(rain_pits_router, prefix="/rain_pits")
+router.include_router(dzzh_router, prefix="/dzzh")
+router.include_router(chemical_company_router, prefix="/chemical_company")
 
 @router.get("/info/{code}")
 async def get_pattern_info(

+ 43 - 0
routers/api/rainfall/chemical_company.py

@@ -0,0 +1,43 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
+from common.security import valid_access_token
+from sqlalchemy.orm import Session
+from sqlalchemy.sql import func
+from common.auth_user import *
+from sqlalchemy import  text
+from pydantic import BaseModel
+from common.BigDataCenterAPI import *
+from database import get_db
+from typing import List
+from models import *
+from utils import *
+from utils.spatial import *
+import json
+import traceback
+from jobs.rainfall_conditions_job import get_stcd_data
+from datetime import datetime,timedelta
+
+
+router = APIRouter()
+
+
@router.get("/list")
async def get_list(
    area_name: str = Query(None, description='按区域名称过滤（可选）'),
    db: Session = Depends(get_db)
):
    """List chemical companies from sharedb.chemical_company.

    Returns all rows as a list of dicts; when ``area_name`` is given,
    only rows whose ``area`` column matches are returned.
    Raises HTTP 500 on any database error.
    """
    try:
        base_sql = "SELECT * FROM sharedb.chemical_company"
        if area_name:
            # BUG FIX: the original called sql.bindparams(...).where(...)
            # without assigning the result (and TextClause has no .where),
            # so the area filter was silently dropped.
            sql = text(base_sql + " WHERE area = :area_name").bindparams(area_name=area_name)
        else:
            sql = text(base_sql)
        result = db.execute(sql).fetchall()
        # Convert Row objects to plain dicts for JSON serialization.
        return [dict(row) for row in result]
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

+ 81 - 0
routers/api/rainfall/dzzh.py

@@ -0,0 +1,81 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
+from common.security import valid_access_token
+from sqlalchemy.orm import Session
+from sqlalchemy.sql import func
+from common.auth_user import *
+from sqlalchemy import  text
+from pydantic import BaseModel
+from common.BigDataCenterAPI import *
+from database import get_db
+from typing import List
+from models import *
+from utils import *
+from utils.spatial import *
+from utils.rainfall_util import *
+import json
+import traceback
+from jobs.rainfall_conditions_job import get_stcd_data
+from datetime import datetime,timedelta
+
+
+router = APIRouter()
+
+
@router.get("/list")
async def get_list(
    area_name: str = Query(None, description='按区域名称过滤（可选）'),
    history_time: int = Query(None, description='历史降雨统计小时数（可选）'),
    future_time: int = Query(None, description='未来降雨统计小时数（可选，暂未实现）'),
    db: Session = Depends(get_db),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量')
):
    """Paginated list of geological-hazard points (sharedb.midmap_dzzh).

    Optionally filters by ``area`` and, when ``history_time`` is given,
    enriches each row with accumulated rainfall at the nearest station.
    ``future_time`` currently yields a placeholder rainfall of 0.
    Raises HTTP 500 on any database error.
    """
    try:
        offset = (page - 1) * pageSize

        # BUG FIX: the original stringified a TextClause through an f-string
        # (losing its bound parameters) and called .where() on text(), which
        # TextClause does not support — the area filter never reached the
        # paginated query. Build the SQL string first, bind params last.
        where_clause = " WHERE area = :area_name" if area_name else ""

        page_params = {"limit": pageSize, "offset": offset}
        if area_name:
            page_params["area_name"] = area_name
        paginated_query = text(
            f"SELECT * FROM sharedb.midmap_dzzh{where_clause} LIMIT :limit OFFSET :offset"
        ).bindparams(**page_params)

        count_query = text(f"SELECT COUNT(*) FROM sharedb.midmap_dzzh{where_clause}")
        if area_name:
            count_query = count_query.bindparams(area_name=area_name)

        # Total row count (for totalPages), then the current page.
        total = db.execute(count_query).scalar()
        result = db.execute(paginated_query).fetchall()

        result_list = []
        for row in result:
            data = dict(row)
            if history_time:
                # Nearest rain-gauge station for this point, then its
                # accumulated rainfall over the requested window.
                real_code = get_real_code(db, data['longitude'], data['latitude'])
                data['rainfall'] = get_rainfall(real_code, history_time, db)
            if future_time:
                # Forecast rainfall not implemented yet — placeholder value.
                data['rainfall'] = 0
            result_list.append(data)
        return {
            "code": 200,
            "msg": "操作成功",
            "data": result_list,
            "total": total,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total + pageSize - 1) // pageSize
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

+ 80 - 0
routers/api/rainfall/rain_pits.py

@@ -0,0 +1,80 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
+from common.security import valid_access_token
+from sqlalchemy.orm import Session
+from sqlalchemy.sql import func
+from common.auth_user import *
+from sqlalchemy import  text
+from pydantic import BaseModel
+from common.BigDataCenterAPI import *
+from database import get_db
+from typing import List
+from models import *
+from utils import *
+from utils.spatial import *
+from utils.rainfall_util import *
+import json
+import traceback
+from jobs.rainfall_conditions_job import get_stcd_data
+from datetime import datetime,timedelta
+
+
+router = APIRouter()
+
+
@router.get("/list")
async def get_list(
    area_name: str = Query(None, description='按行政区过滤（可选）'),
    history_time: int = Query(None, description='历史降雨统计小时数（可选）'),
    future_time: int = Query(None, description='未来降雨统计小时数（可选，暂未实现）'),
    db: Session = Depends(get_db),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量')
):
    """Paginated list of rain pits (sharedb.govdata_rain_pits).

    Optionally filters by ``district`` and, when ``history_time`` is given,
    enriches each row with accumulated rainfall at the nearest station.
    ``future_time`` currently yields a placeholder rainfall of 0.
    Raises HTTP 500 on any database error.
    """
    try:
        offset = (page - 1) * pageSize

        # BUG FIX: the original stringified a TextClause through an f-string
        # (losing its bound parameters) and called .where() on text(), which
        # TextClause does not support — the district filter never reached the
        # paginated query. Build the SQL string first, bind params last.
        where_clause = " WHERE district = :area_name" if area_name else ""

        page_params = {"limit": pageSize, "offset": offset}
        if area_name:
            page_params["area_name"] = area_name
        paginated_query = text(
            f"SELECT * FROM sharedb.govdata_rain_pits{where_clause} LIMIT :limit OFFSET :offset"
        ).bindparams(**page_params)

        count_query = text(f"SELECT COUNT(*) FROM sharedb.govdata_rain_pits{where_clause}")
        if area_name:
            count_query = count_query.bindparams(area_name=area_name)

        # Total row count (for totalPages), then the current page.
        total = db.execute(count_query).scalar()
        result = db.execute(paginated_query).fetchall()

        result_list = []
        for row in result:
            data = dict(row)
            if history_time:
                # Nearest rain-gauge station for this pit, then its
                # accumulated rainfall over the requested window.
                real_code = get_real_code(db, data['longitude'], data['latitude'])
                data['rainfall'] = get_rainfall(real_code, history_time, db)
            if future_time:
                # Forecast rainfall not implemented yet — placeholder value.
                data['rainfall'] = 0
            result_list.append(data)
        return {
            "code": 200,
            "msg": "操作成功",
            "data": result_list,
            "total": total,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total + pageSize - 1) // pageSize
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")

+ 55 - 0
routers/api/videoResource/location/__init__.py

@@ -27,6 +27,61 @@ import math
 router = APIRouter()
 
 
+
@router.get('/video/{video_code}')
async def get_video_url_by_id(
    video_code: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Detail for one camera by its GB index code.

    Used by the big-screen lower-left video panel ("更多视频"). Returns the
    camera's metadata, whether it belongs to the current user's video set,
    and its de-duplicated tag list. Raises 404 for an unknown code, 500 on
    unexpected errors.
    """
    try:
        query = db.query(TPVideoInfo)
        query = query.filter(TPVideoInfo.gbIndexCode == video_code)
        video = query.first()
        # BUG FIX: the original dereferenced video.gbIndexCode without a
        # None check, turning an unknown video_code into a 500.
        if video is None:
            raise HTTPException(status_code=status.HTTP_404_NOT_FOUND, detail='视频不存在')

        videoIds = user_id_get_user_videoIds(db, user_id)
        video_list = [i.video_code_int for i in videoIds]

        tag_list = get_video_tag_list(db, video.gbIndexCode)
        tag = []
        tag_lable = []
        for info in tag_list:
            # NOTE(review): sibling endpoints in videoinfo.py were changed in
            # this same commit to pass the literal 'video_type' here instead
            # of info.dict_type — confirm which lookup key is intended.
            tag_info = get_dict_data_info(db, info.dict_type, info.dict_value)
            if tag_info:
                # De-duplicate by label and hide the catch-all '全量视频' tag.
                if tag_info.dict_label not in tag_lable and tag_info.dict_label != '全量视频':
                    tag.append({"id": info.id,
                                "video_code": video.gbIndexCode,
                                "dict_type": info.dict_type,
                                "dict_value": info.dict_value,
                                "dict_label": tag_info.dict_label,
                                "dict_code": tag_info.dict_code})
                    tag_lable.append(tag_info.dict_label)
        data = {
            "name": video.name,
            "isUserVideos": video.gbIndexCode in video_list,
            "video_code": video.gbIndexCode,
            "isTag": len(tag_list) > 0,
            "tag": tag,
            "tagLabels": "、".join(tag_lable),
            "status": video.status,
            "longitude": video.longitude,
            "latitude": video.latitude,
            "statusName": video.statusName,
            "regionPath": video.regionPath,
            "installPlace": video.installPlace,
            "cameraTypeName": video.cameraTypeName,
            "cameraType": video.cameraType
        }
        return {
            "code": 200,
            "msg": "操作成功",
            "data": data}
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) pass through unchanged.
        raise
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
+
+
 @router.get("/videos")
 async def get_videos(
         zoom_level: float = Query(..., description="Zoom level for clustering"),

+ 2 - 2
routers/api/videoResource/videoinfo.py

@@ -124,7 +124,7 @@ async def get_video_url_by_id(
             tag = []
             tag_lable = []
             for info in tag_list:
-                tag_info = get_dict_data_info(db, info.dict_type, info.dict_value)
+                tag_info = get_dict_data_info(db, 'video_type', info.dict_value)
                 if tag_info:
                     if tag_info.dict_label not in tag_lable and tag_info.dict_label!='全量视频':
                         tag.append({"id": info.id,
@@ -606,7 +606,7 @@ async def get_video_tag_info(
         tag = []
         tag_lable = []
         for info in get_video_tag_list(db,video_code):
-            tag_info = get_dict_data_info(db,info.dict_type,info.dict_value)
+            tag_info = get_dict_data_info(db,'video_type',info.dict_value)
             if tag_info:
                 if tag_info.dict_label not in tag_lable:
                     tag.append({"id":info.id,

+ 64 - 0
utils/rainfall_util.py

@@ -0,0 +1,64 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
+from common.security import valid_access_token
+from fastapi.responses import JSONResponse
+from sqlalchemy.orm import Session
+from sqlalchemy import and_, or_,text
+from sqlalchemy.sql import func
+from sqlalchemy.future import select
+from common.auth_user import *
+from pydantic import BaseModel
+from database import get_db
+from typing import List
+from models import *
+from utils import *
+from utils.ry_system_util import *
+from utils.video_util import *
+from collections import defaultdict
+import traceback
+from concurrent.futures import ThreadPoolExecutor, as_completed
+from jobs.rainfall_conditions_job import get_stcd_data
+from multiprocessing import Pool, cpu_count
+import json
+import time
+import math
+
def get_real_code(db, longitude, latitude):
    """Return the ``code`` of the rain-gauge address nearest to a point.

    Runs a nearest-neighbour query against govdata_real_time_address using
    MySQL's ST_Distance_Sphere, ordered by distance to the given
    (longitude, latitude). Returns the station code string, or None when
    the table yields no row.
    """
    nearest_station_sql = text("""
            SELECT area_name, `code`, longitude, latitude, distance FROM (
                SELECT 
                    area_name, 
                    `code`, 
                    longitude, 
                    latitude,
                    ST_Distance_Sphere(
                        ST_GeomFromText(CONCAT('POINT(', longitude, ' ', latitude, ')')), 
                        ST_GeomFromText(:point)
                    ) AS distance
                FROM govdata_real_time_address 
                WHERE longitude IS NOT NULL AND latitude IS NOT NULL 
                ORDER BY distance ASC 
                LIMIT 1
            ) T
        """).bindparams(point=f"POINT({longitude} {latitude})")

    nearest = db.execute(nearest_station_sql).fetchone()

    # Guard clause: no addresses with coordinates at all.
    if nearest is None:
        return None
    return dict(nearest)['code']
+
def get_rainfall(
    code: str, num: int,
    db: Session = None
):
    """Sum recent rainfall readings for station ``code``.

    Fetches ``num + 1`` records via get_stcd_data and totals the
    'F3070220000034_000018005' field of each.
    # assumes that field is an hourly rainfall amount — TODO confirm

    ``db`` is unused; it is kept only for signature compatibility with
    existing callers, which pass a Session positionally.

    BUG FIX: the original default was ``Depends(get_db)``, but this is a
    plain helper, not a FastAPI route — called without ``db`` it would
    receive a Depends marker object, never a Session. Default is now None.
    """
    value = 0
    # num + 1 records are requested — presumably to cover a partial
    # current period; verify against get_stcd_data's windowing.
    rainfulldata = get_stcd_data(code, num + 1)
    for record in rainfulldata:
        value += record['F3070220000034_000018005']

    return value