123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status,WebSocket,WebSocketDisconnect,UploadFile,File
- from common.security import valid_access_token,valid_websocket_token
- from fastapi.responses import JSONResponse,StreamingResponse
- from common.db import db_czrz
- from sqlalchemy.orm import Session
- from sqlalchemy.sql import func
- from common.auth_user import *
- from sqlalchemy import text
- from pydantic import BaseModel
- from common.BigDataCenterAPI import *
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- from utils.spatial import *
- from utils.ry_system_util import *
- from utils.resource_provision_util import *
- from common.websocketManager import *
- import json
- import traceback
- import pandas as pd
- from io import BytesIO
- import openpyxl
- import os
- router = APIRouter()
- def get_data_field(table_id: int, db: Session = Depends(get_db)):
- query = db.query(TpDashboardDataManager)
- query = query.filter_by(del_flag='0')
- query = query.filter_by(id=table_id)
- info = query.first()
- if not info:
- return JSONResponse(status_code=404, content={"code": 404, "msg": "未找到"})
- ignore_fields_list = info.technical_field.split(';') if info.technical_field else []
- # 查询表结构字段信息
- columns_query = text("""
- SELECT COLUMN_NAME, DATA_TYPE, COLUMN_COMMENT,COLUMN_KEY,IS_NULLABLE
- FROM INFORMATION_SCHEMA.COLUMNS
- WHERE TABLE_SCHEMA = :schema_name
- AND TABLE_NAME = :table_name
- AND COLUMN_NAME NOT IN :ignore_fields order by ORDINAL_POSITION
- """)
- columns_result = db.execute(columns_query, {
- "layer_name": info.layer_name,
- "schema_name": info.database_name,
- "table_name": info.table_name,
- "ignore_fields": tuple(ignore_fields_list)
- }).fetchall()
- columns_info = [
- {"column_name": row[0], "data_type": row[1], "column_comment": row[2],"column_key":row[3],"is_nullable":row[4]}
- for row in columns_result
- ]
- return {
- "layer_name": info.layer_name,
- "table_name": info.table_name,
- "schema_name": info.database_name,
- "columns": columns_info
- }
- @router.get("/list")
- async def get_pattern_list(
- # name: str = Query(None, description='名称'),
- user_id=Depends(valid_access_token),
- db: Session = Depends(get_db)
- ):
- try:
- query = db.query(TpDashboardDataManager)
- query = query.filter_by(del_flag='0')
- # 排序
- query = query.order_by(TpDashboardDataManager.order_num)
- # 执行分页查询
- lists = query.all()
- data = [ ]
- for info in lists:
- data.append({"id": info.id,
- "layer_name": info.layer_name,
- "data_update_time": info.data_update_time,
- "data_resouce": info.data_resouce
- })
- return {"code": 200, "msg": "查询成功", "data": data
- }
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
- @router.get("/info/{table_id}")
- async def get_table_structure(table_id: int, db: Session = Depends(get_db)):
- try:
- data = get_data_field(table_id,db)
- return {"code": 200, "msg": "查询成功", "data":data}
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
- @router.get("/get_data/{table_id}")
- async def get_data_list(table_id: int,page: int = Query(1, gt=0, description='页码'),pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db) ):
- try:
- table_structure = get_data_field(table_id, db)
- table_name = table_structure["table_name"]
- schema_name = table_structure["schema_name"]
- columns = [col["column_name"] for col in table_structure["columns"]]
- sql = f"SELECT {', '.join(columns)} FROM `{schema_name}`.`{table_name}` where del_flag='0' order by update_time desc"
- totalsql = f'select count(*) from ({sql})t'
- total = db.execute(text(totalsql)).fetchone()[0]
- if total==0:
- total=1
- pages, pagesmod = divmod(total, pageSize)
- print(total, pages, pagesmod)
- if pagesmod != 0:
- pages += 1
- print(pages, pagesmod)
- if total < pageSize:
- pageSize = total
- # 正式查询
- sql = sql + f' limit {pageSize * (page - 1)}, {pageSize};'
- # 构建查询语句
- query = text(sql)
- result = db.execute(query).fetchall()
- # 将结果转换为字典格式
- data_list = [dict(zip(columns, row)) for row in result]
- return {"code": 200, "msg": "查询成功", "data": data_list,
- "total": total,
- "page": page,
- "pageSize": pageSize,
- "totalPages": (total + pageSize - 1) // pageSize
- }
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(status_code=500,content={'code':500,'msg':f"接口发生错误:{e}"})
- @router.put('/update_data/{table_id}/{data_id}')
- async def update_data(table_id: int, data_id: int, body=Depends(remove_xss_json), db: Session = Depends(get_db)):
- try:
- # 获取表结构
- table_structure = get_data_field(table_id, db)
- table_name = table_structure["table_name"]
- schema_name = table_structure["schema_name"]
- # 构建更新语句
- update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()])
- update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET {update_clauses} WHERE id = :data_id")
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
- try:
- db.execute(update_query, {**body, "data_id": data_id})
- db.commit()
- return {"code": 200, "msg": "操作成功", "data":None}
- except Exception as e:
- db.rollback()
- traceback.print_exc()
- return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
- @router.post("/add_data/{table_id}")
- async def add_data(table_id: int, data: dict, db: Session = Depends(get_db)):
- try:
- # 获取表结构
- table_structure = get_data_field(table_id, db)
- table_name = table_structure["table_name"]
- schema_name = table_structure["schema_name"]
- # 构建插入语句
- columns = ", ".join(data.keys())
- values = ", ".join([f":{key}" for key in data.keys()])
- insert_query = text(f"INSERT INTO `{schema_name}`.`{table_name}` ({columns}) VALUES ({values})")
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
- try:
- db.execute(insert_query, data)
- db.commit()
- return {"message": "Data added successfully"}
- except Exception as e:
- db.rollback()
- traceback.print_exc()
- return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
- @router.delete('/delete_data/{table_id}/{data_id}')
- async def delete_data(table_id: int, data_id: int, db: Session = Depends(get_db)):
- try:
- # 获取表结构
- table_structure = get_data_field(table_id, db)
- table_name = table_structure["table_name"]
- schema_name = table_structure["schema_name"]
- # 构建更新语句
- # update_clauses = ", ".join([f"`{key}` = :{key}" for key in body.keys()])
- update_query = text(f"UPDATE `{schema_name}`.`{table_name}` SET del_flag='2' WHERE id = :data_id")
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
- try:
- db.execute(update_query, { "data_id": data_id})
- db.commit()
- return {"code": 200, "msg": "操作成功", "data":None}
- except Exception as e:
- db.rollback()
- traceback.print_exc()
- return JSONResponse(status_code=500, content={'code': 500, 'msg': f"接口发生错误:{e}"})
- @router.get("/generate_import_template/{table_id}")
- async def generate_import_template(table_id: int, db: Session = Depends(get_db)):
- # 获取表结构
- table_structure = get_data_field(table_id, db)
- columns = table_structure["columns"]
- layer_name = table_structure["layer_name"]
- # 创建 DataFrame column_name
- data = {}
- for col in columns :
- if col['column_name'] !='id':
- data[col["column_comment"]]=col['column_name']
- column_names = [col["column_comment"] for col in columns if col["column_name"]!='id']
- df = pd.DataFrame(data=[data],columns=column_names)
- # 将 DataFrame 转换为 Excel 文件
- output = BytesIO()
- with pd.ExcelWriter(output, engine="openpyxl") as writer:
- df.to_excel(writer, index=False, sheet_name=layer_name)
- encoded_filename = f'{layer_name}导入模板.xlsx'
- # 设置响应头
- output.seek(0)
- from urllib.parse import quote
- encoded_filename = quote(encoded_filename, encoding='utf-8')
- headers = {
- 'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
- 'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
- }
- return StreamingResponse(output, headers=headers,
- media_type="application/vnd.openxmlformats-officedocument.spreadsheetml.sheet")
- # 数据批量导入
- @router.post("/import_data/{table_id}")
- async def import_data(table_id: int,
- body = Depends(remove_xss_json), db: Session = Depends(get_db)):
- # 获取表结构
- table_structure = get_data_field(table_id, db)
- table_name = table_structure["table_name"]
- schema_name = table_structure["schema_name"]
- columns = table_structure["columns"]
- filename = body['filename']
- if '../' in filename or '/' in filename:
- return JSONResponse(status_code=400, content={'code': 400, "msg": '警告:禁止篡改文件路径'})
- file_path = f'/data/upload/mergefile/uploads/{filename}'
- if not os.path.exists(file_path):
- return JSONResponse(status_code=404, content={'code': 404, 'msg': f"文件不存在"})
- # print("文件不存在,请检查路径!")
- # 读取 Excel 文件
- try:
- workbook = openpyxl.load_workbook(file_path)
- sheet = workbook.active
- except Exception as e:
- traceback.print_exc()
- return JSONResponse(status_code=400, content={'code': 400, 'msg': f"接口发生错误:{e}"})
- # raise HTTPException(status_code=400, detail="Invalid Excel file")
- # 获取字段名和字段备注名
- column_names = [col["column_name"] for col in columns if col["column_name"]!='id']
- column_comments = [col["column_comment"] for col in columns if col["column_name"]!='id']
- # 检查第一行是否为字段备注名
- first_row = [cell.value for cell in sheet[1]]
- if first_row != column_comments:
- print("接口发生错误:Excel columns do not match the expected columns")
- return JSONResponse(status_code=400, content={'code': 400, 'msg': f"接口发生错误:Excel columns do not match the expected columns"})
- # raise HTTPException(status_code=400, detail="Excel columns do not match the expected columns")
- # 检查第二行是否为字段名
- second_row = [cell.value for cell in sheet[2]]
- if second_row != column_names:
- print("接口发生错误:Excel columns do not match the expected columns")
- return JSONResponse(status_code=400,
- content={'code': 400, 'msg': f"接口发生错误:Excel columns do not match the expected columns"})
- # raise HTTPException(status_code=400, detail="Excel columns do not match the expected columns")
- # 将数据插入到数据库
- try:
- insert_query = text(
- f"INSERT INTO `{schema_name}`.`{table_name}` ({', '.join(column_names)}) VALUES ({', '.join([':' + col for col in column_names])})")
- for row in sheet.iter_rows(min_row=3, values_only=True):
- db.execute(insert_query, dict(zip(column_names, row)))
- db.commit()
- return {"code":200,"msg": "Data imported successfully"}
- except Exception as e:
- db.rollback()
- traceback.print_exc()
- return JSONResponse(status_code=500,
- content={'code': 500, 'msg': f"接口发生错误:{e}"})
- # raise HTTPException(status_code=500, detail=str(e))
|