#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect from common.security import valid_access_token,valid_websocket_token from fastapi.responses import JSONResponse,StreamingResponse from common.db import db_czrz from sqlalchemy.orm import Session from sqlalchemy.sql import func from common.auth_user import * from sqlalchemy import text from pydantic import BaseModel from common.BigDataCenterAPI import * from database import get_db from typing import List from models import * from utils import * from utils.spatial import * from utils.ry_system_util import * from utils.resource_provision_util import * from common.websocketManager import * import json import traceback router = APIRouter() def get_data_field(table_id: int, db: Session = Depends(get_db)): query = db.query(TpDashboardDataManager) query = query.filter_by(del_flag='0') query = query.filter_by(id=table_id) info = query.first() if not info: return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"}) ignore_fields_list = info.technical_field.split(';') if info.technical_field else [] # 查询表结构字段信息 columns_query = text(""" SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT,COLUMN_KEY,IS_NULLABLE FROM INFORMATION_SCHEMA.COLUMNS WHERE TABLE_SCHEMA = :schema_name AND TABLE_NAME = :table_name AND COLUMN_NAME NOT IN :ignore_fields order by ORDINAL_POSITION """) columns_result = db.execute(columns_query, { "schema_name": info.database_name, "table_name": info.table_name, "ignore_fields": tuple(ignore_fields_list) }).fetchall() columns_info = [ {"column_name": row[0], "data_type": row[1], "column_comment": row[2],"column_key":row[3],"is_nullable":row[4]} for row in columns_result ] return { "layer_name": info.layer_name, "table_name": info.table_name, "schema_name": info.database_name, "columns": columns_info } @router.get("/list") async def get_pattern_list( # name: str = Query(None, description='名称'), user_id=Depends(valid_access_token), db: Session = Depends(get_db) ): try: query = db.query(TpDashboardDataManager) query = query.filter_by(del_flag='0') # 排序 query = query.order_by(TpDashboardDataManager.order_num) # 执行分页查询 lists = query.all() data = [ ] for info in lists: data.append({"id": info.id, "layer_name": info.layer_name, "data_update_time": info.data_update_time, "data_resouce": info.data_resouce }) return {"code": 200, "msg": "查询成功", "data": data } except Exception as e: traceback.print_exc() return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"}) @router.get("/info/{table_id}") async def get_table_structure(table_id: int, db: Session = Depends(get_db)): try: data = get_data_field(table_id,db) return {"code": 200, "msg": "查询成功", "data":data} except Exception as e: traceback.print_exc() return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"}) @router.get("/get_data/{table_id}") async def get_data_list(table_id: int,page: int = Query(1, gt=0, description='页码'),pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db) ): try: table_structure = get_data_field(table_id, db) table_name = table_structure["table_name"] schema_name = table_structure["schema_name"] sql = f"SELECT * FROM `{schema_name}`.`{table_name}` where del_flag='0'" totalsql = f'select count(*) from ({sql})t' total = db.execute(text(totalsql)).fetchone()[0] pages, pagesmod = divmod(total, pageSize) print(total, pages, pagesmod) if pagesmod != 0: pages += 1 print(pages, pagesmod) if total < pageSize: pageSize = total # 正式查询 sql = sql + f' limit {pageSize * (page - 1)}, {pageSize};' # 构建查询语句 query = text(sql) result = db.execute(query).fetchall() # 将结果转换为字典格式 columns = [col["column_name"] for col in table_structure["columns"]] data_list = [dict(zip(columns, row)) for row in result] return {"code": 200, "msg": "查询成功", "data": data_list, "total": total, "page": page, "pageSize": pageSize, "totalPages": (total + pageSize - 1) // pageSize } except Exception as e: traceback.print_exc() return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"}) @router.put('/update_data/{table_id}/{data_id}') async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)): try: # 获取表结构 table_structure = get_data_field(table_id, db) table_name = table_structure["table_name"] schema_name = table_structure["schema_name"] # 构建更新语句 update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()]) update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET {update_clauses} WHERE id = :data_id") except Exception as e: traceback.print_exc() return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"}) try: db.execute(update_query, {**body, "data_id": data_id}) db.commit() return {"code": 200, "msg": "操作成功", "data":None} except Exception as e: db.rollback() traceback.print_exc() return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"}) @router.post("/add_data/{table_id}") async def add_data(table_id: int, data: dict, db: Session = Depends(get_db)): try: # 获取表结构 table_structure = get_data_field(table_id, db) table_name = table_structure["table_name"] schema_name = table_structure["schema_name"] # 构建插入语句 columns = ", ".join(data.keys()) values = ", ".join([f":{key}" for key in data.keys()]) insert_query = text(f"INSERT INTO `{schema_name}`.`{table_name}` ({columns}) VALUES ({values})") except Exception as e: traceback.print_exc() return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"}) try: db.execute(insert_query, data) db.commit() return {"message": "Data added successfully"} except Exception as e: db.rollback() traceback.print_exc() return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"}) @router.delete('/delete_data/{table_id}/{data_id}') async def delete_data(table_id: int, data_id: int, db: Session = Depends(get_db)): try: # 获取表结构 table_structure = get_data_field(table_id, db) table_name = table_structure["table_name"] schema_name = table_structure["schema_name"] # 构建更新语句 # update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()]) update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET del_flag='2' WHERE id = :data_id") except Exception as e: traceback.print_exc() return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"}) try: db.execute(update_query, { "data_id": data_id}) db.commit() return {"code": 200, "msg": "操作成功", "data":None} except Exception as e: db.rollback() traceback.print_exc() return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"}) def get_data_info(): pass def insert_data(): pass