__init__.py 6.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect
  4. from common.security import valid_access_token,valid_websocket_token
  5. from fastapi.responses import JSONResponse,StreamingResponse
  6. from common.db import db_czrz
  7. from sqlalchemy.orm import Session
  8. from sqlalchemy.sql import func
  9. from common.auth_user import *
  10. from sqlalchemy import text
  11. from pydantic import BaseModel
  12. from common.BigDataCenterAPI import *
  13. from database import get_db
  14. from typing import List
  15. from models import *
  16. from utils import *
  17. from utils.spatial import *
  18. from utils.ry_system_util import *
  19. from utils.resource_provision_util import *
  20. from common.websocketManager import *
  21. import json
  22. import traceback
  23. router = APIRouter()
  24. def get_data_field(table_id: int, db: Session = Depends(get_db)):
  25. query = db.query(TpDashboardDataManager)
  26. query = query.filter_by(del_flag='0')
  27. query = query.filter_by(id=table_id)
  28. info = query.first()
  29. if not info:
  30. return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"})
  31. ignore_fields_list = info.technical_field.split(';') if info.technical_field else []
  32. # 查询表结构字段信息
  33. columns_query = text("""
  34. SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT
  35. FROM INFORMATION_SCHEMA.COLUMNS
  36. WHERE TABLE_SCHEMA = :schema_name
  37. AND TABLE_NAME = :table_name
  38. AND COLUMN_NAME NOT IN :ignore_fields order by ORDINAL_POSITION
  39. """)
  40. columns_result = db.execute(columns_query, {
  41. "schema_name": info.database_name,
  42. "table_name": info.table_name,
  43. "ignore_fields": tuple(ignore_fields_list)
  44. }).fetchall()
  45. columns_info = [
  46. {"column_name": row[0], "data_type": row[1], "column_comment": row[2]}
  47. for row in columns_result
  48. ]
  49. return {
  50. "layer_name": info.layer_name,
  51. "table_name": info.table_name,
  52. "schema_name": info.database_name,
  53. "columns": columns_info
  54. }
  55. @router.get("/list")
  56. async def get_pattern_list(
  57. # name: str = Query(None, description='名称'),
  58. user_id=Depends(valid_access_token),
  59. db: Session = Depends(get_db)
  60. ):
  61. try:
  62. query = db.query(TpDashboardDataManager)
  63. query = query.filter_by(del_flag='0')
  64. # 排序
  65. query = query.order_by(TpDashboardDataManager.order_num)
  66. # 执行分页查询
  67. lists = query.all()
  68. data = [ ]
  69. for info in lists:
  70. data.append({"id": info.id,
  71. "layer_name": info.layer_name,
  72. "data_update_time": info.data_update_time,
  73. "data_resouce": info.data_resouce
  74. })
  75. return {"code": 200, "msg": "查询成功", "data": data
  76. }
  77. except Exception as e:
  78. traceback.print_exc()
  79. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  80. @router.get("/info/{table_id}")
  81. async def get_table_structure(table_id: int, db: Session = Depends(get_db)):
  82. try:
  83. data = get_data_field(table_id,db)
  84. return {"code": 200, "msg": "查询成功", "data":data}
  85. except Exception as e:
  86. traceback.print_exc()
  87. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  88. @router.get("/get_data/{table_id}")
  89. async def get_data_list(table_id: int,page: int = Query(1, gt=0, description='页码'),pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db) ):
  90. try:
  91. table_structure = get_data_field(table_id, db)
  92. table_name = table_structure["table_name"]
  93. schema_name = table_structure["schema_name"]
  94. sql = f"SELECT * FROM `{schema_name}`.`{table_name}` where del_flag='0'"
  95. totalsql = f'select count(*) from ({sql})t'
  96. total = db.execute(text(totalsql)).fetchone()[0]
  97. pages, pagesmod = divmod(total, pageSize)
  98. print(total, pages, pagesmod)
  99. if pagesmod != 0:
  100. pages += 1
  101. print(pages, pagesmod)
  102. if total < pageSize:
  103. pageSize = total
  104. # 正式查询
  105. sql = sql + f' limit {pageSize * (page - 1)}, {pageSize};'
  106. # 构建查询语句
  107. query = text(sql)
  108. result = db.execute(query).fetchall()
  109. # 将结果转换为字典格式
  110. columns = [col["column_name"] for col in table_structure["columns"]]
  111. data_list = [dict(zip(columns, row)) for row in result]
  112. return {"code": 200, "msg": "查询成功", "data": data_list,
  113. "total": total,
  114. "page": page,
  115. "pageSize": pageSize,
  116. "totalPages": (total + pageSize - 1) // pageSize
  117. }
  118. except Exception as e:
  119. traceback.print_exc()
  120. return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
  121. @router.put('/update_data/{table_id}/{data_id}')
  122. async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
  123. try:
  124. # 获取表结构
  125. table_structure = get_data_field(table_id, db)
  126. table_name = table_structure["table_name"]
  127. schema_name = table_structure["schema_name"]
  128. # 构建更新语句
  129. update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()])
  130. update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET {update_clauses} WHERE id = :data_id")
  131. except Exception as e:
  132. traceback.print_exc()
  133. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  134. try:
  135. db.execute(update_query, {**body, "data_id": data_id})
  136. db.commit()
  137. return {"code": 200, "msg": "操作成功", "data":None}
  138. except Exception as e:
  139. db.rollback()
  140. traceback.print_exc()
  141. return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
  142. def get_data_info():
  143. pass
  144. def insert_data():
  145. pass