123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from common.BigDataCenterAPI import *
- from models import *
- from sqlalchemy import text
- from shapely.geometry import Polygon, MultiPolygon
- from shapely.ops import unary_union
- import json
- def get_town_list(locations,):
- # 初始化一个空的MultiPolygon来容纳所有多边形
- multi_polygon = MultiPolygon()
- # 遍历每个位置,创建多边形并添加到multi_polygon中
- for location in locations:
- # 将边界列表转换为Polygon
- polygon = Polygon([(item['x'], item['y']) for item in location])
- multi_polygon = multi_polygon.union(polygon)
- # 将GeoJSON数据转换为字典
- with open('/home/python3/xh_twapi01/utils/spatial/zj_geojson.json', 'r', encoding='utf-8') as file:
- geojson = json.load(file)
- # 假设GeoJSON数据是一个FeatureCollection
- features = geojson.get('features', [])
- # 初始化一个空列表来存储结果
- intersected_names_and_pacs = []
- # 遍历GeoJSON中的每个Feature,计算交集
- for feature in features:
- geom = feature['geometry']
- if 'coordinates' in geom:
- # 将GeoJSON Polygon转换为shapely Polygon
- if geom['type'] == 'Polygon':
- polygon = Polygon(geom['coordinates'][0])
- intersection = polygon.intersection(multi_polygon)
- elif geom['type'] == 'MultiPolygon':
- multi_polygon_feature = MultiPolygon([Polygon(coords[0]) for coords in geom['coordinates']])
- intersection = multi_polygon_feature.intersection(multi_polygon)
- else:
- continue # 跳过非Polygon和非MultiPolygon类型的几何对象
- if not intersection.is_empty:
- properties = feature['properties']
- intersected_names_and_pacs.append({
- "townName": properties.get('NAME', ''),
- "code": properties.get('PAC', ''),
- "populationSize": 0, # 假设值,需要从数据中获取
- "areaSize": round(intersection.area, 2), # 交集区域的面积
- "GDP": 0 # 假设值,需要从数据中获取
- })
- return intersected_names_and_pacs, len(intersected_names_and_pacs)
- # import geopandas as gpd
- # from shapely.geometry import Polygon
- #
- #
- #
- # def get_town_list(locations):
- # # 读取GeoJSON文件为GeoDataFrame
- # gdf = gpd.read_file('zj_geojson.json')
- # gdf = gdf.set_crs("EPSG:4326", allow_override=True)
- #
- # # 初始化一个空的GeoDataFrame来容纳所有多边形
- # multi_polygon_gdf = gpd.GeoDataFrame(crs=gdf.crs)
- #
- # # 遍历每个位置,创建多边形并添加到multi_polygon_gdf中
- # for location in locations:
- # # 将边界列表转换为Polygon
- # polygon = Polygon([(item['x'], item['y']) for item in location])
- # # 将多边形添加到multi_polygon_gdf中
- # multi_polygon_gdf = multi_polygon_gdf.append(gpd.GeoDataFrame([1], geometry=[polygon], crs=gdf.crs))
- #
- # # 使用overlay函数来找出相交的区域
- # intersected = gpd.overlay(gdf, multi_polygon_gdf, how='intersection')
- #
- # # 获取相交区域的名称和PAC
- # intersected_names_and_pacs = [{"name": row['NAME'], "pac": row['PAC'],"populationSize":0,"areaSize":0,"GDP":0} for index, row in intersected.iterrows() if 'NAME' in row and 'PAC' in row]
- #
- # return intersected_names_and_pacs,len(intersected_names_and_pacs)
- def convert_to_polygon(points):
- # 将点的列表转换为POLYGON格式的字符串
- polygon_str = "POLYGON(("
- for point in points:
- # 假设点的顺序是经度(x),纬度(y)
- polygon_str += f"{point['y']} {point['x']}, "
- # 移除最后一个逗号和空格,然后添加闭合点和结束括号
- polygon_str = polygon_str.rstrip(", ") + f", {points[0]['y']} {points[0]['x']}))"
- return polygon_str
- def count_town_village(location_list:list,db):
- town_count = 0
- town_list = []
- village_count = 0
- village_list = []
- result = []
- url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
- service_code= 'YZT1685418808667'
- service_info = db.query(OneShareApiEntity).filter(OneShareApiEntity.servercode == service_code).first()
- signTime = str(GetTime() // 1000)
- nonce = GetNonce(5)
- sign = GetSign(signTime, nonce, service_info.passtoken)
- headers = {
- # 'Content-Type': 'application/json',
- 'x-tif-signature': sign,
- 'x-tif-timestamp': signTime,
- 'x-tif-nonce': nonce,
- 'x-tif-paasid': service_info.passid,
- 'x-tif-serviceId': service_code
- }
- response = requests.post(url=url, headers=headers, json=location_list, verify=False)
- if response.status_code==200:
- data_list = response.json()['data']
- for data in data_list:
- township = data['townshipCode']
- if township not in town_list:
- town_count+=1
- town_list.append(township)
- # result.append({'township':data['township'],"townshipCode":data['townshipCode'],"villages":[]})
- result.append({'township':data['township'],"townshipCode":data['townshipCode'],"village":'-',"villageCode":'-',"populationSize":0,"areaSize":0,"GDP":0})
- village = data['villageCode']
- if village not in village_list:
- village_count+=1
- village_list.append(village)
- # for town in result:
- # if town['townshipCode']==data['townshipCode']:
- # town["villages"].append({'village': data['village'], "villageCode": data['villageCode']})
- result.append({'township':data['township'],"townshipCode":data['townshipCode'],'village': data['village'], "villageCode": data['villageCode'],"populationSize":0,"areaSize":0,"GDP":0})
- return result,town_count,village_count
- def count_emergency_expert(location_list:list,db):
- location = convert_to_polygon(location_list)
- sql = text(f"""SELECT * FROM emergency_expert WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
- return len(db.execute(sql).all())
- def count_emergency_management(location_list: list, db):
- location = convert_to_polygon(location_list)
- sql = text(f"""SELECT DISTINCT management_unit FROM `rescue_materia` WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
- return len(db.execute(sql).all())
- def get_hospital_list(location_list:list,db):
- resutl = []
- for location in location_list:
- location = convert_to_polygon(location)
- sql = text(f"""SELECT hospital_name as `name`,longitude,latitude,6 AS `dataType` FROM mid_hospital WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
- resutl+=db.execute(sql).all()
- return resutl
- def get_emergency_shelter_list(location_list:list,db):
- resutl = []
- for location in location_list:
- location = convert_to_polygon(location)
- sql = text(f"""SELECT shelter_name as `name`,lng as longitude,lat as latitude,3 AS `dataType` FROM mid_emergency_shelter WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', lat, ' ', lng, ')'), 4326))""")
- resutl+=db.execute(sql).all()
- return resutl
- def get_waterlogged_roads_list(location_list:list,db):
- resutl = []
- for location in location_list:
- location = convert_to_polygon(location)
- sql = text(f"""SELECT flood_name as `name`,lng as longitude,lat as latitude,4 AS `dataType` FROM mid_waterlogged_roads WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', lat, ' ', lng, ')'), 4326))""")
- resutl+=db.execute(sql).all()
- return resutl
|