#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Spatial lookups of towns, villages and points of interest inside polygons.

Polygons arrive as lists of ``{'x': ..., 'y': ...}`` points.  Most helpers
turn them into WKT and run MySQL spatial predicates through a SQLAlchemy
session/connection; ``get_town_list`` works offline against a GeoJSON file
with shapely; ``count_town_village`` calls an external HTTP gateway.
"""
# NOTE(review): `requests`, `GetTime`, `GetNonce`, `GetSign` and
# `OneShareApiEntity` are not imported explicitly; they presumably come from
# the two star imports below -- confirm before tidying the imports.
from common.BigDataCenterAPI import *
from models import *
from sqlalchemy import text
from sqlalchemy import func
from shapely.geometry import Polygon, MultiPolygon
from shapely.ops import unary_union
import json

# WHERE-clause template: "point built from two columns lies inside :polygon".
# The polygon WKT is always passed as a bound parameter, never interpolated.
_POINT_IN_POLYGON = (
    "ST_Contains(ST_PolygonFromText(:polygon, 4326), "
    "ST_PointFromText(CONCAT('POINT(', {lat}, ' ', {lng}, ')'), 4326))"
)


def convert_to_polygon(points):
    """Return the WKT ``POLYGON((...))`` string for a list of points.

    Args:
        points: Sequence of mappings with ``'x'`` and ``'y'`` keys.  The
            original author's note says x is longitude and y is latitude.

    Returns:
        WKT text whose vertices are written as ``"<y> <x>"``, i.e.
        latitude first.  NOTE(review): that matches the latitude-longitude
        axis order MySQL 8 uses for SRID 4326 -- confirm against the server
        version in use.

    Raises:
        IndexError: If ``points`` is empty.
    """
    ring = [f"{point['y']} {point['x']}" for point in points]
    # WKT rings must end on their first vertex.  Close the ring unless the
    # caller already did, so no duplicate closing vertex is emitted.
    if len(ring) < 2 or ring[-1] != ring[0]:
        ring.append(ring[0])
    return f"POLYGON(({', '.join(ring)}))"


def _rows_for_each_polygon(db, sql, location_list, extra_params=None):
    """Run ``sql`` once per polygon and return all rows concatenated.

    ``sql`` must reference the bound parameter ``:polygon``.  Rows matched by
    several polygons appear once per polygon (DISTINCT only applies within a
    single query).
    """
    statement = text(sql)
    rows = []
    for location in location_list:
        params = {"polygon": convert_to_polygon(location)}
        if extra_params:
            params.update(extra_params)
        rows += db.execute(statement, params).all()
    return rows


def get_town_list2(location_list: list, db):
    """Return town rows (name, properties, pac) intersecting any polygon.

    Args:
        location_list: List of polygons, each a list of ``{'x', 'y'}`` points.
        db: SQLAlchemy session/connection exposing ``execute``.

    Returns:
        List of result rows; a town intersecting several polygons is
        repeated once per polygon.
    """
    sql = (
        "SELECT DISTINCT `name`,properties,pac FROM tp_geojson_data_zj "
        "WHERE ST_Intersects(geometry, ST_PolygonFromText(:polygon, 4326))"
    )
    return _rows_for_each_polygon(db, sql, location_list)


def get_village_list(location_list: list, db, pac='', pac_prefix=None):
    """Return village rows intersecting any polygon, optionally filtered.

    Args:
        location_list: List of polygons, each a list of ``{'x', 'y'}`` points.
        db: SQLAlchemy session/connection exposing ``execute``.
        pac: Legacy raw SQL fragment (e.g. ``"where pac like '4401%'"``)
            inserted verbatim into the inner SELECT.  It is NOT escaped;
            never build it from untrusted input.  Prefer ``pac_prefix``.
        pac_prefix: If given, restrict villages to those whose ``pac``
            starts with this prefix, using a bound parameter.  ``%`` and
            ``_`` inside the prefix still act as LIKE wildcards.

    Returns:
        List of rows (name, properties, pac, populationSize, GDP); a village
        intersecting several polygons is repeated once per polygon.

    Raises:
        ValueError: If both ``pac`` and ``pac_prefix`` are supplied.
    """
    extra_params = {}
    if pac_prefix is not None:
        if pac:
            raise ValueError("pass either `pac` or `pac_prefix`, not both")
        pac = "WHERE pac LIKE :pac_pattern"
        extra_params["pac_pattern"] = f"{pac_prefix}%"
    sql = (
        "SELECT DISTINCT `name`,properties,pac,populationSize,GDP "
        f"FROM (SELECT * FROM tp_geojson_data_cj_sq {pac}) A "
        "WHERE ST_Intersects(geometry, ST_PolygonFromText(:polygon, 4326))"
    )
    return _rows_for_each_polygon(db, sql, location_list, extra_params)


def _geojson_to_shapely(geom):
    """Convert a GeoJSON Polygon/MultiPolygon dict to shapely, else None."""
    if not geom or 'coordinates' not in geom:
        return None
    geom_type = geom.get('type')
    if geom_type == 'Polygon':
        rings = geom['coordinates']
        # First ring is the exterior; any further rings are holes.
        return Polygon(rings[0], rings[1:])
    if geom_type == 'MultiPolygon':
        return MultiPolygon(
            [Polygon(rings[0], rings[1:]) for rings in geom['coordinates']]
        )
    return None


def get_town_list(locations, geojson_path='/home/python3/zj_geojson.json'):
    """Find towns in a GeoJSON file that intersect the given polygons.

    Args:
        locations: List of polygons, each a list of ``{'x', 'y'}`` points.
        geojson_path: Path of the FeatureCollection file to search.

    Returns:
        Tuple ``(towns, count)`` where ``towns`` is a list of dicts with
        ``townName``, ``code``, ``populationSize``, ``areaSize`` and ``GDP``.
        ``areaSize`` is the intersection area in the coordinates' own units
        (squared degrees if the inputs are lon/lat); ``populationSize`` and
        ``GDP`` are placeholders (always 0) because this source has no such
        data.
    """
    polygons = [
        Polygon([(item['x'], item['y']) for item in location])
        for location in locations
    ]
    search_area = unary_union(polygons) if polygons else MultiPolygon()

    with open(geojson_path, 'r', encoding='utf-8') as file:
        geojson = json.load(file)

    intersected_names_and_pacs = []
    for feature in geojson.get('features', []):
        shape = _geojson_to_shapely(feature['geometry'])
        if shape is None:
            # Null geometry or a type other than Polygon/MultiPolygon.
            continue
        intersection = shape.intersection(search_area)
        if intersection.is_empty:
            continue
        properties = feature['properties']
        intersected_names_and_pacs.append({
            "townName": properties.get('NAME', ''),
            "code": properties.get('PAC', ''),
            "populationSize": 0,  # placeholder: not available here
            "areaSize": round(intersection.area, 2),
            "GDP": 0  # placeholder: not available here
        })

    return intersected_names_and_pacs, len(intersected_names_and_pacs)


def get_town_village_list(locations, db):
    """Return towns intersecting the polygons, each with its villages.

    Args:
        locations: List of polygons, each a list of ``{'x', 'y'}`` points.
        db: SQLAlchemy session/connection exposing ``execute``.

    Returns:
        Tuple ``(towns, town_count, village_count)``.  Each town dict has
        ``townName``, ``code``, ``populationSize`` and ``GDP`` (sums over its
        intersecting villages), ``areaSize`` (the town's ``GEO_AREA``
        property) and, when villages were found, ``children`` and
        ``villageCount``.
    """
    # The per-polygon queries repeat a town that touches several polygons;
    # keep the first occurrence only so counts and sums are not inflated.
    unique_towns = {}
    for town in get_town_list2(locations, db):
        unique_towns.setdefault((town.pac, town.name), town)

    intersected_names_and_pacs = []
    village_count = 0
    for town in unique_towns.values():
        properties = json.loads(town.properties)
        town_data = {
            "townName": town.name,
            "code": town.pac,
            "populationSize": 0,  # accumulated from villages below
            "areaSize": properties['GEO_AREA'],
            "GDP": 0  # accumulated from villages below
        }

        unique_villages = {}
        for village in get_village_list(locations, db, pac_prefix=town.pac):
            unique_villages.setdefault((village.pac, village.name), village)

        children = []
        for village in unique_villages.values():
            # NULL columns come back as None; treat them as 0 in the totals.
            town_data['populationSize'] += village.populationSize or 0
            town_data['GDP'] += village.GDP or 0
            children.append({
                "villageName": village.name,
                "code": village.pac,
                "populationSize": village.populationSize,
                # NOTE(review): this is the parent town's GEO_AREA, not the
                # village's own area (behaviour kept from the original, where
                # parsing village.properties was commented out) -- confirm.
                "areaSize": properties['GEO_AREA'],
                "GDP": village.GDP
            })

        if children:
            town_data['children'] = children
            town_data['villageCount'] = len(children)
            village_count += len(children)
        intersected_names_and_pacs.append(town_data)

    return intersected_names_and_pacs, len(unique_towns), village_count


def count_town_village(location_list: list, db):
    """Ask the external gateway which townships/villages the area covers.

    Args:
        location_list: Payload sent as the JSON body of the request.
        db: SQLAlchemy session used to load the gateway credentials.

    Returns:
        Tuple ``(rows, town_count, village_count)``.  ``rows`` holds one
        entry per distinct township (with ``village``/``villageCode`` set to
        ``'-'``) followed by one entry per distinct village, in response
        order.  A non-200 response yields ``([], 0, 0)``.
    """
    url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
    service_code = 'YZT1685418808667'
    service_info = db.query(OneShareApiEntity).filter(
        OneShareApiEntity.servercode == service_code).first()

    signTime = str(GetTime() // 1000)
    nonce = GetNonce(5)
    sign = GetSign(signTime, nonce, service_info.passtoken)
    headers = {
        'x-tif-signature': sign,
        'x-tif-timestamp': signTime,
        'x-tif-nonce': nonce,
        'x-tif-paasid': service_info.passid,
        'x-tif-serviceId': service_code
    }
    # NOTE(review): TLS verification is disabled and no timeout is set, so
    # the call trusts any certificate and can block indefinitely.
    response = requests.post(url=url, headers=headers, json=location_list,
                             verify=False)

    result = []
    seen_towns = set()
    seen_villages = set()
    if response.status_code == 200:
        for data in response.json()['data']:
            township = data['townshipCode']
            if township not in seen_towns:
                seen_towns.add(township)
                result.append({
                    'township': data['township'],
                    "townshipCode": data['townshipCode'],
                    "village": '-',
                    "villageCode": '-',
                    "populationSize": 0,
                    "areaSize": 0,
                    "GDP": 0
                })
            village = data['villageCode']
            if village not in seen_villages:
                seen_villages.add(village)
                result.append({
                    'township': data['township'],
                    "townshipCode": data['townshipCode'],
                    'village': data['village'],
                    "villageCode": data['villageCode'],
                    "populationSize": 0,
                    "areaSize": 0,
                    "GDP": 0
                })
    return result, len(seen_towns), len(seen_villages)


def count_emergency_expert(location_list: list, db):
    """Count ``emergency_expert`` rows located inside one polygon.

    Args:
        location_list: A single polygon as a list of ``{'x', 'y'}`` points
            (not a list of polygons, despite the name).
        db: SQLAlchemy session/connection exposing ``execute``.

    Returns:
        Number of matching rows.
    """
    where = _POINT_IN_POLYGON.format(lat='latitude', lng='longitude')
    sql = text(f"SELECT COUNT(*) FROM emergency_expert WHERE {where}")
    params = {"polygon": convert_to_polygon(location_list)}
    return db.execute(sql, params).scalar()


def count_emergency_management(location_list: list, db):
    """Count distinct ``management_unit`` values located inside one polygon.

    Args:
        location_list: A single polygon as a list of ``{'x', 'y'}`` points
            (not a list of polygons, despite the name).
        db: SQLAlchemy session/connection exposing ``execute``.

    Returns:
        Number of distinct management units (a NULL unit counts as one).
    """
    where = _POINT_IN_POLYGON.format(lat='latitude', lng='longitude')
    # Count over a derived table so a NULL management_unit is still counted,
    # exactly as len() over the SELECT DISTINCT rows would.
    sql = text(
        "SELECT COUNT(*) FROM ("
        f"SELECT DISTINCT management_unit FROM `rescue_materia` WHERE {where}"
        ") AS units"
    )
    params = {"polygon": convert_to_polygon(location_list)}
    return db.execute(sql, params).scalar()


def get_hospital_list(location_list: list, db):
    """Return hospitals (name, longitude, latitude, dataType=6) inside any polygon.

    Rows inside several polygons are repeated once per polygon.
    """
    where = _POINT_IN_POLYGON.format(lat='latitude', lng='longitude')
    sql = (
        "SELECT hospital_name as `name`,longitude,latitude,6 AS `dataType` "
        f"FROM mid_hospital WHERE {where}"
    )
    return _rows_for_each_polygon(db, sql, location_list)


def get_emergency_shelter_list(location_list: list, db):
    """Return shelters (name, longitude, latitude, dataType=3) inside any polygon.

    Rows inside several polygons are repeated once per polygon.
    """
    where = _POINT_IN_POLYGON.format(lat='lat', lng='lng')
    sql = (
        "SELECT shelter_name as `name`,lng as longitude,lat as latitude,"
        f"3 AS `dataType` FROM mid_emergency_shelter WHERE {where}"
    )
    return _rows_for_each_polygon(db, sql, location_list)


def get_waterlogged_roads_list(location_list: list, db):
    """Return waterlogged roads (name, longitude, latitude, dataType=4) inside any polygon.

    Rows inside several polygons are repeated once per polygon.
    """
    where = _POINT_IN_POLYGON.format(lat='lat', lng='lng')
    sql = (
        "SELECT flood_name as `name`,lng as longitude,lat as latitude,"
        f"4 AS `dataType` FROM mid_waterlogged_roads WHERE {where}"
    )
    return _rows_for_each_polygon(db, sql, location_list)


def get_point_list(location_list: list, db):
    """Return ``point_data`` rows (id, name, longitude, latitude, dataType) inside any polygon.

    Rows inside several polygons are repeated once per polygon.
    """
    where = _POINT_IN_POLYGON.format(lat='latitude', lng='longitude')
    sql = (
        "SELECT `id` ,`name`, longitude, latitude, `dataType` "
        f"FROM point_data WHERE {where}"
    )
    return _rows_for_each_polygon(db, sql, location_list)