__init__.py 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect,UploadFile,File
  4. from common.security import valid_access_token,valid_websocket_token
  5. from fastapi.responses import JSONResponse,StreamingResponse
  6. from common.db import db_czrz
  7. from sqlalchemy.orm import Session
  8. from sqlalchemy.sql import func
  9. from common.auth_user import *
  10. from sqlalchemy import text
  11. from pydantic import BaseModel
  12. from common.BigDataCenterAPI import *
  13. from database import get_db
  14. from typing import List
  15. from models import *
  16. from utils import *
  17. from utils.spatial import *
  18. from utils.ry_system_util import *
  19. from utils.resource_provision_util import *
  20. from common.websocketManager import *
  21. import json
  22. import traceback
  23. import pandas as pd
  24. from io import BytesIO
  25. import openpyxl
  26. router = APIRouter()
  27. def get_data_field(table_id: int, db: Session = Depends(get_db)):
  28. query = db.query(TpDashboardDataManager)
  29. query = query.filter_by(del_flag='0')
  30. query = query.filter_by(id=table_id)
  31. info = query.first()
  32. if not info:
  33. return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"})
  34. ignore_fields_list = info.technical_field.split(';') if info.technical_field else []
  35. # 查询表结构字段信息
  36. columns_query = text("""
  37. SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT,COLUMN_KEY,IS_NULLABLE
  38. FROM INFORMATION_SCHEMA.COLUMNS
  39. WHERE TABLE_SCHEMA = :schema_name
  40. AND TABLE_NAME = :table_name
  41. AND COLUMN_NAME NOT IN :ignore_fields order by ORDINAL_POSITION
  42. """)
  43. columns_result = db.execute(columns_query, {
  44. "layer_name": info.layer_name,
  45. "schema_name": info.database_name,
  46. "table_name": info.table_name,
  47. "ignore_fields": tuple(ignore_fields_list)
  48. }).fetchall()
  49. columns_info = [
  50. {"column_name": row[0], "data_type": row[1], "column_comment": row[2],"column_key":row[3],"is_nullable":row[4]}
  51. for row in columns_result
  52. ]
  53. return {
  54. "layer_name": info.layer_name,
  55. "table_name": info.table_name,
  56. "schema_name": info.database_name,
  57. "columns": columns_info
  58. }
  59. @router.get("/list")
  60. async def get_pattern_list(
  61. # name: str = Query(None, description='名称'),
  62. user_id=Depends(valid_access_token),
  63. db: Session = Depends(get_db)
  64. ):
  65. try:
  66. query = db.query(TpDashboardDataManager)
  67. query = query.filter_by(del_flag='0')
  68. # 排序
  69. query = query.order_by(TpDashboardDataManager.order_num)
  70. # 执行分页查询
  71. lists = query.all()
  72. data = [ ]
  73. for info in lists:
  74. data.append({"id": info.id,
  75. "layer_name": info.layer_name,
  76. "data_update_time": info.data_update_time,
  77. "data_resouce": info.data_resouce
  78. })
  79. return {"code": 200, "msg": "查询成功", "data": data
  80. }
  81. except Exception as e:
  82. traceback.print_exc()
  83. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  84. @router.get("/info/{table_id}")
  85. async def get_table_structure(table_id: int, db: Session = Depends(get_db)):
  86. try:
  87. data = get_data_field(table_id,db)
  88. return {"code": 200, "msg": "查询成功", "data":data}
  89. except Exception as e:
  90. traceback.print_exc()
  91. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  92. @router.get("/get_data/{table_id}")
  93. async def get_data_list(table_id: int,page: int = Query(1, gt=0, description='页码'),pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db) ):
  94. try:
  95. table_structure = get_data_field(table_id, db)
  96. table_name = table_structure["table_name"]
  97. schema_name = table_structure["schema_name"]
  98. columns = [col["column_name"] for col in table_structure["columns"]]
  99. sql = f"SELECT {', '.join(columns)} FROM `{schema_name}`.`{table_name}` where del_flag='0'"
  100. totalsql = f'select count(*) from ({sql})t'
  101. total = db.execute(text(totalsql)).fetchone()[0]
  102. if total==0:
  103. total=1
  104. pages, pagesmod = divmod(total, pageSize)
  105. print(total, pages, pagesmod)
  106. if pagesmod != 0:
  107. pages += 1
  108. print(pages, pagesmod)
  109. if total < pageSize:
  110. pageSize = total
  111. # 正式查询
  112. sql = sql + f' limit {pageSize * (page - 1)}, {pageSize};'
  113. # 构建查询语句
  114. query = text(sql)
  115. result = db.execute(query).fetchall()
  116. # 将结果转换为字典格式
  117. data_list = [dict(zip(columns, row)) for row in result]
  118. return {"code": 200, "msg": "查询成功", "data": data_list,
  119. "total": total,
  120. "page": page,
  121. "pageSize": pageSize,
  122. "totalPages": (total + pageSize - 1) // pageSize
  123. }
  124. except Exception as e:
  125. traceback.print_exc()
  126. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  127. @router.put('/update_data/{table_id}/{data_id}')
  128. async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
  129. try:
  130. # 获取表结构
  131. table_structure = get_data_field(table_id, db)
  132. table_name = table_structure["table_name"]
  133. schema_name = table_structure["schema_name"]
  134. # 构建更新语句
  135. update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()])
  136. update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET {update_clauses} WHERE id = :data_id")
  137. except Exception as e:
  138. traceback.print_exc()
  139. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  140. try:
  141. db.execute(update_query, {**body, "data_id": data_id})
  142. db.commit()
  143. return {"code": 200, "msg": "操作成功", "data":None}
  144. except Exception as e:
  145. db.rollback()
  146. traceback.print_exc()
  147. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  148. @router.post("/add_data/{table_id}")
  149. async def add_data(table_id: int, data: dict, db: Session = Depends(get_db)):
  150. try:
  151. # 获取表结构
  152. table_structure = get_data_field(table_id, db)
  153. table_name = table_structure["table_name"]
  154. schema_name = table_structure["schema_name"]
  155. # 构建插入语句
  156. columns = ", ".join(data.keys())
  157. values = ", ".join([f":{key}" for key in data.keys()])
  158. insert_query = text(f"INSERT INTO `{schema_name}`.`{table_name}` ({columns}) VALUES ({values})")
  159. except Exception as e:
  160. traceback.print_exc()
  161. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  162. try:
  163. db.execute(insert_query, data)
  164. db.commit()
  165. return {"message": "Data added successfully"}
  166. except Exception as e:
  167. db.rollback()
  168. traceback.print_exc()
  169. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  170. @router.delete('/delete_data/{table_id}/{data_id}')
  171. async def delete_data(table_id: int, data_id: int, db: Session = Depends(get_db)):
  172. try:
  173. # 获取表结构
  174. table_structure = get_data_field(table_id, db)
  175. table_name = table_structure["table_name"]
  176. schema_name = table_structure["schema_name"]
  177. # 构建更新语句
  178. # update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()])
  179. update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET del_flag='2' WHERE id = :data_id")
  180. except Exception as e:
  181. traceback.print_exc()
  182. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  183. try:
  184. db.execute(update_query, { "data_id": data_id})
  185. db.commit()
  186. return {"code": 200, "msg": "操作成功", "data":None}
  187. except Exception as e:
  188. db.rollback()
  189. traceback.print_exc()
  190. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  191. @router.get("/generate_import_template/{table_id}")
  192. async def generate_import_template(table_id: int, db: Session = Depends(get_db)):
  193. # 获取表结构
  194. table_structure = get_data_field(table_id, db)
  195. columns = table_structure["columns"]
  196. layer_name = table_structure["layer_name"]
  197. # 创建 DataFrame column_name
  198. data = [{col["column_comment"]:col['column_name']} for col in columns]
  199. column_names = [col["column_comment"] for col in columns]
  200. df = pd.DataFrame(data=data,columns=column_names)
  201. # 将 DataFrame 转换为 Excel 文件
  202. output = BytesIO()
  203. with pd.ExcelWriter(output, engine="openpyxl") as writer:
  204. df.to_excel(writer, index=False, sheet_name=layer_name)
  205. # 设置响应头
  206. output.seek(0)
  207. headers = {
  208. "Content-Disposition": f"attachment; filename=import_template_{table_structure['table_name']}.xlsx",
  209. 'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
  210. }
  211. return StreamingResponse(output, headers=headers,
  212. media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
  213. # 数据批量导入
  214. @router.post("/import_data/{table_id}")
  215. async def import_data(table_id: int, file: UploadFile = File(...), db: Session = Depends(get_db)):
  216. # 获取表结构
  217. table_structure = get_data_field(table_id, db)
  218. table_name = table_structure["table_name"]
  219. schema_name = table_structure["schema_name"]
  220. columns = table_structure["columns"]
  221. # 读取 Excel 文件
  222. try:
  223. workbook = openpyxl.load_workbook(file.file)
  224. sheet = workbook.active
  225. except Exception as e:
  226. raise HTTPException(status_code=400, detail="Invalid Excel file")
  227. # 获取字段名和字段备注名
  228. column_names = [col["column_name"] for col in columns]
  229. column_comments = [col["column_comment"] for col in columns]
  230. # 检查第一行是否为字段备注名
  231. first_row = [cell.value for cell in sheet[1]]
  232. if first_row != column_comments:
  233. raise HTTPException(status_code=400, detail="Excel columns do not match the expected columns")
  234. # 检查第二行是否为字段名
  235. second_row = [cell.value for cell in sheet[2]]
  236. if second_row != column_names:
  237. raise HTTPException(status_code=400, detail="Excel columns do not match the expected columns")
  238. # 将数据插入到数据库
  239. try:
  240. insert_query = text(
  241. f"INSERT INTO `{schema_name}`.`{table_name}` ({', '.join(column_names)}) VALUES ({', '.join([':' + col for col in column_names])})")
  242. for row in sheet.iter_rows(min_row=3, values_only=True):
  243. db.execute(insert_query, dict(zip(column_names, row)))
  244. db.commit()
  245. return {"message": "Data imported successfully"}
  246. except Exception as e:
  247. db.rollback()
  248. raise HTTPException(status_code=500, detail=str(e))