123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
- from common.security import valid_access_token
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import func
- from common.auth_user import *
- from sqlalchemy import text
- from pydantic import BaseModel
- from common.BigDataCenterAPI import *
- from database import get_db
- from typing import List
- from utils import *
- from utils.risk import *
- from utils.StripTagsHTMLParser import *
- import json
- import traceback
- from sqlalchemy import text, exists, and_, or_, not_
- router = APIRouter()
@router.post('/get_emergency_response_list')
async def get_emergency_response_list(
    request: Request,
    body: dict = Depends(remove_xss_json),
    db: Session = Depends(get_db)
):
    """Return the emergency responses of one type whose status is '0'.

    The ``response_type`` is read from the sanitized request body. Every
    non-deleted row (``del_flag == '0'``) of that type with
    ``response_status == '0'`` becomes one entry with the keys ``area``,
    ``level``, ``time``, ``maxLevel`` and ``maxLevelTime``.

    The remaining non-deleted rows of the same type (``response_status``
    other than '0') are then used to fill ``maxLevel`` / ``maxLevelTime``:
    whenever such a row's level compares greater than the entry's ``level``,
    the row's level and time are written to the entry.

    Levels are the digit strings produced by ``roman_to_int`` and are
    compared as strings.

    Returns:
        A dict with ``code`` 200, a success ``msg`` and the entry list
        under ``data``.
    """
    model = GovdataEmergencyResponse
    response_type = get_req_param(body, "response_type")

    # Conditions common to both queries below.
    shared_filters = (
        model.response_type == response_type,
        model.del_flag == '0',
    )

    # NOTE(review): status '0' presumably marks the currently active
    # responses -- confirm against the table definition.
    current_rows = db.query(model).filter(
        and_(*shared_filters, model.response_status == '0')
    ).all()
    data = [
        {
            "area": current.response_addr,
            "level": roman_to_int(current.response_level),
            "time": current.response_time,
            "maxLevel": "",
            "maxLevelTime": "",
        }
        for current in current_rows
    ]

    other_rows = db.query(model).filter(
        and_(*shared_filters, model.response_status != '0')
    ).all()
    for other in other_rows:
        # Only the first entry for the row's area is considered; the search
        # stops there whether or not the entry ends up being updated.
        entry = next(
            (item for item in data if other.response_addr == item['area']),
            None,
        )
        if entry is None:
            continue

        # NOTE(review): the comparison is against the entry's current level,
        # not against a previously stored maxLevel, so a later qualifying row
        # overwrites an earlier one -- confirm this is the intended meaning
        # of "maxLevel".
        other_level = roman_to_int(other.response_level)
        if other_level > entry["level"]:
            entry['maxLevel'] = other_level
            entry['maxLevelTime'] = other.response_time

    return {
        "code": 200,
        "msg": "成功",
        "data": data
    }
-
def roman_to_int(val: str):
    """Map a Unicode Roman-numeral character (Ⅰ to Ⅳ) to its digit string.

    Despite the name, the result is a ``str``: ``'1'`` through ``'4'`` for
    the four recognised numerals, and ``'0'`` for any other value.
    """
    numeral_digits = (
        ('Ⅰ', '1'),
        ('Ⅱ', '2'),
        ('Ⅲ', '3'),
        ('Ⅳ', '4'),
    )
    # Plain equality scan so that any input, including None or unhashable
    # values, falls through to '0' instead of raising.
    for numeral, digit in numeral_digits:
        if val == numeral:
            return digit
    return '0'
-
@router.get('/get_mms_area_info')
async def mine(
    request: Request,
    response_type: str = '0',
    body = Depends(remove_xss_json),
    db: Session = Depends(get_db),
):
    """Return boundary coordinates of responding areas, grouped by level.

    Loads the Maoming GeoJSON boundary file, looks up the non-deleted
    emergency responses of ``response_type`` whose ``response_status`` is
    '0', and for each of the four levels (Ⅳ, Ⅲ, Ⅱ, Ⅰ) concatenates the
    coordinate lists of every area code that has a response at that level.

    Returns:
        A dict with ``code`` 200, a success ``msg`` and, under ``data``, a
        list of ``{'name', 'color', 'points'}`` dicts in the order Ⅳ to Ⅰ.

    Raises:
        HTTPException: status 500 for any failure (unreadable file,
            unexpected level, area code missing from the boundary file,
            database error); the traceback is printed first.
    """
    try:
        boundary_path = '/home/python3/xh_twapi01/routers/api/spatialAnalysis/茂名市.json'
        with open(boundary_path, 'r', encoding='utf-8') as boundary_file:
            boundary = json.load(boundary_file)
        features = boundary['features']

        # Level label -> display colour, in output order.
        legend = (
            ('Ⅳ级', '#ff2f3c'),
            ('Ⅲ级', '#ffaf00'),
            ('Ⅱ级', '#ffd800'),
            ('Ⅰ级', '#40c75f'),
        )
        result = [
            {'name': label, 'color': colour, "points": []}
            for label, colour in legend
        ]
        # Area codes grouped by response level.
        codes_by_level = {label: [] for label, _ in legend}

        model = GovdataEmergencyResponse
        responses = db.query(model).filter(
            and_(
                model.response_type == response_type,
                model.del_flag == '0',
                model.response_status == '0',
            )
        ).all()
        for response in responses:
            code = response.area_code
            # A level outside the four known labels raises KeyError here and
            # is reported as a 500 by the handler below.
            bucket = codes_by_level[response.response_level + "级"]
            if code not in bucket:
                bucket.append(code)

        # Stringified adcode -> coordinates flattened by one nesting level.
        points_by_adcode = {}
        for feature in features:
            adcode_key = str(feature['properties']['adcode'])
            nested = feature['geometry']['coordinates']
            points_by_adcode[adcode_key] = [
                item for part in nested for item in part
            ]

        for band in result:
            for code in codes_by_level[band['name']]:
                band['points'] += points_by_adcode[code]

        return {
            "code": 200,
            "msg": "成功",
            "data": result}
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|