__init__.py 13 KB


  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect,UploadFile,File
  4. from common.security import valid_access_token,valid_websocket_token
  5. from fastapi.responses import JSONResponse,StreamingResponse
  6. from common.db import db_czrz
  7. from sqlalchemy.orm import Session
  8. from sqlalchemy.sql import func
  9. from common.auth_user import *
  10. from sqlalchemy import text
  11. from pydantic import BaseModel
  12. from common.BigDataCenterAPI import *
  13. from database import get_db
  14. from typing import List
  15. from models import *
  16. from utils import *
  17. from utils.spatial import *
  18. from utils.ry_system_util import *
  19. from utils.resource_provision_util import *
  20. from common.websocketManager import *
  21. import json
  22. import traceback
  23. import pandas as pd
  24. from io import BytesIO
  25. import openpyxl
  26. import os
  27. router = APIRouter()
  28. def get_data_field(table_id: int, db: Session = Depends(get_db)):
  29. query = db.query(TpDashboardDataManager)
  30. query = query.filter_by(del_flag='0')
  31. query = query.filter_by(id=table_id)
  32. info = query.first()
  33. if not info:
  34. return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"})
  35. ignore_fields_list = info.technical_field.split(';') if info.technical_field else []
  36. # 查询表结构字段信息
  37. columns_query = text("""
  38. SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT,COLUMN_KEY,IS_NULLABLE
  39. FROM INFORMATION_SCHEMA.COLUMNS
  40. WHERE TABLE_SCHEMA = :schema_name
  41. AND TABLE_NAME = :table_name
  42. AND COLUMN_NAME NOT IN :ignore_fields order by ORDINAL_POSITION
  43. """)
  44. columns_result = db.execute(columns_query, {
  45. "layer_name": info.layer_name,
  46. "schema_name": info.database_name,
  47. "table_name": info.table_name,
  48. "ignore_fields": tuple(ignore_fields_list)
  49. }).fetchall()
  50. columns_info = [
  51. {"column_name": row[0], "data_type": row[1], "column_comment": row[2],"column_key":row[3],"is_nullable":row[4]}
  52. for row in columns_result
  53. ]
  54. return {
  55. "layer_name": info.layer_name,
  56. "table_name": info.table_name,
  57. "schema_name": info.database_name,
  58. "columns": columns_info
  59. }
  60. @router.get("/list")
  61. async def get_pattern_list(
  62. # name: str = Query(None, description='名称'),
  63. user_id=Depends(valid_access_token),
  64. db: Session = Depends(get_db)
  65. ):
  66. try:
  67. query = db.query(TpDashboardDataManager)
  68. query = query.filter_by(del_flag='0')
  69. # 排序
  70. query = query.order_by(TpDashboardDataManager.order_num)
  71. # 执行分页查询
  72. lists = query.all()
  73. data = [ ]
  74. for info in lists:
  75. data.append({"id": info.id,
  76. "layer_name": info.layer_name,
  77. "data_update_time": info.data_update_time,
  78. "data_resouce": info.data_resouce
  79. })
  80. return {"code": 200, "msg": "查询成功", "data": data
  81. }
  82. except Exception as e:
  83. traceback.print_exc()
  84. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  85. @router.get("/info/{table_id}")
  86. async def get_table_structure(table_id: int, db: Session = Depends(get_db)):
  87. try:
  88. data = get_data_field(table_id,db)
  89. return {"code": 200, "msg": "查询成功", "data":data}
  90. except Exception as e:
  91. traceback.print_exc()
  92. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  93. @router.get("/get_data/{table_id}")
  94. async def get_data_list(table_id: int,page: int = Query(1, gt=0, description='页码'),pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db) ):
  95. try:
  96. table_structure = get_data_field(table_id, db)
  97. table_name = table_structure["table_name"]
  98. schema_name = table_structure["schema_name"]
  99. columns = [col["column_name"] for col in table_structure["columns"]]
  100. sql = f"SELECT {', '.join(columns)} FROM `{schema_name}`.`{table_name}` where del_flag='0' order by update_time desc"
  101. totalsql = f'select count(*) from ({sql})t'
  102. total = db.execute(text(totalsql)).fetchone()[0]
  103. if total==0:
  104. total=1
  105. pages, pagesmod = divmod(total, pageSize)
  106. print(total, pages, pagesmod)
  107. if pagesmod != 0:
  108. pages += 1
  109. print(pages, pagesmod)
  110. if total < pageSize:
  111. pageSize = total
  112. # 正式查询
  113. sql = sql + f' limit {pageSize * (page - 1)}, {pageSize};'
  114. # 构建查询语句
  115. query = text(sql)
  116. result = db.execute(query).fetchall()
  117. # 将结果转换为字典格式
  118. data_list = [dict(zip(columns, row)) for row in result]
  119. return {"code": 200, "msg": "查询成功", "data": data_list,
  120. "total": total,
  121. "page": page,
  122. "pageSize": pageSize,
  123. "totalPages": (total + pageSize - 1) // pageSize
  124. }
  125. except Exception as e:
  126. traceback.print_exc()
  127. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  128. @router.put('/update_data/{table_id}/{data_id}')
  129. async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
  130. try:
  131. # 获取表结构
  132. table_structure = get_data_field(table_id, db)
  133. table_name = table_structure["table_name"]
  134. schema_name = table_structure["schema_name"]
  135. # 构建更新语句
  136. update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()])
  137. update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET {update_clauses} WHERE id = :data_id")
  138. except Exception as e:
  139. traceback.print_exc()
  140. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  141. try:
  142. db.execute(update_query, {**body, "data_id": data_id})
  143. db.commit()
  144. return {"code": 200, "msg": "操作成功", "data":None}
  145. except Exception as e:
  146. db.rollback()
  147. traceback.print_exc()
  148. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  149. @router.post("/add_data/{table_id}")
  150. async def add_data(table_id: int, data: dict, db: Session = Depends(get_db)):
  151. try:
  152. # 获取表结构
  153. table_structure = get_data_field(table_id, db)
  154. table_name = table_structure["table_name"]
  155. schema_name = table_structure["schema_name"]
  156. # 构建插入语句
  157. columns = ", ".join(data.keys())
  158. values = ", ".join([f":{key}" for key in data.keys()])
  159. insert_query = text(f"INSERT INTO `{schema_name}`.`{table_name}` ({columns}) VALUES ({values})")
  160. except Exception as e:
  161. traceback.print_exc()
  162. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  163. try:
  164. db.execute(insert_query, data)
  165. db.commit()
  166. return {"message": "Data added successfully"}
  167. except Exception as e:
  168. db.rollback()
  169. traceback.print_exc()
  170. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  171. @router.delete('/delete_data/{table_id}/{data_id}')
  172. async def delete_data(table_id: int, data_id: int, db: Session = Depends(get_db)):
  173. try:
  174. # 获取表结构
  175. table_structure = get_data_field(table_id, db)
  176. table_name = table_structure["table_name"]
  177. schema_name = table_structure["schema_name"]
  178. # 构建更新语句
  179. # update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()])
  180. update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET del_flag='2' WHERE id = :data_id")
  181. except Exception as e:
  182. traceback.print_exc()
  183. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  184. try:
  185. db.execute(update_query, { "data_id": data_id})
  186. db.commit()
  187. return {"code": 200, "msg": "操作成功", "data":None}
  188. except Exception as e:
  189. db.rollback()
  190. traceback.print_exc()
  191. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  192. @router.get("/generate_import_template/{table_id}")
  193. async def generate_import_template(table_id: int, db: Session = Depends(get_db)):
  194. # 获取表结构
  195. table_structure = get_data_field(table_id, db)
  196. columns = table_structure["columns"]
  197. layer_name = table_structure["layer_name"]
  198. # 创建 DataFrame column_name
  199. data = {}
  200. for col in columns :
  201. if col['column_name'] !='id':
  202. data[col["column_comment"]]=col['column_name']
  203. column_names = [col["column_comment"] for col in columns if col["column_name"]!='id']
  204. df = pd.DataFrame(data=[data],columns=column_names)
  205. # 将 DataFrame 转换为 Excel 文件
  206. output = BytesIO()
  207. with pd.ExcelWriter(output, engine="openpyxl") as writer:
  208. df.to_excel(writer, index=False, sheet_name=layer_name)
  209. encoded_filename = f'{layer_name}导入模板.xlsx'
  210. # 设置响应头
  211. output.seek(0)
  212. from urllib.parse import quote
  213. encoded_filename = quote(encoded_filename, encoding='utf-8')
  214. headers = {
  215. 'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
  216. 'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
  217. }
  218. return StreamingResponse(output, headers=headers,
  219. media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
  220. # 数据批量导入
  221. @router.post("/import_data/{table_id}")
  222. async def import_data(table_id: int,
  223. body = Depends(remove_xss_json), db: Session = Depends(get_db)):
  224. # 获取表结构
  225. table_structure = get_data_field(table_id, db)
  226. table_name = table_structure["table_name"]
  227. schema_name = table_structure["schema_name"]
  228. columns = table_structure["columns"]
  229. filename = body['filename']
  230. if '../' in filename or '/' in filename:
  231. return JSONResponse(status_code=400, content={'code': 400, "msg": '警告:禁止篡改文件路径'})
  232. file_path = f'/data/upload/mergefile/uploads/{filename}'
  233. if not os.path.exists(file_path):
  234. return JSONResponse(status_code=404, content={'code': 404, 'msg': f"文件不存在"})
  235. # print("文件不存在,请检查路径!")
  236. # 读取 Excel 文件
  237. try:
  238. workbook = openpyxl.load_workbook(file_path)
  239. sheet = workbook.active
  240. except Exception as e:
  241. traceback.print_exc()
  242. return JSONResponse(status_code=400, content={'code': 400, 'msg': f"接口发生错误:{e}"})
  243. # raise HTTPException(status_code=400, detail="Invalid Excel file")
  244. # 获取字段名和字段备注名
  245. column_names = [col["column_name"] for col in columns if col["column_name"]!='id']
  246. column_comments = [col["column_comment"] for col in columns if col["column_name"]!='id']
  247. # 检查第一行是否为字段备注名
  248. first_row = [cell.value for cell in sheet[1]]
  249. if first_row != column_comments:
  250. print("接口发生错误:Excel columns do not match the expected columns")
  251. return JSONResponse(status_code=400, content={'code': 400, 'msg': f"接口发生错误:Excel columns do not match the expected columns"})
  252. # raise HTTPException(status_code=400, detail="Excel columns do not match the expected columns")
  253. # 检查第二行是否为字段名
  254. second_row = [cell.value for cell in sheet[2]]
  255. if second_row != column_names:
  256. print("接口发生错误:Excel columns do not match the expected columns")
  257. return JSONResponse(status_code=400,
  258. content={'code': 400, 'msg': f"接口发生错误:Excel columns do not match the expected columns"})
  259. # raise HTTPException(status_code=400, detail="Excel columns do not match the expected columns")
  260. # 将数据插入到数据库
  261. try:
  262. insert_query = text(
  263. f"INSERT INTO `{schema_name}`.`{table_name}` ({', '.join(column_names)}) VALUES ({', '.join([':' + col for col in column_names])})")
  264. for row in sheet.iter_rows(min_row=3, values_only=True):
  265. db.execute(insert_query, dict(zip(column_names, row)))
  266. db.commit()
  267. return {"code":200,"msg": "Data imported successfully"}
  268. except Exception as e:
  269. db.rollback()
  270. traceback.print_exc()
  271. return JSONResponse(status_code=500,
  272. content={'code': 500, 'msg': f"接口发生错误:{e}"})
  273. # raise HTTPException(status_code=500, detail=str(e))