فهرست منبع

250403-1代码。

baoyubo 1 ماه پیش
والد
کامیت
b278cca53f
2فایلهای تغییر یافته به همراه128 افزوده شده و 9 حذف شده
  1. 123 9
      routers/api/rainfall/chemical_company.py
  2. 5 0
      utils/video_util.py

+ 123 - 9
routers/api/rainfall/chemical_company.py

@@ -14,6 +14,7 @@ from typing import List
 from models import *
 from utils import *
 from utils.spatial import *
+from utils.rainfall_util import *
 import json
 import traceback
 from jobs.rainfall_conditions_job import get_stcd_data
@@ -25,19 +26,132 @@ router = APIRouter()
 
 @router.get("/list")
 async def get_list(
-    area_name: str,
-    db: Session = Depends(get_db)
+    area_name: str = Query(None),
+    company_type:str = Query(None),
+    keyword: str= Query(None),
+    history_time:int = Query(None),
+    future_time:int = Query(None),
+    db: Session = Depends(get_db),
+    page: int = Query(1, gt=0, description='页码'),
+    pageSize: int = Query(10, gt=0, description='每页条目数量')
 ):
     try:
-        sql = text("""SELECT * from sharedb.chemical_company""")
-        if area_name :
-            sql.bindparams(area_name=area_name).where(text("area = :area_name"))
-        result = db.execute(sql).fetchall()
+        # 计算 OFFSET 值
+        offset = (page - 1) * pageSize
+        # 构造基础查询
+        base_sql = "SELECT * FROM sharedb.chemical_company"
+        count_sql = "SELECT COUNT(*) FROM sharedb.chemical_company"
+
+        # 添加 WHERE 条件
+        conditions = []
+        params = {}
+        if area_name:
+            conditions.append("area = :area_name")
+            params['area_name'] = area_name
+        if keyword:
+            conditions.append("`company_name` LIKE :keyword")
+            params['keyword'] = f"%{keyword}%"
+        if company_type:
+            conditions.append("`company_type` LIKE :company_type")
+            params['company_type'] = f"%{company_type}%"
+
+        if conditions:
+            base_sql += " WHERE " + " AND ".join(conditions)
+            count_sql += " WHERE " + " AND ".join(conditions)
+
+        count_query = text(count_sql) #.bindparams(**params)
+
+        # 执行统计查询并获取总数据量
+        total = db.execute(count_query,params).scalar()
+
+        # 添加 LIMIT 和 OFFSET
+        paginated_sql = f"{base_sql} LIMIT :limit OFFSET :offset"
+        params['limit'] = pageSize
+        params['offset'] = offset
+
+        # 构造查询对象
+        paginated_query = text(paginated_sql) #.bindparams(**params)
+        # 执行分页查询并获取结果
+        result = db.execute(paginated_query,params).fetchall()
         # 将结果转换为rain_pits.py字典列表
         result_list = []
         for row in result:
-            result_list.append(dict(row))
-        return result_list
+            data = dict(row)
+            data['have_video'] = False
+            data['video_unit_indexcode'] = ''
+            company_name = data['company_name']
+            video_unit_info = unitName_get_video_region_info(db,company_name)
+            if video_unit_info:
+                data['have_video'] = True
+                data['video_unit_indexcode'] = video_unit_info.indexCode
+            data['weather_warning_type'] = '暴雨预警'
+            data['weather_warninglevel'] = '3'
+            if history_time:
+                real_code = get_real_code(db,data['longitude'],data['latitude'])
+                rainfall = get_rainfall(real_code,history_time,db)
+                data['rainfall'] = rainfall
+            if future_time:
+                data['rainfall'] = 0
+            result_list.append(data)
+        return {
+            "code": 200,
+            "msg": "操作成功",
+            "data": result_list,
+            "total": total,
+            "page": page,
+            "pageSize": pageSize,
+            "totalPages": (total + pageSize - 1) // pageSize
+        }
+    except Exception as e:
+        traceback.print_exc()
+        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
+
+
+@router.get('/get_video_list')
+async def get_video_list(
+    video_unit_indexcode: str = Query(None, description='所属单位索引码'),
+    db: Session = Depends(get_db),
+    page: int = Query(1, gt=0, description='页码'),
+    pageSize: int = Query(10, gt=0, description='每页条目数量')
+):
+    # 大屏左下角视频及更多视频
+    try:
+        query = db.query(TPVideoInfo)
+
+        if video_unit_indexcode:
+            query = query.filter(TPVideoInfo.regionPath.like(f'%{video_unit_indexcode}%'))
+        else:
+            return {
+            "code": 200,
+            "msg": "操作成功",
+            "data": None}
+        total_items = query.count()
+        # query = query.order_by()
+        videos = query.offset(pageSize * (page - 1)).limit(pageSize).all()
+        data = []
+        for row in videos:
+            # row = row[0]
+            # print(type(row),row[0])
+            data.append({
+                "name":row.name,
+                "video_code": row.gbIndexCode,
+                "status":row.status,
+                "statusName":row.statusName,
+                "regionPath":row.regionPath,
+                "installPlace":row.installPlace,
+                "cameraTypeName":row.cameraTypeName,
+                "cameraType":row.cameraType
+            })
+        return {
+            "code": 200,
+            "msg": "操作成功",
+            "data": data,
+            "total": total_items,
+            "page": page,
+            "pageSize": pageSize,
+            "totalPages": (total_items + pageSize - 1) // pageSize
+        }
     except Exception as e:
+        # 处理异常
         traceback.print_exc()
-        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
+        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))

+ 5 - 0
utils/video_util.py

@@ -17,4 +17,9 @@ def tag_get_video_tag_list(db,dict_value:str):
 def unitIndexCode_get_video_region_info(db,unitIndexCode:str):
     query = db.query(TPVideoRegion)
     query = query.filter(TPVideoRegion.indexCode == unitIndexCode)
+    return query.first()
+
+def unitName_get_video_region_info(db,name:str):
+    query = db.query(TPVideoRegion)
+    query = query.filter(TPVideoRegion.name == name)
     return query.first()