__init__.py 8.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect
  4. from common.security import valid_access_token,valid_websocket_token
  5. from fastapi.responses import JSONResponse,StreamingResponse
  6. from common.db import db_czrz
  7. from sqlalchemy.orm import Session
  8. from sqlalchemy.sql import func
  9. from common.auth_user import *
  10. from sqlalchemy import text
  11. from pydantic import BaseModel
  12. from common.BigDataCenterAPI import *
  13. from database import get_db
  14. from typing import List
  15. from models import *
  16. from utils import *
  17. from utils.spatial import *
  18. from utils.ry_system_util import *
  19. from utils.resource_provision_util import *
  20. from common.websocketManager import *
  21. import json
  22. import traceback
  23. router = APIRouter()
  def get_data_field(table_id: int, db: Session = Depends(get_db)):
      """Describe the physical table registered under ``table_id``.

      Looks up the non-deleted ``TpDashboardDataManager`` row with the given
      id, then reads the column metadata of the table it points at from
      ``INFORMATION_SCHEMA.COLUMNS``, leaving out every column named in the
      row's semicolon-separated ``technical_field`` value.

      Args:
          table_id: Primary key of the ``TpDashboardDataManager`` row.
          db: SQLAlchemy session used for both queries.

      Returns:
          A dict with ``layer_name``, ``table_name``, ``schema_name`` and
          ``columns`` (a list of dicts with ``column_name``, ``data_type``,
          ``column_comment``, ``column_key`` and ``is_nullable``), or a 404
          ``JSONResponse`` when no matching row exists. Callers must check
          for the ``JSONResponse`` case before subscripting the result.
      """
      # Function-scope import so this block does not depend on a change to
      # the module-level import list.
      from sqlalchemy import bindparam

      info = (
          db.query(TpDashboardDataManager)
          .filter_by(del_flag='0', id=table_id)
          .first()
      )
      if not info:
          return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"})

      # Drop blank entries so values such as "a;b;" or "" do not produce
      # empty names in the exclusion list.
      ignore_fields_list = [
          name.strip()
          for name in (info.technical_field or '').split(';')
          if name.strip()
      ]

      sql_parts = [
          "SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT, COLUMN_KEY, IS_NULLABLE",
          "FROM INFORMATION_SCHEMA.COLUMNS",
          "WHERE TABLE_SCHEMA = :schema_name",
          "AND TABLE_NAME = :table_name",
      ]
      params = {
          "schema_name": info.database_name,
          "table_name": info.table_name,
      }
      # Only add the exclusion clause when there is something to exclude:
      # binding an empty sequence would otherwise render "NOT IN ()", which
      # is a syntax error in MySQL.
      if ignore_fields_list:
          sql_parts.append("AND COLUMN_NAME NOT IN :ignore_fields")
          params["ignore_fields"] = ignore_fields_list
      sql_parts.append("ORDER BY ORDINAL_POSITION")

      columns_query = text("\n".join(sql_parts))
      if ignore_fields_list:
          # An expanding bind parameter renders one placeholder per element,
          # instead of relying on driver-specific tuple formatting.
          columns_query = columns_query.bindparams(
              bindparam("ignore_fields", expanding=True)
          )

      columns_result = db.execute(columns_query, params).fetchall()
      columns_info = [
          {
              "column_name": row[0],
              "data_type": row[1],
              "column_comment": row[2],
              "column_key": row[3],
              "is_nullable": row[4],
          }
          for row in columns_result
      ]
      return {
          "layer_name": info.layer_name,
          "table_name": info.table_name,
          "schema_name": info.database_name,
          "columns": columns_info,
      }
  @router.get("/list")
  async def get_pattern_list(
      user_id=Depends(valid_access_token),
      db: Session = Depends(get_db)
  ):
      """List every non-deleted ``TpDashboardDataManager`` row.

      Rows are ordered by ``order_num`` and returned in full (no paging).
      Each entry exposes ``id``, ``layer_name``, ``data_update_time`` and
      ``data_resouce``. Any exception is printed and reported as a 500
      response carrying the exception text.
      """
      try:
          records = (
              db.query(TpDashboardDataManager)
              .filter_by(del_flag='0')
              .order_by(TpDashboardDataManager.order_num)
              .all()
          )
          data = [
              {
                  "id": record.id,
                  "layer_name": record.layer_name,
                  "data_update_time": record.data_update_time,
                  "data_resouce": record.data_resouce,
              }
              for record in records
          ]
          return {"code": 200, "msg": "查询成功", "data": data}
      except Exception as e:
          traceback.print_exc()
          return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  @router.get("/info/{table_id}")
  async def get_table_structure(table_id: int, db: Session = Depends(get_db)):
      """Return the column description of the table registered as ``table_id``.

      Delegates to ``get_data_field``. When that helper reports the table as
      missing (it returns a ``JSONResponse``), the response is passed through
      unchanged instead of being wrapped in a ``code: 200`` payload.

      Returns:
          ``{"code": 200, "msg": ..., "data": <table description>}`` on
          success, the helper's 404 response when the table is unknown, or a
          500 response carrying the exception text on any other failure.
      """
      try:
          data = get_data_field(table_id, db)
          if isinstance(data, JSONResponse):
              # Not found: forward the helper's own error response.
              return data
          return {"code": 200, "msg": "查询成功", "data": data}
      except Exception as e:
          traceback.print_exc()
          return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  @router.get("/get_data/{table_id}")
  async def get_data_list(
      table_id: int,
      page: int = Query(1, gt=0, description='页码'),
      pageSize: int = Query(10, gt=0, description='每页条目数量'),
      db: Session = Depends(get_db),
  ):
      """Return one page of non-deleted rows from the table registered as ``table_id``.

      The selected columns are the ones reported by ``get_data_field``; only
      rows with ``del_flag='0'`` are counted and returned.

      Args:
          table_id: Id of the registered table.
          page: 1-based page number (validated as > 0).
          pageSize: Rows per page (validated as > 0).
          db: SQLAlchemy session.

      Returns:
          A dict with ``code``, ``msg``, ``data`` (list of row dicts keyed by
          column name), ``total`` (the real row count, 0 for an empty table),
          ``page``, ``pageSize`` (echoed as requested) and ``totalPages``.
          The 404 response from ``get_data_field`` is forwarded when the
          table is unknown; any other failure yields a 500 response carrying
          the exception text.
      """
      try:
          table_structure = get_data_field(table_id, db)
          if isinstance(table_structure, JSONResponse):
              # Unknown table: forward the helper's 404 instead of failing on
              # a subscript of the response object.
              return table_structure

          def quote(identifier):
              # Backtick-quote a MySQL identifier. Embedded backticks are
              # doubled, and colons are backslash-escaped so text() does not
              # mistake them for bind parameters.
              escaped = str(identifier).replace("`", "``").replace(":", "\\:")
              return f"`{escaped}`"

          columns = [col["column_name"] for col in table_structure["columns"]]
          table_ref = (
              f"{quote(table_structure['schema_name'])}."
              f"{quote(table_structure['table_name'])}"
          )
          where_clause = "WHERE del_flag='0'"

          count_sql = text(f"SELECT COUNT(*) FROM {table_ref} {where_clause}")
          total = db.execute(count_sql).fetchone()[0] or 0

          data_list = []
          # Skip the row query when there is nothing to fetch, or when every
          # column is excluded (an empty SELECT list is invalid SQL).
          if total and columns:
              select_list = ", ".join(quote(name) for name in columns)
              # NOTE(review): no ORDER BY is applied, so page contents depend
              # on the storage engine's row order — confirm whether a stable
              # sort key exists for these tables.
              page_sql = text(
                  f"SELECT {select_list} FROM {table_ref} {where_clause} "
                  "LIMIT :offset, :limit"
              )
              rows = db.execute(
                  page_sql,
                  {"offset": pageSize * (page - 1), "limit": pageSize},
              ).fetchall()
              data_list = [dict(zip(columns, row)) for row in rows]

          return {
              "code": 200,
              "msg": "查询成功",
              "data": data_list,
              "total": total,
              "page": page,
              "pageSize": pageSize,
              # Ceiling division; pageSize > 0 is guaranteed by the Query
              # validator, so this cannot divide by zero.
              "totalPages": (total + pageSize - 1) // pageSize,
          }
      except Exception as e:
          traceback.print_exc()
          return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  @router.put('/update_data/{table_id}/{data_id}')
  async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
      """Update one row of the table registered as ``table_id``.

      Every key of ``body`` is treated as a column name and every value as
      the new column value; the row is selected with ``id = data_id``.

      Column names are backtick-quoted and the values are bound through
      generated placeholder names, so a crafted key can neither break out of
      the identifier nor collide with the ``data_id`` parameter.

      Returns:
          ``{"code": 200, "msg": ..., "data": None}`` on success; the 404
          response from ``get_data_field`` when the table is unknown; a 400
          response when ``body`` is empty; a 500 response carrying the
          exception text (after a rollback) on any other failure.
      """
      # NOTE(review): unlike /list, this route does not depend on
      # valid_access_token — confirm authentication is enforced elsewhere.
      # NOTE(review): consider restricting keys to the columns reported by
      # get_data_field so hidden technical fields are not client-writable.
      try:
          table_structure = get_data_field(table_id, db)
          if isinstance(table_structure, JSONResponse):
              return table_structure

          if not body:
              # An empty SET clause would be invalid SQL; report it as a
              # client error instead of a database failure.
              return JSONResponse(status_code=400, content={'code': 400, 'msg': "更新内容不能为空"})

          def quote(identifier):
              # Backtick-quote a MySQL identifier. Embedded backticks are
              # doubled, and colons are backslash-escaped so text() does not
              # mistake them for bind parameters.
              escaped = str(identifier).replace("`", "``").replace(":", "\\:")
              return f"`{escaped}`"

          table_ref = (
              f"{quote(table_structure['schema_name'])}."
              f"{quote(table_structure['table_name'])}"
          )

          params = {"data_id": data_id}
          assignments = []
          for index, (key, value) in enumerate(body.items()):
              # Placeholder names are generated, never taken from the key.
              placeholder = f"v{index}"
              assignments.append(f"{quote(key)} = :{placeholder}")
              params[placeholder] = value

          update_query = text(
              f"UPDATE {table_ref} SET {', '.join(assignments)} WHERE id = :data_id"
          )
          db.execute(update_query, params)
          db.commit()
          return {"code": 200, "msg": "操作成功", "data":None}
      except Exception as e:
          db.rollback()
          traceback.print_exc()
          return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  @router.post("/add_data/{table_id}")
  async def add_data(table_id: int, data: dict, db: Session = Depends(get_db)):
      """Insert one row into the table registered as ``table_id``.

      Every key of ``data`` is treated as a column name and every value as
      the value to store. Column names are backtick-quoted and values are
      bound through generated placeholder names, so a crafted key cannot
      alter the statement.

      Returns:
          On success, the ``{"code": 200, "msg": ..., "data": None}``
          envelope used by the sibling endpoints, plus the legacy
          ``message`` key that earlier clients may read. The 404 response
          from ``get_data_field`` when the table is unknown; a 400 response
          when ``data`` is empty; a 500 response carrying the exception text
          (after a rollback) on any other failure.
      """
      # NOTE(review): unlike /list, this route does not depend on
      # valid_access_token — confirm authentication is enforced elsewhere.
      try:
          table_structure = get_data_field(table_id, db)
          if isinstance(table_structure, JSONResponse):
              return table_structure

          if not data:
              # Refuse to insert a row made only of column defaults.
              return JSONResponse(status_code=400, content={'code': 400, 'msg': "新增内容不能为空"})

          def quote(identifier):
              # Backtick-quote a MySQL identifier. Embedded backticks are
              # doubled, and colons are backslash-escaped so text() does not
              # mistake them for bind parameters.
              escaped = str(identifier).replace("`", "``").replace(":", "\\:")
              return f"`{escaped}`"

          table_ref = (
              f"{quote(table_structure['schema_name'])}."
              f"{quote(table_structure['table_name'])}"
          )

          params = {}
          column_sql = []
          value_sql = []
          for index, (key, value) in enumerate(data.items()):
              # Placeholder names are generated, never taken from the key.
              placeholder = f"v{index}"
              column_sql.append(quote(key))
              value_sql.append(f":{placeholder}")
              params[placeholder] = value

          insert_query = text(
              f"INSERT INTO {table_ref} ({', '.join(column_sql)}) "
              f"VALUES ({', '.join(value_sql)})"
          )
          db.execute(insert_query, params)
          db.commit()
          return {
              "code": 200,
              "msg": "操作成功",
              "data": None,
              # Kept for backward compatibility with the previous payload.
              "message": "Data added successfully",
          }
      except Exception as e:
          db.rollback()
          traceback.print_exc()
          return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  @router.delete('/delete_data/{table_id}/{data_id}')
  async def delete_data(table_id: int, data_id: int, db: Session = Depends(get_db)):
      """Soft-delete one row of the table registered as ``table_id``.

      The row with ``id = data_id`` is not removed; its ``del_flag`` column
      is set to ``'2'``, which takes it out of the ``del_flag='0'`` listing
      served by ``/get_data``.

      Returns:
          ``{"code": 200, "msg": ..., "data": None}`` on success; the 404
          response from ``get_data_field`` when the table is unknown; a 500
          response carrying the exception text (after a rollback) on any
          other failure.
      """
      # NOTE(review): unlike /list, this route does not depend on
      # valid_access_token — confirm authentication is enforced elsewhere.
      try:
          table_structure = get_data_field(table_id, db)
          if isinstance(table_structure, JSONResponse):
              # Unknown table: forward the helper's 404 instead of failing on
              # a subscript of the response object.
              return table_structure

          def quote(identifier):
              # Backtick-quote a MySQL identifier. Embedded backticks are
              # doubled, and colons are backslash-escaped so text() does not
              # mistake them for bind parameters.
              escaped = str(identifier).replace("`", "``").replace(":", "\\:")
              return f"`{escaped}`"

          table_ref = (
              f"{quote(table_structure['schema_name'])}."
              f"{quote(table_structure['table_name'])}"
          )
          soft_delete_query = text(
              f"UPDATE {table_ref} SET del_flag='2' WHERE id = :data_id"
          )
          db.execute(soft_delete_query, {"data_id": data_id})
          db.commit()
          return {"code": 200, "msg": "操作成功", "data":None}
      except Exception as e:
          db.rollback()
          traceback.print_exc()
          return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  def get_data_info():
      """Unimplemented placeholder: takes no arguments and returns ``None``."""
      return None
  def insert_data():
      """Unimplemented placeholder: takes no arguments and returns ``None``."""
      return None