Quellcode durchsuchen

使用geojson实现空间分析

baoyubo vor 10 Monaten
Ursprung
Commit
94c0f87d6a
2 geänderte Dateien mit 163 neuen und 73 gelöschten Zeilen
  1. 7 73
      routers/api/spatialAnalysis/__init__.py
  2. 156 0
      utils/spatial/__init__.py

+ 7 - 73
routers/api/spatialAnalysis/__init__.py

@@ -13,81 +13,14 @@ from database import get_db
 from typing import List
 from models import *
 from utils import *
+from utils.spatial import *
 import json
 import traceback
 
 router = APIRouter()
 
 
-def convert_to_polygon(points):
-    # 将点的列表转换为POLYGON格式的字符串
-    polygon_str = "POLYGON(("
-    for point in points:
-        # 假设点的顺序是经度(x),纬度(y)
-        polygon_str += f"{point['y']} {point['x']}, "
-    # 移除最后一个逗号和空格,然后添加闭合点和结束括号
-    polygon_str = polygon_str.rstrip(", ") + f", {points[0]['y']} {points[0]['x']}))"
-    return polygon_str
 
-def count_town_village(location_list:list,db):
-    town_count = 0
-    town_list = []
-    village_count = 0
-    village_list = []
-    result = []
-
-    url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
-    service_code= 'YZT1685418808667'
-    service_info = db.query(OneShareApiEntity).filter(OneShareApiEntity.servercode == service_code).first()
-
-    signTime = str(GetTime() // 1000)
-    nonce = GetNonce(5)
-    sign = GetSign(signTime, nonce, service_info.passtoken)
-    headers = {
-        # 'Content-Type': 'application/json',
-        'x-tif-signature': sign,
-        'x-tif-timestamp': signTime,
-        'x-tif-nonce': nonce,
-        'x-tif-paasid': service_info.passid,
-        'x-tif-serviceId': service_code
-    }
-    response = requests.post(url=url, headers=headers, json=location_list, verify=False)
-    if response.status_code==200:
-        data_list = response.json()['data']
-
-        for data in data_list:
-            township = data['townshipCode']
-            if township not in town_list:
-                town_count+=1
-                town_list.append(township)
-                # result.append({'township':data['township'],"townshipCode":data['townshipCode'],"villages":[]})
-                result.append({'township':data['township'],"townshipCode":data['townshipCode'],"village":'-',"villageCode":'-',"populationSize":0,"areaSize":0,"GDP":0})
-            village = data['villageCode']
-            if village not in village_list:
-                village_count+=1
-                village_list.append(village)
-                # for town in result:
-                #     if town['townshipCode']==data['townshipCode']:
-                #         town["villages"].append({'village': data['village'], "villageCode": data['villageCode']})
-
-                result.append({'township':data['township'],"townshipCode":data['townshipCode'],'village': data['village'], "villageCode": data['villageCode'],"populationSize":0,"areaSize":0,"GDP":0})
-
-    return result,town_count,village_count
-
-def count_emergency_expert(location_list:list,db):
-    location = convert_to_polygon(location_list)
-
-    sql = text(f"""SELECT * FROM emergency_expert WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
-
-    return len(db.execute(sql).all())
-
-def count_emergency_management(location_list: list, db):
-
-    location = convert_to_polygon(location_list)
-
-    sql = text(f"""SELECT DISTINCT management_unit FROM `rescue_materia`  WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
-
-    return len(db.execute(sql).all())
 
 class location_c(BaseModel):
     x:float
@@ -97,7 +30,7 @@ class mine(BaseModel):
     location : List=[]
 
 @router.post('/get_info')
-async def mine(request: Request,from_data:mine,body = Depends(remove_xss_json),db: Session = Depends(get_db)):
+async def mine(request: Request,body = Depends(remove_xss_json),db: Session = Depends(get_db)):
     try:
         # 验证必需的字段
         # required_fields = ['location','location_c']
@@ -106,7 +39,8 @@ async def mine(request: Request,from_data:mine,body = Depends(remove_xss_json),d
         #     raise HTTPException(status_code=401, detail=f"Missing required fields: {', '.join(missing_fields)}")
 
         # 行政镇、行政村数据
-        town_village_data,town_count,village_count = count_town_village(from_data.location,db)
+        # town_village_data,town_count,village_count = count_town_village(from_data.location,db)
+        town_village_data,town_count = get_town_list(body)
         # emergency_expert_count = count_emergency_expert(from_data.location,db)
         # emergency_management_count = count_emergency_management(from_data.location,db)
 
@@ -114,9 +48,9 @@ async def mine(request: Request,from_data:mine,body = Depends(remove_xss_json),d
             "code": 200,
             "msg": "成功",
             "data": {
-                "townVillageData":town_village_data,
+                "townData":town_village_data,
                 "townCount":town_count,
-                "villageCount":village_count,
+                # "villageCount":village_count,
                 "populationSize":0,
                 "areaSize":0,
                 "GDP":0,
@@ -125,7 +59,7 @@ async def mine(request: Request,from_data:mine,body = Depends(remove_xss_json),d
                 },{
                     "name":"易涝点","num":0
                 },{
-                    "name":"医院","num":0
+                    "name":"医院","num":1,"list":[{"name":"测试","longitude":111.275362017,"latitude":21.9912508420001}]
                 }
                 ],
                 # "emergencyExpertCount":emergency_expert_count,

+ 156 - 0
utils/spatial/__init__.py

@@ -0,0 +1,156 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+
+from common.BigDataCenterAPI import *
+from models import *
+from sqlalchemy import  text
+
+from shapely.geometry import Polygon, MultiPolygon
+from shapely.ops import unary_union
+import json
+
+def get_town_list(locations,):
+    # 初始化一个空的MultiPolygon来容纳所有多边形
+
+    multi_polygon = MultiPolygon()
+
+    # 遍历每个位置,创建多边形并添加到multi_polygon中
+    for location in locations:
+        # 将边界列表转换为Polygon
+        polygon = Polygon([(item['x'], item['y']) for item in location])
+        multi_polygon = multi_polygon.union(polygon)
+
+    # 将GeoJSON数据转换为字典
+    with open('/home/python3/xh_twapi01/utils/spatial/zj_geojson.json', 'r', encoding='utf-8') as file:
+        geojson = json.load(file)
+
+    # 假设GeoJSON数据是一个FeatureCollection
+    features = geojson.get('features', [])
+
+    # 初始化一个空列表来存储结果
+    intersected_names_and_pacs = []
+
+    # 遍历GeoJSON中的每个Feature,计算交集
+    for feature in features:
+        geom = feature['geometry']
+        if 'coordinates' in geom:
+            # 将GeoJSON Polygon转换为shapely Polygon
+            if geom['type'] == 'Polygon':
+                polygon = Polygon(geom['coordinates'][0])
+                intersection = polygon.intersection(multi_polygon)
+            elif geom['type'] == 'MultiPolygon':
+                multi_polygon_feature = MultiPolygon([Polygon(coords[0]) for coords in geom['coordinates']])
+                intersection = multi_polygon_feature.intersection(multi_polygon)
+            else:
+                continue  # 跳过非Polygon和非MultiPolygon类型的几何对象
+
+            if not intersection.is_empty:
+                properties = feature['properties']
+                intersected_names_and_pacs.append({
+                    "townName": properties.get('NAME', ''),
+                    "code": properties.get('PAC', ''),
+                    "populationSize": 0,  # 假设值,需要从数据中获取
+                    "areaSize": round(intersection.area, 2),  # 交集区域的面积
+                    "GDP": 0  # 假设值,需要从数据中获取
+                })
+
+    return intersected_names_and_pacs, len(intersected_names_and_pacs)
+# import geopandas as gpd
+# from shapely.geometry import Polygon
+#
+#
+#
+# def get_town_list(locations):
+#     # 读取GeoJSON文件为GeoDataFrame
+#     gdf = gpd.read_file('zj_geojson.json')
+#     gdf = gdf.set_crs("EPSG:4326", allow_override=True)
+#
+#     # 初始化一个空的GeoDataFrame来容纳所有多边形
+#     multi_polygon_gdf = gpd.GeoDataFrame(crs=gdf.crs)
+#
+#     # 遍历每个位置,创建多边形并添加到multi_polygon_gdf中
+#     for location in locations:
+#         # 将边界列表转换为Polygon
+#         polygon = Polygon([(item['x'], item['y']) for item in location])
+#         # 将多边形添加到multi_polygon_gdf中
+#         multi_polygon_gdf = multi_polygon_gdf.append(gpd.GeoDataFrame([1], geometry=[polygon], crs=gdf.crs))
+#
+#     # 使用overlay函数来找出相交的区域
+#     intersected = gpd.overlay(gdf, multi_polygon_gdf, how='intersection')
+#
+#     # 获取相交区域的名称和PAC
+#     intersected_names_and_pacs = [{"name": row['NAME'], "pac": row['PAC'],"populationSize":0,"areaSize":0,"GDP":0} for index, row in intersected.iterrows() if 'NAME' in row and 'PAC' in row]
+#
+#     return intersected_names_and_pacs,len(intersected_names_and_pacs)
+
+
+
+def convert_to_polygon(points):
+    # 将点的列表转换为POLYGON格式的字符串
+    polygon_str = "POLYGON(("
+    for point in points:
+        # 假设点的顺序是经度(x),纬度(y)
+        polygon_str += f"{point['y']} {point['x']}, "
+    # 移除最后一个逗号和空格,然后添加闭合点和结束括号
+    polygon_str = polygon_str.rstrip(", ") + f", {points[0]['y']} {points[0]['x']}))"
+    return polygon_str
+
+def count_town_village(location_list:list,db):
+    town_count = 0
+    town_list = []
+    village_count = 0
+    village_list = []
+    result = []
+
+    url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
+    service_code= 'YZT1685418808667'
+    service_info = db.query(OneShareApiEntity).filter(OneShareApiEntity.servercode == service_code).first()
+
+    signTime = str(GetTime() // 1000)
+    nonce = GetNonce(5)
+    sign = GetSign(signTime, nonce, service_info.passtoken)
+    headers = {
+        # 'Content-Type': 'application/json',
+        'x-tif-signature': sign,
+        'x-tif-timestamp': signTime,
+        'x-tif-nonce': nonce,
+        'x-tif-paasid': service_info.passid,
+        'x-tif-serviceId': service_code
+    }
+    response = requests.post(url=url, headers=headers, json=location_list, verify=False)
+    if response.status_code==200:
+        data_list = response.json()['data']
+
+        for data in data_list:
+            township = data['townshipCode']
+            if township not in town_list:
+                town_count+=1
+                town_list.append(township)
+                # result.append({'township':data['township'],"townshipCode":data['townshipCode'],"villages":[]})
+                result.append({'township':data['township'],"townshipCode":data['townshipCode'],"village":'-',"villageCode":'-',"populationSize":0,"areaSize":0,"GDP":0})
+            village = data['villageCode']
+            if village not in village_list:
+                village_count+=1
+                village_list.append(village)
+                # for town in result:
+                #     if town['townshipCode']==data['townshipCode']:
+                #         town["villages"].append({'village': data['village'], "villageCode": data['villageCode']})
+
+                result.append({'township':data['township'],"townshipCode":data['townshipCode'],'village': data['village'], "villageCode": data['villageCode'],"populationSize":0,"areaSize":0,"GDP":0})
+
+    return result,town_count,village_count
+
+def count_emergency_expert(location_list:list,db):
+    location = convert_to_polygon(location_list)
+
+    sql = text(f"""SELECT * FROM emergency_expert WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
+
+    return len(db.execute(sql).all())
+
+def count_emergency_management(location_list: list, db):
+
+    location = convert_to_polygon(location_list)
+
+    sql = text(f"""SELECT DISTINCT management_unit FROM `rescue_materia`  WHERE ST_Contains(ST_PolygonFromText( '{location}', 4326 ),ST_PointFromText(CONCAT('POINT(', latitude, ' ', longitude, ')'), 4326))""")
+
+    return len(db.execute(sql).all())