#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Dictionary management routes.

Exposes CRUD endpoints for dictionary types (``SysDictType``) and dictionary
data entries (``SysDictData``). Rows are soft-deleted: deleting sets
``del_flag`` to ``'2'`` and every query filters such rows out.

NOTE(review): ``datetime``, ``SysDictType``, ``SysDictData``, ``AuthUser``,
``get_auth_user`` and ``remove_xss_json`` are not imported by name here; they
presumably come from the star imports below -- confirm before tightening them.
"""
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from sqlalchemy.sql import func
from common.auth_user import *
from pydantic import BaseModel
from database import get_db
from typing import List
from models import *
from utils import *
import traceback
import json

router = APIRouter()

# Value of ``del_flag`` that marks a row as soft-deleted.
_DELETED_FLAG = '2'

# Timestamp format used for every ``createTime`` field in responses.
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _format_time(value):
    """Return *value* formatted with ``_TIME_FORMAT``, or ``''`` when unset."""
    return value.strftime(_TIME_FORMAT) if value else ''


def _dict_type_to_dict(record):
    """Serialize a ``SysDictType`` row into the response payload shape."""
    return {
        "dictId": record.dict_id,
        "dictName": record.dict_name,
        "dictType": record.dict_type,
        "remark": record.remark,
        "createTime": _format_time(record.create_time),
    }


def _dict_data_to_dict(record):
    """Serialize a ``SysDictData`` row into the response payload shape."""
    return {
        "dictCode": record.dict_code,
        "dictSort": record.dict_sort,
        "dictLabel": record.dict_label,
        "dictValue": record.dict_value,
        "dictType": record.dict_type,
        "cssClass": record.css_class,
        "listClass": record.list_class,
        "isDefault": record.is_default,
        "remark": record.remark,
        "createTime": _format_time(record.create_time),
    }


# NOTE(review): unlike most sibling endpoints, this one declares no
# authentication dependency -- confirm that is intended.
@router.get('/type/list')
async def get_dict_types(
    db: Session = Depends(get_db),
    pageNum: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    dictName: str = Query(None, max_length=100),
    dictType: str = Query(None, max_length=100),
    beginTime: datetime = Query(None),
    endTime: datetime = Query(None)
):
    """Return one page of non-deleted dictionary types.

    Args:
        db: Database session.
        pageNum: 1-based page number.
        pageSize: Number of rows per page.
        dictName: Optional substring to match against ``dict_name``.
        dictType: Optional substring to match against ``dict_type``.
        beginTime: Optional inclusive lower bound on ``create_time``.
        endTime: Optional inclusive upper bound on ``create_time``.

    Returns:
        A dict with ``total``, ``rows``, ``code`` and ``msg``.

    Raises:
        HTTPException: 500 when the query fails.
    """
    try:
        query = db.query(SysDictType).filter(
            SysDictType.del_flag != _DELETED_FLAG
        )

        if dictName:
            query = query.filter(SysDictType.dict_name.like(f'%{dictName}%'))
        if dictType:
            query = query.filter(SysDictType.dict_type.like(f'%{dictType}%'))
        # Each bound is applied on its own so a one-sided range also works.
        if beginTime:
            query = query.filter(SysDictType.create_time >= beginTime)
        if endTime:
            query = query.filter(SysDictType.create_time <= endTime)

        # Count before paginating so ``total`` covers the whole result set.
        total_count = query.count()

        offset = (pageNum - 1) * pageSize
        dict_types = query.offset(offset).limit(pageSize).all()

        return {
            "total": total_count,
            "rows": [_dict_type_to_dict(d) for d in dict_types],
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# NOTE(review): no authentication dependency is declared here either --
# confirm that is intended.
@router.get('/type/optionselect')
async def get_dict_types_optionselect(
    db: Session = Depends(get_db),
):
    """Return every non-deleted dictionary type (unpaginated).

    Returns:
        A dict with ``data``, ``code`` and ``msg``.

    Raises:
        HTTPException: 500 when the query fails.
    """
    try:
        dict_types = (
            db.query(SysDictType)
            .filter(SysDictType.del_flag != _DELETED_FLAG)
            .all()
        )

        return {
            "data": [_dict_type_to_dict(d) for d in dict_types],
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


# NOTE: this handler reuses the name of the '/type/optionselect' handler
# above. Both routes are registered at decoration time, so both work; the
# name is kept unchanged so the route name does not change.
@router.get('/type/{dictId}')
async def get_dict_types_optionselect(
    dictId: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user: AuthUser = Depends(get_auth_user)
):
    """Return a single non-deleted dictionary type by id.

    Args:
        dictId: Value matched against ``SysDictType.dict_id``.

    Returns:
        A dict with ``data``, ``code`` and ``msg``.

    Raises:
        HTTPException: 404 when no matching row exists, 500 on other errors.
    """
    try:
        dict_type = (
            db.query(SysDictType)
            .filter(
                SysDictType.dict_id == dictId,
                SysDictType.del_flag != _DELETED_FLAG,
            )
            .first()
        )
        if not dict_type:
            raise HTTPException(status_code=404, detail="dict_type不存在")

        return {
            "data": _dict_type_to_dict(dict_type),
            "code": 200,
            "msg": "查询成功"
        }
    except HTTPException:
        # Let deliberate HTTP errors (the 404 above) through untouched.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


class DictTypeCreateForm(BaseModel):
    """Request body for creating a dictionary type."""

    dictName: str
    dictType: str
    remark: str = None


@router.post("/type")
async def create_dict_type(
    form_data: DictTypeCreateForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create a dictionary type from the submitted form.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (always ``None``).

    Raises:
        HTTPException: 500 when the insert fails.
    """
    try:
        new_dict_type = SysDictType(
            dict_name=form_data.dictName,
            dict_type=form_data.dictType,
            remark=form_data.remark,
            create_time=datetime.now(),
            update_time=datetime.now()
        )

        db.add(new_dict_type)
        db.commit()
        db.refresh(new_dict_type)

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


class DictTypeUpdataForm(BaseModel):
    """Request body for updating a dictionary type."""

    createTime: str
    dictId: str
    dictName: str
    dictType: str
    remark: str = None


@router.put("/type")
async def updata_dict_type(
    form_data: DictTypeUpdataForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Update the name, type and remark of an existing dictionary type.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (always ``None``).

    Raises:
        HTTPException: 404 when the type does not exist or is soft-deleted,
            500 on other errors.
    """
    try:
        # Both criteria go to filter() as separate arguments. Joining them
        # with Python's ``and`` does not build a SQL AND and loses the
        # del_flag condition.
        dict_type = (
            db.query(SysDictType)
            .filter(
                SysDictType.dict_id == form_data.dictId,
                SysDictType.del_flag != _DELETED_FLAG,
            )
            .first()
        )
        if not dict_type:
            raise HTTPException(status_code=404, detail="dict_type不存在")

        dict_type.dict_name = form_data.dictName
        dict_type.dict_type = form_data.dictType
        dict_type.remark = form_data.remark
        dict_type.update_time = datetime.now()

        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        # Keep the 404 as a 404 instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.delete("/type/delete/{dictId}")
async def delete_dict_type(
    dictId: int,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete a dictionary type by setting its ``del_flag`` to ``'2'``.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (always ``None``).

    Raises:
        HTTPException: 404 when the type does not exist or is already
            soft-deleted, 500 on other errors.
    """
    try:
        # Separate filter() arguments are AND-ed in SQL; Python's ``and``
        # would drop the del_flag condition.
        dict_type = (
            db.query(SysDictType)
            .filter(
                SysDictType.dict_id == dictId,
                SysDictType.del_flag != _DELETED_FLAG,
            )
            .first()
        )
        if not dict_type:
            raise HTTPException(status_code=404, detail="dict_type不存在")

        dict_type.del_flag = _DELETED_FLAG
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        ) from e


@router.get('/data/list')
async def get_dict_data_by_type(
    dictType: str = Query(None, max_length=100),
    pageNum: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return one page of non-deleted dictionary data entries.

    Args:
        dictType: Optional substring to match against ``dict_type``.
        pageNum: 1-based page number.
        pageSize: Number of rows per page.

    Returns:
        A dict with ``total``, ``rows``, ``code`` and ``msg``.

    Raises:
        HTTPException: 500 when the query fails.
    """
    try:
        query = db.query(SysDictData)

        # Filter only when a value was supplied; otherwise the pattern
        # would be the literal string '%None%'.
        if dictType:
            query = query.filter(SysDictData.dict_type.like(f'%{dictType}%'))
        query = query.filter(SysDictData.del_flag != _DELETED_FLAG)

        # Count before paginating so ``total`` covers the whole result set.
        total_count = query.count()

        offset = (pageNum - 1) * pageSize
        dict_data = query.offset(offset).limit(pageSize).all()

        return {
            "total": total_count,
            "rows": [_dict_data_to_dict(d) for d in dict_data],
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        ) from e


# NOTE: the name ``get_dict_data_by_type`` is reused by three handlers in
# this module; names are kept unchanged so route names do not change.
@router.get('/data/{dictCode}')
async def get_dict_data_by_type(
    dictCode: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return a single non-deleted dictionary data entry by code.

    Args:
        dictCode: Value matched against ``SysDictData.dict_code``.

    Returns:
        A dict with ``data``, ``code`` and ``msg``.

    Raises:
        HTTPException: 404 when no matching row exists, 500 on other errors.
    """
    try:
        entry = (
            db.query(SysDictData)
            .filter(
                SysDictData.dict_code == dictCode,
                SysDictData.del_flag != _DELETED_FLAG,
            )
            .first()
        )
        if not entry:
            raise HTTPException(status_code=404, detail="dict_data不存在")

        return {
            "data": _dict_data_to_dict(entry),
            "code": 200,
            "msg": "查询成功"
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        ) from e


class DictDataCreateForm(BaseModel):
    """Request body for creating a dictionary data entry."""

    dictLabel: str
    dictValue: str
    cssClass: str
    listClass: str
    dictSort: int
    remark: str
    dictType: str


@router.post('/data')
async def create_dict_data(
    form_data: DictDataCreateForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create a dictionary data entry from the submitted form.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (always ``None``).

    Raises:
        HTTPException: 500 when the insert fails.
    """
    try:
        new_dict_data = SysDictData(
            dict_sort=form_data.dictSort,
            dict_label=form_data.dictLabel,
            dict_value=form_data.dictValue,
            dict_type=form_data.dictType,
            css_class=form_data.cssClass,
            list_class=form_data.listClass,
            remark=form_data.remark
        )

        db.add(new_dict_data)
        db.commit()
        db.refresh(new_dict_data)

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        ) from e


@router.get('/data/type/{dict_type}')
async def get_dict_data_by_type(
    dict_type: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return every non-deleted data entry whose ``dict_type`` equals *dict_type*.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (a list of entries).

    Raises:
        HTTPException: 500 when the query fails.
    """
    try:
        dict_data = (
            db.query(SysDictData)
            .filter(
                SysDictData.dict_type == dict_type,
                SysDictData.del_flag != _DELETED_FLAG,
            )
            .all()
        )

        return {
            "code": 200,
            "msg": "操作成功",
            "data": [_dict_data_to_dict(d) for d in dict_data]
        }
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        ) from e


class DictDataUpdataForm(BaseModel):
    """Request body for updating a dictionary data entry.

    Only ``dictCode`` is required; fields left as ``None`` are not changed.
    """

    dictCode: str
    dictLabel: str = None
    dictValue: str = None
    cssClass: str = None
    listClass: str = None
    dictSort: int = None
    remark: str = None
    dictType: str = None
    isDefault: str = None


# NOTE: this handler reuses the name of the ``PUT /type`` handler; the name
# is kept unchanged so the route name does not change.
@router.put("/data")
async def updata_dict_type(
    form_data: DictDataUpdataForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Update the supplied fields of an existing dictionary data entry.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (always ``None``).

    Raises:
        HTTPException: 404 when the entry does not exist or is soft-deleted,
            500 on other errors.
    """
    try:
        dict_data = (
            db.query(SysDictData)
            .filter(
                SysDictData.dict_code == form_data.dictCode,
                SysDictData.del_flag != _DELETED_FLAG,
            )
            .first()
        )
        if not dict_data:
            raise HTTPException(status_code=404, detail="dict_data不存在")

        updates = {
            "dict_sort": form_data.dictSort,
            "dict_label": form_data.dictLabel,
            "dict_value": form_data.dictValue,
            "dict_type": form_data.dictType,
            "css_class": form_data.cssClass,
            "list_class": form_data.listClass,
            "remark": form_data.remark,
            "is_default": form_data.isDefault,
        }
        # Compare against None rather than truthiness so that legitimate
        # falsy values (dictSort == 0, empty strings) can still be written.
        for column, value in updates.items():
            if value is not None:
                setattr(dict_data, column, value)

        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        # Keep the 404 as a 404 instead of rewrapping it as a 500.
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e)) from e


@router.delete("/data/delete/{dictCode}")
async def delete_dict_data(
    dictCode: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete one dictionary data entry by code.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (always ``None``).

    Raises:
        HTTPException: 404 when the entry does not exist or is already
            soft-deleted, 500 on other errors.
    """
    try:
        dict_data = (
            db.query(SysDictData)
            .filter(
                SysDictData.dict_code == dictCode,
                SysDictData.del_flag != _DELETED_FLAG,
            )
            .first()
        )
        if not dict_data:
            raise HTTPException(status_code=404, detail="dict_data不存在")

        dict_data.del_flag = _DELETED_FLAG
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        raise
    except Exception as e:
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        ) from e


# NOTE: this handler reuses the name of the single-entry delete handler
# above; the name is kept unchanged so the route name does not change.
@router.delete("/data/{dictCode}")
async def delete_dict_data(
    dictCode: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete one or more dictionary data entries.

    Args:
        dictCode: A single code or a comma-separated list of codes.

    Returns:
        A dict with ``code``, ``msg`` and ``data`` (always ``None``), or a
        404 ``JSONResponse`` when none of the codes match a live entry.

    Raises:
        HTTPException: 500 when the query or commit fails.
    """
    try:
        dict_code_list = dictCode.split(',')
        entries = (
            db.query(SysDictData)
            .filter(
                SysDictData.del_flag != _DELETED_FLAG,
                SysDictData.dict_code.in_(dict_code_list),
            )
            .all()
        )
        if not entries:
            return JSONResponse(content={"msg": "dict不存在"}, status_code=404)

        for entry in entries:
            entry.del_flag = _DELETED_FLAG
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e)
        ) from e