#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Routes for browsing and maintaining dashboard-managed data tables.

Every route resolves a ``TpDashboardDataManager`` row (by ``table_id``) to a
physical ``schema.table`` and then lists, inserts, updates, soft-deletes,
exports an Excel import template for, or bulk-imports rows of that table.

The SQL in this module is MySQL-flavoured (``INFORMATION_SCHEMA.COLUMNS``,
backtick identifiers, ``LIMIT offset, count``).

NOTE(review): only ``/list`` depends on ``valid_access_token``; the other
routes are reachable without it. Left unchanged here because adding the
dependency would alter the API contract -- confirm whether that is intended.
"""
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status, WebSocket, WebSocketDisconnect, UploadFile, File
from common.security import valid_access_token, valid_websocket_token
from fastapi.responses import JSONResponse, StreamingResponse
from common.db import db_czrz
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from common.auth_user import *
from sqlalchemy import text
from pydantic import BaseModel
from common.BigDataCenterAPI import *
from database import get_db
from typing import List
from models import *
from utils import *
from utils.spatial import *
from utils.ry_system_util import *
from utils.resource_provision_util import *
from common.websocketManager import *
import json
import traceback
import pandas as pd
from io import BytesIO
import openpyxl
import os
import re
from urllib.parse import quote

router = APIRouter()

# Directory that uploaded import files are read from.
_UPLOAD_DIR = '/data/upload/mergefile/uploads'

# Characters openpyxl refuses in a worksheet title.
_INVALID_SHEET_CHARS = re.compile(r'[\\*?:/\[\]]')

# Excel limits worksheet titles to 31 characters.
_MAX_SHEET_TITLE_LEN = 31


def _quote_identifier(name) -> str:
    """Return *name* as a backtick-quoted MySQL identifier.

    Embedded backticks are doubled so the value cannot terminate the quoted
    identifier and inject SQL.
    """
    return "`" + str(name).replace("`", "``") + "`"


def _table_reference(table_structure: dict) -> str:
    """Return the quoted ``schema.table`` reference for a table structure."""
    schema = _quote_identifier(table_structure["schema_name"])
    table = _quote_identifier(table_structure["table_name"])
    return f"{schema}.{table}"


def _bind_columns(column_names, values, prefix: str = "p"):
    """Pair columns with generated bind names.

    Bind names are generated (``p0``, ``p1``, ...) rather than derived from
    the column names, so arbitrary column names never end up inside a
    placeholder and cannot collide with other bind names such as ``data_id``.

    Returns:
        A ``(quoted_columns, placeholders, params)`` tuple, where the first
        two are parallel lists of SQL fragments and ``params`` maps each bind
        name to its value.
    """
    quoted_columns = []
    placeholders = []
    params = {}
    for index, (column, value) in enumerate(zip(column_names, values)):
        bind_name = f"{prefix}{index}"
        quoted_columns.append(_quote_identifier(column))
        placeholders.append(f":{bind_name}")
        params[bind_name] = value
    return quoted_columns, placeholders, params


def _server_error(exc: Exception) -> JSONResponse:
    """Build the module's standard 500 response for an unexpected error."""
    return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{exc}"})


def _bad_request(msg: str) -> JSONResponse:
    """Build a 400 response carrying *msg*."""
    return JSONResponse(status_code=400, content={'code': 400, 'msg': msg})


def _safe_sheet_name(name) -> str:
    """Return *name* made safe for use as an Excel worksheet title."""
    cleaned = _INVALID_SHEET_CHARS.sub('_', str(name or '')).strip()
    return cleaned[:_MAX_SHEET_TITLE_LEN] or 'Sheet1'


def get_data_field(table_id: int, db: Session = Depends(get_db)):
    """Look up a managed table and describe its non-technical columns.

    Args:
        table_id: Primary key of the ``TpDashboardDataManager`` row.
        db: SQLAlchemy session used for both lookups.

    Returns:
        A dict with ``layer_name``, ``table_name``, ``schema_name`` and
        ``columns`` (one dict per column, in ordinal order), or a 404
        ``JSONResponse`` when no active manager row matches ``table_id``.
        Callers must check for the ``JSONResponse`` case before subscripting.
    """
    info = (
        db.query(TpDashboardDataManager)
        .filter_by(del_flag='0')
        .filter_by(id=table_id)
        .first()
    )
    if not info:
        return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"})

    # technical_field is a ';'-separated list of columns to hide.
    ignore_fields = [
        field.strip()
        for field in (info.technical_field or '').split(';')
        if field.strip()
    ]

    params = {"schema_name": info.database_name, "table_name": info.table_name}
    ignore_clause = ""
    if ignore_fields:
        # One placeholder per value: an empty "NOT IN ()" is a MySQL syntax
        # error, so the clause is omitted entirely when nothing is ignored.
        placeholders = []
        for index, field in enumerate(ignore_fields):
            bind_name = f"ignore_{index}"
            placeholders.append(f":{bind_name}")
            params[bind_name] = field
        ignore_clause = f"AND COLUMN_NAME NOT IN ({', '.join(placeholders)})"

    # Read the table's column metadata.
    columns_query = text(f"""
        SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT, COLUMN_KEY, IS_NULLABLE
        FROM INFORMATION_SCHEMA.COLUMNS
        WHERE TABLE_SCHEMA = :schema_name
          AND TABLE_NAME = :table_name
          {ignore_clause}
        ORDER BY ORDINAL_POSITION
    """)
    columns_result = db.execute(columns_query, params).fetchall()

    columns_info = [
        {
            "column_name": row[0],
            "data_type": row[1],
            "column_comment": row[2],
            "column_key": row[3],
            "is_nullable": row[4],
        }
        for row in columns_result
    ]

    return {
        "layer_name": info.layer_name,
        "table_name": info.table_name,
        "schema_name": info.database_name,
        "columns": columns_info,
    }


@router.get("/list")
async def get_pattern_list(
    user_id=Depends(valid_access_token),
    db: Session = Depends(get_db)
):
    """List every active managed table, ordered by ``order_num``."""
    try:
        records = (
            db.query(TpDashboardDataManager)
            .filter_by(del_flag='0')
            .order_by(TpDashboardDataManager.order_num)
            .all()
        )
        data = [
            {
                "id": info.id,
                "layer_name": info.layer_name,
                "data_update_time": info.data_update_time,
                "data_resouce": info.data_resouce,
            }
            for info in records
        ]
        return {"code": 200, "msg": "查询成功", "data": data}
    except Exception as e:
        traceback.print_exc()
        return _server_error(e)


@router.get("/info/{table_id}")
async def get_table_structure(table_id: int, db: Session = Depends(get_db)):
    """Return the column description of one managed table."""
    try:
        data = get_data_field(table_id, db)
        if isinstance(data, JSONResponse):
            # Pass the helper's 404 through instead of nesting it in "data".
            return data
        return {"code": 200, "msg": "查询成功", "data": data}
    except Exception as e:
        traceback.print_exc()
        return _server_error(e)


@router.get("/get_data/{table_id}")
async def get_data_list(
    table_id: int,
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db)
):
    """Return one page of non-deleted rows, newest ``update_time`` first.

    The response carries the real row count in ``total``; an empty table
    reports ``total == 0`` and ``totalPages == 0``.
    """
    try:
        table_structure = get_data_field(table_id, db)
        if isinstance(table_structure, JSONResponse):
            return table_structure

        columns = [col["column_name"] for col in table_structure["columns"]]
        table_ref = _table_reference(table_structure)
        select_list = ', '.join(_quote_identifier(col) for col in columns)

        count_sql = text(f"SELECT COUNT(*) FROM {table_ref} WHERE del_flag='0'")
        total = db.execute(count_sql).fetchone()[0]

        # Existing contract: when everything fits on one page, the reported
        # page size shrinks to the row count. Guarded so it never becomes 0.
        if 0 < total < pageSize:
            pageSize = total
        total_pages = (total + pageSize - 1) // pageSize

        page_sql = text(
            f"SELECT {select_list} FROM {table_ref} "
            "WHERE del_flag='0' ORDER BY update_time DESC "
            "LIMIT :offset, :limit"
        )
        rows = db.execute(
            page_sql, {"offset": pageSize * (page - 1), "limit": pageSize}
        ).fetchall()

        # Convert each row into a column-name -> value dict.
        data_list = [dict(zip(columns, row)) for row in rows]
        return {
            "code": 200,
            "msg": "查询成功",
            "data": data_list,
            "total": total,
            "page": page,
            "pageSize": pageSize,
            "totalPages": total_pages,
        }
    except Exception as e:
        traceback.print_exc()
        return _server_error(e)


@router.put('/update_data/{table_id}/{data_id}')
async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
    """Update the row ``id == data_id`` with the column values in the body.

    Body keys are treated as column names. They are backtick-escaped before
    being placed in the statement, and values are always bound parameters.
    """
    try:
        table_structure = get_data_field(table_id, db)
        if isinstance(table_structure, JSONResponse):
            return table_structure
        if not isinstance(body, dict) or not body:
            return _bad_request("请求数据不能为空")

        quoted, placeholders, params = _bind_columns(body.keys(), body.values())
        assignments = ", ".join(
            f"{column} = {placeholder}"
            for column, placeholder in zip(quoted, placeholders)
        )
        update_query = text(
            f"UPDATE {_table_reference(table_structure)} "
            f"SET {assignments} WHERE id = :data_id"
        )
        params["data_id"] = data_id
    except Exception as e:
        traceback.print_exc()
        return _server_error(e)

    try:
        db.execute(update_query, params)
        db.commit()
        return {"code": 200, "msg": "操作成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        return _server_error(e)


@router.post("/add_data/{table_id}")
async def add_data(table_id: int, data: dict, db: Session = Depends(get_db)):
    """Insert one row built from the JSON body's column -> value pairs.

    The response keeps the original ``message`` key and adds the
    ``code``/``msg``/``data`` keys used by the other routes.
    """
    try:
        table_structure = get_data_field(table_id, db)
        if isinstance(table_structure, JSONResponse):
            return table_structure
        if not data:
            return _bad_request("请求数据不能为空")

        quoted, placeholders, params = _bind_columns(data.keys(), data.values())
        insert_query = text(
            f"INSERT INTO {_table_reference(table_structure)} "
            f"({', '.join(quoted)}) VALUES ({', '.join(placeholders)})"
        )
    except Exception as e:
        traceback.print_exc()
        return _server_error(e)

    try:
        db.execute(insert_query, params)
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None,
            "message": "Data added successfully",
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        return _server_error(e)


@router.delete('/delete_data/{table_id}/{data_id}')
async def delete_data(table_id: int, data_id: int, db: Session = Depends(get_db)):
    """Soft-delete the row ``id == data_id`` by setting ``del_flag`` to '2'."""
    try:
        table_structure = get_data_field(table_id, db)
        if isinstance(table_structure, JSONResponse):
            return table_structure
        update_query = text(
            f"UPDATE {_table_reference(table_structure)} "
            "SET del_flag='2' WHERE id = :data_id"
        )
    except Exception as e:
        traceback.print_exc()
        return _server_error(e)

    try:
        db.execute(update_query, {"data_id": data_id})
        db.commit()
        return {"code": 200, "msg": "操作成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        return _server_error(e)


@router.get("/generate_import_template/{table_id}")
async def generate_import_template(table_id: int, db: Session = Depends(get_db)):
    """Stream an .xlsx import template for the table.

    Row 1 holds the column comments and row 2 the column names, for every
    column except ``id``. This is the layout ``import_data`` validates.
    """
    table_structure = get_data_field(table_id, db)
    if isinstance(table_structure, JSONResponse):
        return table_structure

    layer_name = table_structure["layer_name"]
    importable = [
        col for col in table_structure["columns"] if col["column_name"] != 'id'
    ]
    comment_row = [col["column_comment"] for col in importable]
    name_row = [col["column_name"] for col in importable]

    # Built from parallel lists rather than a comment-keyed dict so columns
    # that share a comment (or have none) are not collapsed into one.
    df = pd.DataFrame([name_row], columns=comment_row)

    # Serialise the DataFrame into an in-memory Excel file.
    output = BytesIO()
    with pd.ExcelWriter(output, engine="openpyxl") as writer:
        df.to_excel(writer, index=False, sheet_name=_safe_sheet_name(layer_name))
    output.seek(0)

    # RFC 5987 encoding so the non-ASCII file name survives the header.
    encoded_filename = quote(f'{layer_name}导入模板.xlsx', encoding='utf-8')
    headers = {
        'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
        'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    }
    return StreamingResponse(
        output,
        headers=headers,
        media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet",
    )


# Bulk data import
@router.post("/import_data/{table_id}")
async def import_data(table_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
    """Import rows from a previously uploaded Excel file.

    The body must contain ``filename``, the bare name of a file inside the
    upload directory. Row 1 of the active sheet must equal the column
    comments and row 2 the column names (``id`` excluded); data starts at
    row 3. Completely empty rows are skipped. All inserts are committed
    together, or rolled back together if any of them fails.
    """
    table_structure = get_data_field(table_id, db)
    if isinstance(table_structure, JSONResponse):
        return table_structure
    columns = table_structure["columns"]

    filename = body.get('filename') if isinstance(body, dict) else None
    if not isinstance(filename, str) or not filename:
        return _bad_request("缺少文件名")
    # Only a bare file name is accepted, never a path.
    if '/' in filename or '\\' in filename or filename in ('.', '..'):
        return JSONResponse(status_code=400, content={'code': 400, "msg": '警告:禁止篡改文件路径'})

    file_path = os.path.join(_UPLOAD_DIR, filename)
    if not os.path.isfile(file_path):
        return JSONResponse(status_code=404, content={'code': 404, 'msg': f"文件不存在"})

    # Read the Excel file.
    try:
        workbook = openpyxl.load_workbook(file_path)
        sheet = workbook.active
    except Exception as e:
        traceback.print_exc()
        return _bad_request(f"接口发生错误:{e}")

    try:
        importable = [col for col in columns if col["column_name"] != 'id']
        column_names = [col["column_name"] for col in importable]
        # An empty header cell may read back as None, so None and '' are
        # treated as the same value on both sides of the comparison.
        column_comments = [col["column_comment"] or '' for col in importable]

        # Row 1 must be the column comments.
        first_row = [
            cell.value if cell.value is not None else '' for cell in sheet[1]
        ]
        if first_row != column_comments:
            print("接口发生错误:Excel columns do not match the expected columns")
            return _bad_request("接口发生错误:Excel columns do not match the expected columns")

        # Row 2 must be the column names.
        second_row = [cell.value for cell in sheet[2]]
        if second_row != column_names:
            print("接口发生错误:Excel columns do not match the expected columns")
            return _bad_request("接口发生错误:Excel columns do not match the expected columns")

        # Insert the data rows.
        try:
            quoted, placeholders, _ = _bind_columns(
                column_names, column_names, prefix="c"
            )
            bind_names = [placeholder[1:] for placeholder in placeholders]
            insert_query = text(
                f"INSERT INTO {_table_reference(table_structure)} "
                f"({', '.join(quoted)}) VALUES ({', '.join(placeholders)})"
            )
            for row in sheet.iter_rows(min_row=3, values_only=True):
                if all(value is None for value in row):
                    # Blank trailing rows would otherwise become all-NULL
                    # records.
                    continue
                db.execute(insert_query, dict(zip(bind_names, row)))
            db.commit()
            return {"code": 200, "msg": "Data imported successfully"}
        except Exception as e:
            db.rollback()
            traceback.print_exc()
            return _server_error(e)
    finally:
        # Release the file handle on every exit path.
        workbook.close()