123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect
- from common.security import valid_access_token,valid_websocket_token
- from fastapi.responses import JSONResponse,StreamingResponse
- from common.db import db_czrz
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import func
- from common.auth_user import *
- from sqlalchemy import text
- from pydantic import BaseModel
- from common.BigDataCenterAPI import *
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- from utils.spatial import *
- from utils.ry_system_util import *
- from utils.resource_provision_util import *
- from common.websocketManager import *
- import json
- import traceback
# Router instance that the endpoint decorators in this module register on.
- router = APIRouter()
def get_data_field(table_id: int, db: Session = Depends(get_db)):
    """Describe the physical table registered under ``table_id``.

    Looks up the ``TpDashboardDataManager`` row with ``id == table_id`` and
    ``del_flag == '0'``, then reads that table's column metadata from
    ``INFORMATION_SCHEMA.COLUMNS``. Column names listed in the row's
    ``technical_field`` (a ``;``-separated string) are left out.

    Args:
        table_id: Primary key of the ``TpDashboardDataManager`` row.
        db: SQLAlchemy session used for both queries.

    Returns:
        A dict with ``layer_name``, ``table_name``, ``schema_name`` and
        ``columns`` (a list of dicts with ``column_name``, ``data_type``,
        ``column_comment``, ``column_key`` and ``is_nullable``), ordered by
        ``ORDINAL_POSITION``. When no matching row exists, a 404
        ``JSONResponse`` is returned instead, so callers must check the
        result type before indexing it.
    """
    # Local import: the module only imports ``text`` from sqlalchemy by name.
    from sqlalchemy import bindparam

    info = (
        db.query(TpDashboardDataManager)
        .filter_by(del_flag='0')
        .filter_by(id=table_id)
        .first()
    )
    if not info:
        return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"})

    # Drop blank entries (e.g. from a trailing ';') so they never reach SQL.
    ignore_fields_list = [
        name.strip()
        for name in (info.technical_field or "").split(';')
        if name.strip()
    ]

    sql = """
        SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT, COLUMN_KEY, IS_NULLABLE
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = :schema_name
          AND TABLE_NAME = :table_name
    """
    params = {
        "schema_name": info.database_name,
        "table_name": info.table_name,
    }
    # Only add the exclusion filter when there is something to exclude:
    # binding an empty sequence would render ``NOT IN ()``, which MySQL
    # rejects as a syntax error.
    if ignore_fields_list:
        sql += " AND COLUMN_NAME NOT IN :ignore_fields"
        params["ignore_fields"] = ignore_fields_list
    sql += " ORDER BY ORDINAL_POSITION"

    columns_query = text(sql)
    if ignore_fields_list:
        # An expanding parameter lets SQLAlchemy render one placeholder per
        # element instead of relying on driver-specific tuple formatting.
        columns_query = columns_query.bindparams(
            bindparam("ignore_fields", expanding=True)
        )

    columns_result = db.execute(columns_query, params).fetchall()
    columns_info = [
        {
            "column_name": row[0],
            "data_type": row[1],
            "column_comment": row[2],
            "column_key": row[3],
            "is_nullable": row[4],
        }
        for row in columns_result
    ]
    return {
        "layer_name": info.layer_name,
        "table_name": info.table_name,
        "schema_name": info.database_name,
        "columns": columns_info,
    }
@router.get("/list")
async def get_pattern_list(
    user_id=Depends(valid_access_token),
    db: Session = Depends(get_db)
):
    """List every ``TpDashboardDataManager`` row whose ``del_flag`` is ``'0'``.

    Rows are ordered by ``order_num`` and returned in full (no pagination).

    Args:
        user_id: Result of the ``valid_access_token`` dependency; not used in
            the body.
        db: SQLAlchemy session.

    Returns:
        ``{"code": 200, "msg": ..., "data": [...]}`` where each entry has
        ``id``, ``layer_name``, ``data_update_time`` and ``data_resouce``;
        a 500 ``JSONResponse`` if anything raises.
    """
    try:
        records = (
            db.query(TpDashboardDataManager)
            .filter_by(del_flag='0')
            .order_by(TpDashboardDataManager.order_num)
            .all()
        )
        payload = [
            {
                "id": record.id,
                "layer_name": record.layer_name,
                "data_update_time": record.data_update_time,
                "data_resouce": record.data_resouce,
            }
            for record in records
        ]
        return {"code": 200, "msg": "查询成功", "data": payload}
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
@router.get("/info/{table_id}")
async def get_table_structure(table_id: int, db: Session = Depends(get_db)):
    """Return the column description of the table registered as ``table_id``.

    Args:
        table_id: Identifier passed to ``get_data_field``.
        db: SQLAlchemy session.

    Returns:
        ``{"code": 200, "msg": ..., "data": <table description>}`` on success;
        the 404 ``JSONResponse`` produced by ``get_data_field`` when the table
        is not registered; a 500 ``JSONResponse`` if anything raises.
    """
    try:
        data = get_data_field(table_id, db)
        # get_data_field signals "not found" by returning a response object;
        # forward it instead of nesting it inside a 200 payload.
        if isinstance(data, JSONResponse):
            return data
        return {"code": 200, "msg": "查询成功", "data": data}
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
@router.get("/get_data/{table_id}")
async def get_data_list(table_id: int, page: int = Query(1, gt=0, description='页码'), pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db)):
    """Return one page of rows (``del_flag='0'``) from a registered table.

    Args:
        table_id: Identifier passed to ``get_data_field``.
        page: 1-based page number.
        pageSize: Requested number of rows per page.
        db: SQLAlchemy session.

    Returns:
        ``{"code": 200, "msg": ..., "data": [...], "total", "page",
        "pageSize", "totalPages"}``. As before, the reported ``pageSize`` is
        reduced to ``total`` when the table holds fewer rows than one page
        (but is left untouched for an empty table). The 404 ``JSONResponse``
        from ``get_data_field`` is forwarded when the table is not
        registered; a 500 ``JSONResponse`` is returned if anything raises.
    """
    def quote_ident(name):
        # MySQL identifier quoting: wrap in backticks, double embedded ones.
        return "`" + str(name).replace("`", "``") + "`"

    try:
        table_structure = get_data_field(table_id, db)
        if isinstance(table_structure, JSONResponse):
            return table_structure

        table_ref = (
            f"{quote_ident(table_structure['schema_name'])}."
            f"{quote_ident(table_structure['table_name'])}"
        )
        columns = [col["column_name"] for col in table_structure["columns"]]
        # Select exactly the columns we are going to label. With ``SELECT *``
        # the row also contains the excluded technical fields, so zipping it
        # with ``columns`` attached values to the wrong names.
        select_list = ", ".join(quote_ident(name) for name in columns) or "*"

        total = db.execute(
            text(f"SELECT COUNT(*) FROM {table_ref} where del_flag='0'")
        ).fetchone()[0]

        # pageSize > 0 is enforced by the Query validator, so this cannot
        # divide by zero; an empty table yields 0 pages.
        total_pages = (total + pageSize - 1) // pageSize

        rows = []
        if total:
            # NOTE(review): there is no ORDER BY, so page contents depend on
            # the order the database happens to return rows in.
            rows = db.execute(
                text(
                    f"SELECT {select_list} FROM {table_ref} "
                    "where del_flag='0' limit :offset, :row_count"
                ),
                {"offset": pageSize * (page - 1), "row_count": pageSize},
            ).fetchall()

        data_list = [dict(zip(columns, row)) for row in rows]
        reported_page_size = total if 0 < total < pageSize else pageSize
        return {
            "code": 200,
            "msg": "查询成功",
            "data": data_list,
            "total": total,
            "page": page,
            "pageSize": reported_page_size,
            "totalPages": total_pages,
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
@router.put('/update_data/{table_id}/{data_id}')
async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
    """Update one row of a registered table with the fields given in ``body``.

    Args:
        table_id: Identifier passed to ``get_data_field``.
        data_id: Value matched against the table's ``id`` column.
        body: Mapping of column name to new value, produced by the
            ``remove_xss_json`` dependency (presumably the sanitized JSON
            request body; confirm against that helper).
        db: SQLAlchemy session.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success. A 400
        ``JSONResponse`` when ``body`` is empty or names a column that
        ``get_data_field`` does not list for this table; the helper's 404
        ``JSONResponse`` when the table is not registered; a 500
        ``JSONResponse`` (after rollback) if anything raises.
    """
    def quote_ident(name):
        # MySQL identifier quoting: wrap in backticks, double embedded ones.
        return "`" + str(name).replace("`", "``") + "`"

    try:
        table_structure = get_data_field(table_id, db)
        if isinstance(table_structure, JSONResponse):
            return table_structure

        if not body:
            # An empty SET clause would be a SQL syntax error.
            return JSONResponse(status_code=400, content={'code': 400, 'msg': "更新内容不能为空"})

        # Column names cannot be bound as parameters, so the request keys are
        # only accepted when they match a column the table actually exposes.
        allowed = {col["column_name"] for col in table_structure["columns"]}
        rejected = [key for key in body if key not in allowed]
        if rejected:
            return JSONResponse(
                status_code=400,
                content={'code': 400, 'msg': f"不支持的字段:{', '.join(map(str, rejected))}"},
            )

        # Synthetic placeholder names keep request keys out of the SQL text
        # and cannot collide with the row-id parameter.
        params = {"row_id": data_id}
        assignments = []
        for index, (key, value) in enumerate(body.items()):
            placeholder = f"val_{index}"
            assignments.append(f"{quote_ident(key)} = :{placeholder}")
            params[placeholder] = value

        table_ref = (
            f"{quote_ident(table_structure['schema_name'])}."
            f"{quote_ident(table_structure['table_name'])}"
        )
        update_query = text(
            f"UPDATE {table_ref} SET {', '.join(assignments)} WHERE id = :row_id"
        )
        db.execute(update_query, params)
        db.commit()
        return {"code": 200, "msg": "操作成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
@router.post("/add_data/{table_id}")
async def add_data(table_id: int, data: dict, db: Session = Depends(get_db)):
    """Insert one row into a registered table.

    Args:
        table_id: Identifier passed to ``get_data_field``.
        data: Request body mapping column name to value.
        db: SQLAlchemy session.

    Returns:
        On success, a dict carrying the ``code``/``msg``/``data`` envelope
        used by the sibling endpoints plus the original ``message`` key. A
        400 ``JSONResponse`` when ``data`` is empty or names a column that
        ``get_data_field`` does not list for this table; the helper's 404
        ``JSONResponse`` when the table is not registered; a 500
        ``JSONResponse`` (after rollback) if anything raises.
    """
    def quote_ident(name):
        # MySQL identifier quoting: wrap in backticks, double embedded ones.
        return "`" + str(name).replace("`", "``") + "`"

    try:
        table_structure = get_data_field(table_id, db)
        if isinstance(table_structure, JSONResponse):
            return table_structure

        if not data:
            # An empty column list would be a SQL syntax error.
            return JSONResponse(status_code=400, content={'code': 400, 'msg': "新增内容不能为空"})

        # Column names cannot be bound as parameters, so the request keys are
        # only accepted when they match a column the table actually exposes.
        allowed = {col["column_name"] for col in table_structure["columns"]}
        rejected = [key for key in data if key not in allowed]
        if rejected:
            return JSONResponse(
                status_code=400,
                content={'code': 400, 'msg': f"不支持的字段:{', '.join(map(str, rejected))}"},
            )

        # Synthetic placeholder names keep request keys out of the SQL text.
        params = {}
        column_names = []
        placeholders = []
        for index, (key, value) in enumerate(data.items()):
            placeholder = f"val_{index}"
            column_names.append(quote_ident(key))
            placeholders.append(f":{placeholder}")
            params[placeholder] = value

        table_ref = (
            f"{quote_ident(table_structure['schema_name'])}."
            f"{quote_ident(table_structure['table_name'])}"
        )
        insert_query = text(
            f"INSERT INTO {table_ref} ({', '.join(column_names)}) "
            f"VALUES ({', '.join(placeholders)})"
        )
        db.execute(insert_query, params)
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None,
            # Kept for clients that read the original response key.
            "message": "Data added successfully",
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
@router.delete('/delete_data/{table_id}/{data_id}')
async def delete_data(table_id: int, data_id: int, db: Session = Depends(get_db)):
    """Mark one row of a registered table by setting its ``del_flag`` to ``'2'``.

    The row is not physically removed; only ``del_flag`` is rewritten.

    Args:
        table_id: Identifier passed to ``get_data_field``.
        data_id: Value matched against the table's ``id`` column.
        db: SQLAlchemy session.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success; the helper's
        404 ``JSONResponse`` when the table is not registered; a 500
        ``JSONResponse`` (after rollback) if anything raises.
    """
    def quote_ident(name):
        # MySQL identifier quoting: wrap in backticks, double embedded ones.
        return "`" + str(name).replace("`", "``") + "`"

    try:
        table_structure = get_data_field(table_id, db)
        # get_data_field signals "not found" by returning a response object;
        # indexing it like a dict would raise and surface as a 500.
        if isinstance(table_structure, JSONResponse):
            return table_structure

        table_ref = (
            f"{quote_ident(table_structure['schema_name'])}."
            f"{quote_ident(table_structure['table_name'])}"
        )
        update_query = text(
            f"UPDATE {table_ref} SET del_flag='2' WHERE id = :data_id"
        )
        db.execute(update_query, {"data_id": data_id})
        db.commit()
        return {"code": 200, "msg": "操作成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
def get_data_info():
    """Placeholder with no implementation yet; always returns ``None``."""
    return None
def insert_data():
    """Placeholder with no implementation yet; always returns ``None``."""
    return None
|