# rain_pits.py
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
  4. from common.security import valid_access_token
  5. from sqlalchemy.orm import Session
  6. from sqlalchemy.sql import func
  7. from common.auth_user import *
  8. from sqlalchemy import text
  9. from pydantic import BaseModel
  10. from common.BigDataCenterAPI import *
  11. from database import get_db
  12. from typing import List
  13. from models import *
  14. from utils import *
  15. from utils.spatial import *
  16. from utils.rainfall_util import *
  17. import json
  18. import traceback
  19. from jobs.rainfall_conditions_job import get_stcd_data
  20. from datetime import datetime,timedelta
  21. router = APIRouter()
  22. @router.get("/list")
  23. async def get_list(
  24. area_name: str = Query(None),
  25. keyword: str= Query(None),
  26. history_time:int = Query(None),
  27. future_time:int = Query(None),
  28. db: Session = Depends(get_db),
  29. page: int = Query(1, gt=0, description='页码'),
  30. pageSize: int = Query(10, gt=0, description='每页条目数量')
  31. ):
  32. try:
  33. # 计算 OFFSET 值
  34. offset = (page - 1) * pageSize
  35. # 构造基础查询
  36. base_sql = "SELECT * FROM sharedb.govdata_rain_pits"
  37. count_sql = "SELECT COUNT(*) FROM sharedb.govdata_rain_pits"
  38. # 添加 WHERE 条件
  39. conditions = []
  40. params = {}
  41. if area_name:
  42. conditions.append("district = :area_name")
  43. params['area_name'] = area_name
  44. if keyword:
  45. conditions.append("`name` LIKE :keyword")
  46. params['keyword'] = f"%{keyword}%"
  47. if conditions:
  48. base_sql += " WHERE " + " AND ".join(conditions)
  49. count_sql += " WHERE " + " AND ".join(conditions)
  50. count_query = text(count_sql) #.bindparams(**params)
  51. # 执行统计查询并获取总数据量
  52. total = db.execute(count_query,params).scalar()
  53. # 添加 LIMIT 和 OFFSET
  54. paginated_sql = f"{base_sql} LIMIT :limit OFFSET :offset"
  55. params['limit'] = pageSize
  56. params['offset'] = offset
  57. # 构造查询对象
  58. paginated_query = text(paginated_sql) #.bindparams(**params)
  59. # 执行分页查询并获取结果
  60. result = db.execute(paginated_query,params).fetchall()
  61. # 将结果转换为rain_pits.py字典列表
  62. result_list = []
  63. for row in result:
  64. data = dict(row)
  65. data['weather_warning_type'],data['weather_warninglevel'] = get_weather_warning(data['district'], db)
  66. # data['weather_warning_type'] = '暴雨预警'
  67. # data['weather_warninglevel'] = '3'
  68. if history_time:
  69. real_code = get_real_code(db,data['longitude'],data['latitude'])
  70. rainfall = get_rainfall(real_code,history_time,db)
  71. data['rainfall'] = rainfall
  72. if future_time:
  73. data['rainfall'] = 0
  74. result_list.append(data)
  75. return {
  76. "code": 200,
  77. "msg": "操作成功",
  78. "data": result_list,
  79. "total": total,
  80. "page": page,
  81. "pageSize": pageSize,
  82. "totalPages": (total + pageSize - 1) // pageSize
  83. }
  84. except Exception as e:
  85. traceback.print_exc()
  86. raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")