__init__.py 9.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from common.BigDataCenterAPI import *
  4. from models import *
  5. from sqlalchemy import text
  6. from shapely.geometry import Polygon, MultiPolygon
  7. from shapely.ops import unary_union
  8. import json
  9. def get_town_list(locations,):
  10. # 初始化一个空的MultiPolygon来容纳所有多边形
  11. multi_polygon = MultiPolygon()
  12. # 遍历每个位置,创建多边形并添加到multi_polygon中
  13. for location in locations:
  14. # 将边界列表转换为Polygon
  15. polygon = Polygon([(item['x'], item['y']) for item in location])
  16. multi_polygon = multi_polygon.union(polygon)
  17. # 将GeoJSON数据转换为字典
  18. with open('/home/python3/zj_geojson.json', 'r', encoding='utf-8') as file:
  19. geojson = json.load(file)
  20. # 假设GeoJSON数据是一个FeatureCollection
  21. features = geojson.get('features', [])
  22. # 初始化一个空列表来存储结果
  23. intersected_names_and_pacs = []
  24. # 遍历GeoJSON中的每个Feature,计算交集
  25. for feature in features:
  26. geom = feature['geometry']
  27. if 'coordinates' in geom:
  28. # 将GeoJSON Polygon转换为shapely Polygon
  29. if geom['type'] == 'Polygon':
  30. polygon = Polygon(geom['coordinates'][0])
  31. intersection = polygon.intersection(multi_polygon)
  32. elif geom['type'] == 'MultiPolygon':
  33. multi_polygon_feature = MultiPolygon([Polygon(coords[0]) for coords in geom['coordinates']])
  34. intersection = multi_polygon_feature.intersection(multi_polygon)
  35. else:
  36. continue # 跳过非Polygon和非MultiPolygon类型的几何对象
  37. if not intersection.is_empty:
  38. properties = feature['properties']
  39. intersected_names_and_pacs.append({
  40. "townName": properties.get('NAME', ''),
  41. "code": properties.get('PAC', ''),
  42. "populationSize": 0, # 假设值,需要从数据中获取
  43. "areaSize": round(intersection.area, 2), # 交集区域的面积
  44. "GDP": 0 # 假设值,需要从数据中获取
  45. })
  46. return intersected_names_and_pacs, len(intersected_names_and_pacs)
  47. def get_town_village_list(locations,db):
  48. # 初始化一个空的MultiPolygon来容纳所有多边形
  49. multi_polygon = MultiPolygon()
  50. # 遍历每个位置,创建多边形并添加到multi_polygon中
  51. for location in locations:
  52. # 将边界列表转换为Polygon
  53. polygon = Polygon([(item['x'], item['y']) for item in location])
  54. multi_polygon = multi_polygon.union(polygon)
  55. intersected_towns = TpZjGeoJSONData.query.filter(
  56. db.func.ST_Intersects(TpZjGeoJSONData.geometry, multi_polygon) == True
  57. ).all()
  58. # 初始化一个空列表来存储结果
  59. intersected_names_and_pacs = []
  60. for town in intersected_towns:
  61. town_pac = town.pac[:-3]
  62. town_data = {
  63. "townName": town.name,
  64. "code": town.pac,
  65. "populationSize": 0, # 假设值,需要从数据中获取
  66. "areaSize": town.geometry.area, # 交集区域的面积
  67. "GDP": 0 # 假设值,需要从数据中获取
  68. }
  69. intersected_villages = TpCjSqGeoJSONData.query.filter(
  70. db.func.ST_Intersects(TpCjSqGeoJSONData.geometry, multi_polygon) == True
  71. ).filter(TpCjSqGeoJSONData.pac.like(f'{town_pac}%')).all()
  72. intersected_villages_names_and_pacs = []
  73. for village in intersected_villages:
  74. village_data = {
  75. "villageName": village.name,
  76. "code": village.pac,
  77. "populationSize": 0, # 假设值,需要从数据中获取
  78. "areaSize": village.geometry.area, # 交集区域的面积
  79. "GDP": 0 # 假设值,需要从数据中获取
  80. }
  81. intersected_villages_names_and_pacs.append(village_data)
  82. if len(intersected_villages_names_and_pacs)>0:
  83. town_data['children']=intersected_villages_names_and_pacs
  84. town_data['villageCount'] = len(intersected_villages_names_and_pacs)
  85. intersected_names_and_pacs.append(town_data)
  86. return intersected_names_and_pacs, len(intersected_names_and_pacs)
  87. # import geopandas as gpd
  88. # from shapely.geometry import Polygon
  89. #
  90. #
  91. #
  92. # def get_town_list(locations):
  93. # # 读取GeoJSON文件为GeoDataFrame
  94. # gdf = gpd.read_file('zj_geojson.json')
  95. # gdf = gdf.set_crs("EPSG:4326", allow_override=True)
  96. #
  97. # # 初始化一个空的GeoDataFrame来容纳所有多边形
  98. # multi_polygon_gdf = gpd.GeoDataFrame(crs=gdf.crs)
  99. #
  100. # # 遍历每个位置,创建多边形并添加到multi_polygon_gdf中
  101. # for location in locations:
  102. # # 将边界列表转换为Polygon
  103. # polygon = Polygon([(item['x'], item['y']) for item in location])
  104. # # 将多边形添加到multi_polygon_gdf中
  105. # multi_polygon_gdf = multi_polygon_gdf.append(gpd.GeoDataFrame([1], geometry=[polygon], crs=gdf.crs))
  106. #
  107. # # 使用overlay函数来找出相交的区域
  108. # intersected = gpd.overlay(gdf, multi_polygon_gdf, how='intersection')
  109. #
  110. # # 获取相交区域的名称和PAC
  111. # intersected_names_and_pacs = [{"name": row['NAME'], "pac": row['PAC'],"populationSize":0,"areaSize":0,"GDP":0} for index, row in intersected.iterrows() if 'NAME' in row and 'PAC' in row]
  112. #
  113. # return intersected_names_and_pacs,len(intersected_names_and_pacs)
  114. def convert_to_polygon(points):
  115. # 将点的列表转换为POLYGON格式的字符串
  116. polygon_str = "POLYGON(("
  117. for point in points:
  118. # 假设点的顺序是经度(x),纬度(y)
  119. polygon_str += f"{point['y']} {point['x']}, "
  120. # 移除最后一个逗号和空格,然后添加闭合点和结束括号
  121. polygon_str = polygon_str.rstrip(", ") + f", {points[0]['y']} {points[0]['x']}))"
  122. return polygon_str
  123. def count_town_village(location_list:list,db):
  124. town_count = 0
  125. town_list = []
  126. village_count = 0
  127. village_list = []
  128. result = []
  129. url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
  130. service_code= 'YZT1685418808667'
  131. service_info = db.query(OneShareApiEntity).filter(OneShareApiEntity.servercode == service_code).first()
  132. signTime = str(GetTime() // 1000)
  133. nonce = GetNonce(5)
  134. sign = GetSign(signTime, nonce, service_info.passtoken)
  135. headers = {
  136. # 'Content-Type': 'application/json',
  137. 'x-tif-signature': sign,
  138. 'x-tif-timestamp': signTime,
  139. 'x-tif-nonce': nonce,
  140. 'x-tif-paasid': service_info.passid,
  141. 'x-tif-serviceId': service_code
  142. }
  143. response = requests.post(url=url, headers=headers, json=location_list, verify=False)
  144. if response.status_code==200:
  145. data_list = response.json()['data']
  146. for data in data_list:
  147. township = data['townshipCode']
  148. if township not in town_list:
  149. town_count+=1
  150. town_list.append(township)
  151. # result.append({'township':data['township'],"townshipCode":data['townshipCode'],"villages":[]})
  152. result.append({'township':data['township'],"townshipCode":data['townshipCode'],"village":'-',"villageCode":'-',"populationSize":0,"areaSize":0,"GDP":0})
  153. village = data['villageCode']
  154. if village not in village_list:
  155. village_count+=1
  156. village_list.append(village)
  157. # for town in result:
  158. # if town['townshipCode']==data['townshipCode']:
  159. # town["villages"].append({'village': data['village'], "villageCode": data['villageCode']})
  160. result.append({'township':data['township'],"townshipCode":data['townshipCode'],'village': data['village'], "villageCode": data['villageCode'],"populationSize":0,"areaSize":0,"GDP":0})
  161. return result,town_count,village_count
  162. def count_emergency_expert(location_list:list,db):
  163. location = convert_to_polygon(location_list)
  164. sql = text(f"""SELECT * FROM emergency_expert WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
  165. return len(db.execute(sql).all())
  166. def count_emergency_management(location_list: list, db):
  167. location = convert_to_polygon(location_list)
  168. sql = text(f"""SELECT DISTINCT management_unit FROM `rescue_materia` WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
  169. return len(db.execute(sql).all())
  170. def get_hospital_list(location_list:list,db):
  171. resutl = []
  172. for location in location_list:
  173. location = convert_to_polygon(location)
  174. sql = text(f"""SELECT hospital_name as `name`,longitude,latitude,6 AS `dataType` FROM mid_hospital WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
  175. resutl+=db.execute(sql).all()
  176. return resutl
  177. def get_emergency_shelter_list(location_list:list,db):
  178. resutl = []
  179. for location in location_list:
  180. location = convert_to_polygon(location)
  181. sql = text(f"""SELECT shelter_name as `name`,lng as longitude,lat as latitude,3 AS `dataType` FROM mid_emergency_shelter WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', lat, ' ', lng, ')'), 4326))""")
  182. resutl+=db.execute(sql).all()
  183. return resutl
  184. def get_waterlogged_roads_list(location_list:list,db):
  185. resutl = []
  186. for location in location_list:
  187. location = convert_to_polygon(location)
  188. sql = text(f"""SELECT flood_name as `name`,lng as longitude,lat as latitude,4 AS `dataType` FROM mid_waterlogged_roads WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', lat, ' ', lng, ')'), 4326))""")
  189. resutl+=db.execute(sql).all()
  190. return resutl