#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Area statistics endpoint.

Given a polygon (a list of ``{"x": ..., "y": ...}`` points) the ``/get_info``
route reports the towns/villages returned by the gateway service for those
points, plus how many emergency experts and distinct rescue-material
management units have coordinates inside the polygon.
"""
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from common.auth_user import *
from sqlalchemy import text
from pydantic import BaseModel
from common.BigDataCenterAPI import *
from database import get_db
from typing import List
from models import *
from utils import *
import json
import traceback

# NOTE(review): `requests`, `GetTime`, `GetNonce`, `GetSign`,
# `OneShareApiEntity` and `remove_xss_json` are not imported by name in this
# file; presumably they arrive through the star imports above -- confirm.

router = APIRouter()

# Gateway endpoint and service code used to look up towns and villages.
_GATEWAY_URL = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
_TOWN_VILLAGE_SERVICE_CODE = 'YZT1685418808667'
# Upper bound (seconds) for the gateway call so a stalled gateway cannot block
# the request forever.
_GATEWAY_TIMEOUT_SECONDS = 30

# Shared WHERE predicate: the row's point lies inside the polygon bound to
# :polygon.  The polygon text is always passed as a bound parameter, never
# interpolated into the SQL string.
_INSIDE_POLYGON_PREDICATE = (
    "ST_Contains(ST_PolygonFromText(:polygon, 4326),"
    "ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))"
)


def convert_to_polygon(points):
    """Convert a list of points into a closed WKT ``POLYGON`` string.

    Each point is a mapping with ``'x'`` and ``'y'`` keys.  Points are emitted
    as ``"y x"`` (the original comment states x is longitude and y is
    latitude, so latitude comes first), which matches the
    ``POINT(latitude longitude)`` text built in the SQL queries of this
    module.  The first point is appended again at the end to close the ring.

    Args:
        points: Non-empty sequence of ``{'x': ..., 'y': ...}`` mappings.

    Returns:
        A string such as ``"POLYGON((2 1, 4 3, 2 1))"``.

    Raises:
        ValueError: If ``points`` is empty.
        KeyError: If a point lacks an ``'x'`` or ``'y'`` key.
    """
    if not points:
        raise ValueError("points must contain at least one point")
    ring = [f"{point['y']} {point['x']}" for point in points]
    ring.append(ring[0])  # close the ring with the first point
    return "POLYGON((" + ", ".join(ring) + "))"


def count_town_village(location_list: list, db):
    """Query the gateway for the towns and villages of the given points.

    The signed request is sent to the gateway with ``location_list`` as the
    JSON body.  For every township code seen for the first time a town-level
    row (village fields set to ``'-'``) is appended to the result; for every
    village code seen for the first time a village-level row is appended.

    Args:
        location_list: Points forwarded unchanged as the JSON request body.
        db: SQLAlchemy session used to load the gateway credentials.

    Returns:
        A ``(rows, town_count, village_count)`` tuple.  When the gateway does
        not answer with HTTP 200 the tuple is ``([], 0, 0)``.

    Raises:
        LookupError: If no credentials are stored for the service code.
    """
    service_info = (
        db.query(OneShareApiEntity)
        .filter(OneShareApiEntity.servercode == _TOWN_VILLAGE_SERVICE_CODE)
        .first()
    )
    if service_info is None:
        raise LookupError(
            f"No gateway credentials found for service {_TOWN_VILLAGE_SERVICE_CODE}"
        )

    sign_time = str(GetTime() // 1000)
    nonce = GetNonce(5)
    sign = GetSign(sign_time, nonce, service_info.passtoken)
    headers = {
        'x-tif-signature': sign,
        'x-tif-timestamp': sign_time,
        'x-tif-nonce': nonce,
        'x-tif-paasid': service_info.passid,
        'x-tif-serviceId': _TOWN_VILLAGE_SERVICE_CODE,
    }
    # NOTE(review): verify=False disables TLS certificate verification; it is
    # kept from the original code -- confirm this is intentional.
    response = requests.post(
        url=_GATEWAY_URL,
        headers=headers,
        json=location_list,
        verify=False,
        timeout=_GATEWAY_TIMEOUT_SECONDS,
    )

    result = []
    seen_towns = set()
    seen_villages = set()
    if response.status_code == 200:
        for data in response.json()['data']:
            township_code = data['townshipCode']
            if township_code not in seen_towns:
                seen_towns.add(township_code)
                # Town-level row: village fields are placeholders.
                result.append({
                    'township': data['township'],
                    "townshipCode": township_code,
                    "village": '-',
                    "villageCode": '-',
                    "populationSize": 0,
                    "areaSize": 0,
                    "GDP": 0,
                })
            village_code = data['villageCode']
            if village_code not in seen_villages:
                seen_villages.add(village_code)
                result.append({
                    'township': data['township'],
                    "townshipCode": township_code,
                    'village': data['village'],
                    "villageCode": village_code,
                    "populationSize": 0,
                    "areaSize": 0,
                    "GDP": 0,
                })
    return result, len(seen_towns), len(seen_villages)


def count_emergency_expert(location_list: list, db):
    """Count ``emergency_expert`` rows whose point lies inside the polygon.

    Args:
        location_list: Polygon vertices, see :func:`convert_to_polygon`.
        db: SQLAlchemy session.

    Returns:
        The number of matching rows.
    """
    polygon = convert_to_polygon(location_list)
    sql = text(
        "SELECT COUNT(*) FROM emergency_expert WHERE " + _INSIDE_POLYGON_PREDICATE
    )
    return int(db.execute(sql, {"polygon": polygon}).scalar() or 0)


def count_emergency_management(location_list: list, db):
    """Count distinct ``management_unit`` values of ``rescue_materia`` rows
    whose point lies inside the polygon.

    The DISTINCT is done in a subquery and the outer query counts its rows,
    so a NULL ``management_unit`` is counted as one value, exactly as when
    the distinct rows were fetched and counted in Python.

    Args:
        location_list: Polygon vertices, see :func:`convert_to_polygon`.
        db: SQLAlchemy session.

    Returns:
        The number of distinct management units.
    """
    polygon = convert_to_polygon(location_list)
    sql = text(
        "SELECT COUNT(*) FROM ("
        "SELECT DISTINCT management_unit FROM `rescue_materia` WHERE "
        + _INSIDE_POLYGON_PREDICATE
        + ") AS distinct_units"
    )
    return int(db.execute(sql, {"polygon": polygon}).scalar() or 0)


class location_c(BaseModel):
    # A single point with numeric x / y coordinates.
    x: float
    y: float


class mine(BaseModel):
    # Request body of /get_info: the polygon vertices as a list of points.
    # NOTE: the route handler below reuses the name `mine`.  Its parameter
    # annotation is evaluated before the function name is bound, so the
    # annotation still refers to this model.
    location: List = []


@router.post('/get_info')
async def mine(request: Request, from_data: mine, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
    """Return town/village data and emergency-resource counts for a polygon.

    Args:
        request: Incoming request (not used directly by the handler).
        from_data: Parsed request body carrying the ``location`` point list.
        body: Result of the ``remove_xss_json`` dependency (not used directly;
            the dependency is kept so it still runs for every request).
        db: SQLAlchemy session.

    Returns:
        A dict with ``code``, ``msg`` and a ``data`` payload holding the
        town/village rows and the counts.

    Raises:
        HTTPException: 400 when ``location`` is empty; 500 when any lookup
            fails (the session is rolled back first).
    """
    if not from_data.location:
        # A polygon cannot be built from zero points; reject before any
        # gateway or database work is done.
        raise HTTPException(status_code=400, detail="location must contain at least one point")
    try:
        # Administrative town and village data.
        town_village_data, town_count, village_count = count_town_village(from_data.location, db)
        emergency_expert_count = count_emergency_expert(from_data.location, db)
        emergency_management_count = count_emergency_management(from_data.location, db)

        return {
            "code": 200,
            "msg": "成功",
            "data": {
                "townVillageData": town_village_data,
                "townCount": town_count,
                "villageCount": village_count,
                "emergencyExpertCount": emergency_expert_count,
                "emergencyManagementCount": emergency_management_count,
            },
        }
    except HTTPException:
        # Deliberate HTTP errors keep their own status code.
        raise
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") from e