emergency.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
  4. from common.security import valid_access_token
  5. from sqlalchemy.orm import Session
  6. from sqlalchemy.sql import func
  7. from common.auth_user import *
  8. from sqlalchemy import text
  9. from pydantic import BaseModel
  10. from common.BigDataCenterAPI import *
  11. from database import get_db
  12. from typing import List
  13. from utils import *
  14. from utils.risk import *
  15. from utils.StripTagsHTMLParser import *
  16. import json
  17. import traceback
  18. from sqlalchemy import text, exists, and_, or_, not_
  19. router = APIRouter()
  20. @router.post('/get_emergency_response_list')
  21. async def get_emergency_response_list(
  22. request: Request,
  23. body: dict = Depends(remove_xss_json),
  24. db: Session = Depends(get_db)
  25. ):
  26. response_type = get_req_param(body, "response_type")
  27. data = []
  28. rows = db.query(GovdataEmergencyResponse).filter(and_(GovdataEmergencyResponse.response_type == response_type, GovdataEmergencyResponse.del_flag == '0', GovdataEmergencyResponse.response_status == '0')).all()
  29. for row in rows:
  30. data.append({
  31. "area": row.response_addr,
  32. "level": roman_to_int(row.response_level),
  33. "time": row.response_time,
  34. "maxLevel": "",
  35. "maxLevelTime": ""
  36. })
  37. rows = db.query(GovdataEmergencyResponse).filter(and_(GovdataEmergencyResponse.response_type == response_type, GovdataEmergencyResponse.del_flag == '0', GovdataEmergencyResponse.response_status != '0')).all()
  38. for row in rows:
  39. for i in range(len(data)):
  40. if row.response_addr == data[i]['area']:
  41. if roman_to_int(row.response_level) > data[i]["level"]:
  42. data[i]['maxLevel'] = roman_to_int(row.response_level)
  43. data[i]['maxLevelTime'] = row.response_time
  44. break
  45. return {
  46. "code": 200,
  47. "msg": "成功",
  48. "data": data
  49. }
  50. def roman_to_int(val: str):
  51. if val == 'Ⅰ':
  52. return '1'
  53. elif val == 'Ⅱ':
  54. return '2'
  55. elif val == 'Ⅲ':
  56. return '3'
  57. elif val == 'Ⅳ':
  58. return '4'
  59. else:
  60. return '0'
  61. @router.get('/get_mms_area_info')
  62. async def mine(request: Request,response_type: str = '0', body = Depends(remove_xss_json),db: Session = Depends(get_db)):
  63. try:
  64. with open('/home/python3/xh_twapi01/routers/api/spatialAnalysis/茂名市.json','r', encoding='utf-8') as f:
  65. data = json.load(f)
  66. features = data['features']
  67. result = [{'name': 'Ⅳ级','color': '#ff2f3c',"points":[]},
  68. {'name': 'Ⅲ级', 'color': '#ffaf00',"points":[]},
  69. {'name': 'Ⅱ级', 'color': '#ffd800',"points":[]},
  70. {'name': 'Ⅰ级', 'color': '#40c75f',"points":[]},]
  71. # 格式
  72. adcode_dit = {
  73. "Ⅳ级":[],
  74. "Ⅲ级":[],
  75. "Ⅱ级":[],
  76. "Ⅰ级":[]
  77. }
  78. rows = db.query(GovdataEmergencyResponse).filter(and_(GovdataEmergencyResponse.response_type == response_type, GovdataEmergencyResponse.del_flag == '0', GovdataEmergencyResponse.response_status == '0')).all()
  79. for row in rows:
  80. area_code = row.area_code
  81. level = row.response_level + "级"
  82. if area_code not in adcode_dit[level]:
  83. adcode_dit[level].append(area_code)
  84. area_data = {}
  85. for feature in features:
  86. properties = feature['properties']
  87. area_code = properties['adcode']
  88. geometry = feature['geometry']
  89. coordinates = geometry['coordinates']
  90. area_data[str(area_code)]=[]
  91. for i in coordinates:
  92. area_data[str(area_code)] += i
  93. for i in result:
  94. for x in adcode_dit[i['name']]:
  95. i['points']+=area_data[x]
  96. return {
  97. "code": 200,
  98. "msg": "成功",
  99. "data": result}
  100. except Exception as e:
  101. traceback.print_exc()
  102. raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")