rain_pits.py

#!/usr/bin/env python3
# -*- coding: utf-8 -*-
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from common.auth_user import *
from sqlalchemy import text
from pydantic import BaseModel
from common.BigDataCenterAPI import *
from database import get_db
from typing import List
from models import *
from utils import *
from utils.spatial import *
from utils.rainfall_util import *
import json
import traceback
from jobs.rainfall_conditions_job import get_stcd_data
from datetime import datetime, timedelta

router = APIRouter()

@router.get("/list")
async def get_list(
    area_name: str = Query(None),
    keyword: str = Query(None),
    history_time: int = Query(None),
    future_time: int = Query(None),
    db: Session = Depends(get_db),
    page: int = Query(1, gt=0, description='Page number'),
    pageSize: int = Query(10, gt=0, description='Items per page')
):
    try:
        # Compute the OFFSET value
        offset = (page - 1) * pageSize

        # Build the base and count queries
        base_sql = "SELECT * FROM sharedb.govdata_rain_pits"
        count_sql = "SELECT COUNT(*) FROM sharedb.govdata_rain_pits"

        # Add WHERE conditions
        conditions = []
        params = {}
        if area_name:
            conditions.append("district = :area_name")
            params['area_name'] = area_name
        if keyword:
            conditions.append("`name` LIKE :keyword")
            params['keyword'] = f"%{keyword}%"
        if conditions:
            base_sql += " WHERE " + " AND ".join(conditions)
            count_sql += " WHERE " + " AND ".join(conditions)

        count_query = text(count_sql)  # .bindparams(**params)
        # Run the count query to get the total number of rows
        total = db.execute(count_query, params).scalar()

        # Add LIMIT and OFFSET for pagination
        paginated_sql = f"{base_sql} LIMIT :limit OFFSET :offset"
        params['limit'] = pageSize
        params['offset'] = offset

        # Build the paginated query object
        paginated_query = text(paginated_sql)  # .bindparams(**params)
        # Run the paginated query and fetch the results
        result = db.execute(paginated_query, params).fetchall()

        # Convert the rows into a list of dicts
        result_list = []
        for row in result:
            data = dict(row)  # SQLAlchemy 1.x rows; on 2.x use dict(row._mapping)
            data['weather_warning_type'] = '暴雨预警'  # "rainstorm warning"
            data['weather_warninglevel'] = '3'
            if history_time:
                real_code = get_real_code(db, data['longitude'], data['latitude'])
                rainfall = get_rainfall(real_code, history_time, db)
                data['rainfall'] = rainfall
            if future_time:
                data['rainfall'] = 0
            result_list.append(data)

        return {
            "code": 200,
            "msg": "操作成功",  # "operation successful"
            "data": result_list,
            "total": total,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total + pageSize - 1) // pageSize
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
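
For reference, a minimal sketch of how this endpoint might be exercised with FastAPI's TestClient. The app wiring, the "/rain_pits" prefix, and the query values are assumptions, not part of the module; a real run also needs a reachable sharedb.govdata_rain_pits table and the common/utils helpers that get_db, get_real_code, and get_rainfall rely on.

# usage_sketch.py (illustrative only)
from fastapi import FastAPI
from fastapi.testclient import TestClient

import rain_pits  # the module above

app = FastAPI()
app.include_router(rain_pits.router, prefix="/rain_pits")  # prefix is an assumption

client = TestClient(app)

# Page 1 of pits in one district, with rainfall over the last 3 hours.
# "Some District" is a placeholder value for the district column.
resp = client.get(
    "/rain_pits/list",
    params={"area_name": "Some District", "history_time": 3, "page": 1, "pageSize": 10},
)
body = resp.json()
print(body["total"], len(body["data"]), body["totalPages"])

Because the handler builds its WHERE clause only from the filters that are actually supplied, omitting area_name and keyword returns the full table one page at a time.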