123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from sqlalchemy.sql import func
- from common.auth_user import *
- from pydantic import BaseModel
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- import traceback
- import json
- router = APIRouter()
@router.get('/type/list')
async def get_dict_types(
    db: Session = Depends(get_db),  # request-scoped database session
    pageNum: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    dictName: str = Query(None, max_length=100),
    dictType: str = Query(None, max_length=100),
    beginTime: datetime = Query(None),
    endTime: datetime = Query(None)
):
    """List dictionary types, filtered and paginated.

    Rows whose ``del_flag`` equals '2' are excluded. ``dictName`` and
    ``dictType`` are matched as substrings through SQL LIKE. ``beginTime``
    and ``endTime`` bound ``create_time`` inclusively; each bound is applied
    independently, so a request that carries only one of them is still
    filtered.

    Args:
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        pageNum: 1-based page index.
        pageSize: number of rows per page.
        dictName: optional substring of the dictionary name.
        dictType: optional substring of the dictionary type key.
        beginTime: optional inclusive lower bound on ``create_time``.
        endTime: optional inclusive upper bound on ``create_time``.

    Returns:
        A dict with ``total`` (matches before paging), ``rows`` (the page),
        ``code`` and ``msg``.

    Raises:
        HTTPException: status 500 carrying the error text on any failure.
    """
    try:
        query = db.query(SysDictType).filter(SysDictType.del_flag != '2')

        # Optional filters.
        if dictName:
            query = query.filter(SysDictType.dict_name.like(f'%{dictName}%'))
        if dictType:
            query = query.filter(SysDictType.dict_type.like(f'%{dictType}%'))
        if beginTime is not None:
            query = query.filter(SysDictType.create_time >= beginTime)
        if endTime is not None:
            query = query.filter(SysDictType.create_time <= endTime)

        # Total is counted before paging is applied.
        total_count = query.count()

        # OFFSET/LIMIT without ORDER BY gives no guarantee that consecutive
        # pages are disjoint, so pin a deterministic order first.
        offset = (pageNum - 1) * pageSize
        dict_types = (
            query.order_by(SysDictType.dict_id)
            .offset(offset)
            .limit(pageSize)
            .all()
        )

        dict_type_dicts = [
            {
                "dictId": d.dict_id,
                "dictName": d.dict_name,
                "dictType": d.dict_type,
                "remark": d.remark,
                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
            }
            for d in dict_types
        ]

        return {
            "total": total_count,
            "rows": dict_type_dicts,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        # Any failure is reported to the client as a 500 with the error text.
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/type/optionselect')
async def get_dict_types_optionselect(
    db: Session = Depends(get_db),  # request-scoped database session
):
    """Return every dictionary type whose ``del_flag`` is not '2'.

    No paging is applied; the full list is returned under ``data`` together
    with ``code`` and ``msg``. Any failure is re-raised as an HTTP 500 whose
    detail is the error text.
    """
    try:
        rows = db.query(SysDictType).filter(SysDictType.del_flag != '2').all()

        # Serialise each ORM row into the camelCase shape of the response.
        options = []
        for row in rows:
            options.append({
                "dictId": row.dict_id,
                "dictName": row.dict_name,
                "dictType": row.dict_type,
                "remark": row.remark,
                "createTime": row.create_time.strftime('%Y-%m-%d %H:%M:%S') if row.create_time else ''
            })

        return {
            "data": options,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
@router.get('/type/{dictId}')
async def get_dict_types_optionselect(
    dictId: str,
    db: Session = Depends(get_db),  # request-scoped database session
    body = Depends(remove_xss_json),
    user: AuthUser = Depends(get_auth_user)
):
    """Return a single dictionary type by its id.

    The function name duplicates the '/type/optionselect' handler; it is
    kept unchanged so route names derived from it stay the same. Both routes
    remain registered because registration happens in the decorator.

    Args:
        dictId: value compared against ``SysDictType.dict_id``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        body: result of the ``remove_xss_json`` dependency (not read here).
        user: result of the ``get_auth_user`` dependency (not read here).

    Returns:
        A dict with the record under ``data`` plus ``code`` and ``msg``.

    Raises:
        HTTPException: 404 when no row with this id and ``del_flag != '2'``
            exists; 500 carrying the error text on any other failure.
    """
    try:
        dict_type = (
            db.query(SysDictType)
            .filter(SysDictType.dict_id == dictId)
            .filter(SysDictType.del_flag != '2')
            .first()
        )
        # first() yields None when nothing matches; answer 404 instead of
        # letting the attribute access below fail and surface as a 500.
        if dict_type is None:
            raise HTTPException(status_code=404, detail="dict_type不存在")

        dict_type_dicts = {
            "dictId": dict_type.dict_id,
            "dictName": dict_type.dict_name,
            "dictType": dict_type.dict_type,
            "remark": dict_type.remark,
            "createTime": dict_type.create_time.strftime('%Y-%m-%d %H:%M:%S') if dict_type.create_time else ''
        }

        return {
            "data": dict_type_dicts,
            "code": 200,
            "msg": "查询成功"
        }
    except HTTPException:
        # Keep the intended status code (e.g. the 404 above).
        raise
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
class DictTypeCreateForm(BaseModel):
    # Request body accepted by POST /type.
    dictName: str
    dictType: str
    # Optional free-text note; None when the client omits it.
    remark: str = None
@router.post("/type")
async def create_dict_type(
    form_data: DictTypeCreateForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Insert a new dictionary type built from the request body.

    ``create_time`` and ``update_time`` are both stamped with the current
    local time. On success the response carries ``data: None``; any failure
    is re-raised as an HTTP 500 whose detail is the error text.
    """
    try:
        record = SysDictType(
            dict_name=form_data.dictName,
            dict_type=form_data.dictType,
            remark=form_data.remark,
            create_time=datetime.now(),
            update_time=datetime.now()
        )

        # Persist the row, then reload it from the database.
        db.add(record)
        db.commit()
        db.refresh(record)

        response = {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
        return response
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
class DictTypeUpdataForm(BaseModel):
    # Request body accepted by PUT /type.
    createTime: str
    # Identifies the dictionary type row to update.
    dictId: str
    dictName: str
    dictType: str
    # Optional free-text note; None when the client omits it.
    remark: str = None
@router.put("/type")
async def updata_dict_type(
    form_data: DictTypeUpdataForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Update a dictionary type and re-point its data rows at the new key.

    Every non-deleted ``SysDictData`` row that references the type's current
    ``dict_type`` is rewritten to ``form_data.dictType`` before the type row
    itself is changed, and everything is committed together.
    ``form_data.createTime`` is accepted but not read.

    Args:
        form_data: validated request body.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        body: result of the ``remove_xss_json`` dependency (not read here).
        user_id: result of the ``valid_access_token`` dependency (not read
            here).

    Returns:
        A dict with ``code``, ``msg`` and ``data: None``.

    Raises:
        HTTPException: 404 when no row with this id and ``del_flag != '2'``
            exists; 500 carrying the error text on any other failure.
    """
    try:
        # Criteria must be passed as separate arguments: joining SQLAlchemy
        # expressions with Python ``and`` evaluates to only one of them, so
        # the del_flag check would be silently dropped.
        dict_type = (
            db.query(SysDictType)
            .filter(
                SysDictType.dict_id == form_data.dictId,
                SysDictType.del_flag != '2',
            )
            .first()
        )
        if dict_type is None:
            raise HTTPException(status_code=404, detail="dict_type不存在")

        # Move the dependent data rows over to the new type key first, while
        # dict_type.dict_type still holds the old value.
        dependents = (
            db.query(SysDictData)
            .filter(SysDictData.dict_type == dict_type.dict_type)
            .filter(SysDictData.del_flag != '2')
            .all()
        )
        for dict_data in dependents:
            dict_data.dict_type = form_data.dictType

        dict_type.dict_name = form_data.dictName
        dict_type.dict_type = form_data.dictType
        dict_type.remark = form_data.remark
        dict_type.update_time = datetime.now()

        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        # Keep the intended status code (e.g. the 404 above).
        raise
    except Exception as e:
        # Discard the partially applied changes before reporting the error.
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/type/{dictId}")
@router.delete("/type/delete/{dictId}")  # both paths identify the row by id
async def delete_dict_type(
    dictId: int,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete a dictionary type by setting its ``del_flag`` to '2'.

    Args:
        dictId: value compared against ``SysDictType.dict_id``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        body: result of the ``remove_xss_json`` dependency (not read here).
        user_id: result of the ``valid_access_token`` dependency (not read
            here).

    Returns:
        A dict with ``code``, ``msg`` and ``data: None``.

    Raises:
        HTTPException: 404 when no row with this id and ``del_flag != '2'``
            exists; 500 carrying the error text on any other failure.
    """
    try:
        # Criteria must be passed as separate arguments: joining SQLAlchemy
        # expressions with Python ``and`` evaluates to only one of them, so
        # an already-deleted row would still be matched.
        dict_type = (
            db.query(SysDictType)
            .filter(
                SysDictType.dict_id == dictId,
                SysDictType.del_flag != '2',
            )
            .first()
        )
        if dict_type is None:
            raise HTTPException(status_code=404, detail="dict_type不存在")

        # The row is kept; only the flag changes.
        dict_type.del_flag = '2'
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        # Keep the intended status code (e.g. the 404 above).
        raise
    except Exception as e:
        # Discard the pending change before reporting the error.
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.get('/data/list')
async def get_dict_data_by_type(
    dictType: str = Query(None, max_length=100),
    pageNum: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return one page of dictionary data rows for a dictionary type.

    Rows are matched on an exact ``dict_type`` value, exclude those whose
    ``del_flag`` is '2', and are ordered by ``dict_sort`` ascending. The
    response holds ``total`` (matches before paging), ``rows``, ``code`` and
    ``msg``. Any failure is re-raised as an HTTP 500 whose detail is the
    error text.
    """
    try:
        # NOTE(review): the filter is unconditional, so an omitted dictType
        # is compared as the text 'None' — confirm that is intended.
        base = (
            db.query(SysDictData)
            .filter(SysDictData.dict_type == f'{dictType}')
            .filter(SysDictData.del_flag != '2')
        )

        # Count before ordering and paging are applied.
        total_count = base.count()

        skip = (pageNum - 1) * pageSize
        page = (
            base.order_by(SysDictData.dict_sort.asc())
            .offset(skip)
            .limit(pageSize)
            .all()
        )

        rows = []
        for item in page:
            rows.append({
                "dictCode": item.dict_code,
                "dictSort": item.dict_sort,
                "dictLabel": item.dict_label,
                "dictValue": item.dict_value,
                "dictType": item.dict_type,
                "cssClass": item.css_class,
                "listClass": item.list_class,
                "isDefault": item.is_default,
                "remark": item.remark,
                "createTime": item.create_time.strftime('%Y-%m-%d %H:%M:%S') if item.create_time else ''
            })

        return {
            "total": total_count,
            "rows": rows,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.get('/data/{dictCode}')
async def get_dict_data_by_type(
    dictCode: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return a single dictionary data row by its code.

    Args:
        dictCode: value compared against ``SysDictData.dict_code``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        body: result of the ``remove_xss_json`` dependency (not read here).
        user_id: result of the ``valid_access_token`` dependency (not read
            here).

    Returns:
        A dict with the record under ``data`` plus ``code`` and ``msg``.

    Raises:
        HTTPException: 404 when no row with this code and
            ``del_flag != '2'`` exists; 500 carrying the error text on any
            other failure.
    """
    try:
        d = (
            db.query(SysDictData)
            .filter(SysDictData.dict_code == dictCode)
            .filter(SysDictData.del_flag != '2')
            .first()
        )
        # first() yields None when nothing matches; answer 404 instead of
        # letting the attribute access below fail and surface as a 500.
        if d is None:
            raise HTTPException(status_code=404, detail="dict_data不存在")

        dict_data_list = {
            "dictCode": d.dict_code,
            "dictSort": d.dict_sort,
            "dictLabel": d.dict_label,
            "dictValue": d.dict_value,
            "dictType": d.dict_type,
            "cssClass": d.css_class,
            "listClass": d.list_class,
            "isDefault": d.is_default,
            "remark": d.remark,
            "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
        }

        return {
            "data": dict_data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except HTTPException:
        # Keep the intended status code (e.g. the 404 above).
        raise
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
class DictDataCreateForm(BaseModel):
    # Request body accepted by POST /data; every field is required.
    dictLabel: str
    dictValue: str
    cssClass: str
    listClass: str
    dictSort: int
    remark: str
    dictType: str
@router.post('/data')
async def create_dict_data(
    form_data: DictDataCreateForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Insert a new dictionary data row built from the request body.

    On success the response carries ``data: None``; any failure is re-raised
    as an HTTP 500 whose detail is the error text.
    """
    try:
        entry = SysDictData(
            dict_sort=form_data.dictSort,
            dict_label=form_data.dictLabel,
            dict_value=form_data.dictValue,
            dict_type=form_data.dictType,
            css_class=form_data.cssClass,
            list_class=form_data.listClass,
            remark=form_data.remark
        )

        # Persist the row, then reload it from the database.
        db.add(entry)
        db.commit()
        db.refresh(entry)

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.get('/data/type/{dict_type}')
async def get_dict_data_by_type(
    dict_type: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return all dictionary data rows of one dictionary type.

    Rows are matched on an exact ``dict_type`` value, exclude those whose
    ``del_flag`` is '2', and are ordered by ``dict_sort``. No paging is
    applied. Any failure is re-raised as an HTTP 500 whose detail is the
    error text.
    """
    try:
        entries = (
            db.query(SysDictData)
            .filter(SysDictData.dict_type == dict_type)
            .filter(SysDictData.del_flag != '2')
            .order_by(SysDictData.dict_sort)
            .all()
        )

        # Serialise each ORM row into the camelCase shape of the response.
        payload = []
        for entry in entries:
            payload.append({
                "dictCode": entry.dict_code,
                "dictSort": entry.dict_sort,
                "dictLabel": entry.dict_label,
                "dictValue": entry.dict_value,
                "dictType": entry.dict_type,
                "cssClass": entry.css_class,
                "listClass": entry.list_class,
                "isDefault": entry.is_default,
                "remark": entry.remark,
                "createTime": entry.create_time.strftime('%Y-%m-%d %H:%M:%S') if entry.create_time else ''
            })

        return {
            "code": 200,
            "msg": "操作成功",
            "data": payload
        }
    except Exception as e:
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
class DictDataUpdataForm(BaseModel):
    # Request body accepted by PUT /data.
    # Identifies the dictionary data row to update.
    dictCode: str
    # Everything below is optional and defaults to None when omitted.
    dictLabel: str = None
    dictValue: str = None
    cssClass: str = None
    listClass: str = None
    dictSort: int = None
    remark: str = None
    dictType: str = None
    isDefault: str = None
@router.put("/data")
async def updata_dict_type(
    form_data: DictDataUpdataForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Partially update one dictionary data row.

    ``dictSort``, ``cssClass`` and ``listClass`` are written whenever the key
    is present in ``body``. The remaining fields are written only when the
    validated form holds a truthy value, so they cannot be blanked through
    this endpoint.

    Args:
        form_data: validated request body; ``dictCode`` selects the row.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        body: result of the ``remove_xss_json`` dependency; presumably the
            sanitised request JSON as a mapping — verify against that helper.
        user_id: result of the ``valid_access_token`` dependency (not read
            here).

    Returns:
        A dict with ``code``, ``msg`` and ``data: None``.

    Raises:
        HTTPException: 404 when no row with this code and
            ``del_flag != '2'`` exists; 500 carrying the error text on any
            other failure.
    """
    try:
        dict_data = (
            db.query(SysDictData)
            .filter(SysDictData.dict_code == form_data.dictCode)
            .filter(SysDictData.del_flag != '2')
            .first()
        )
        if dict_data is None:
            raise HTTPException(status_code=404, detail="dict_data不存在")

        # Key presence, not truthiness, decides for these three, so falsy
        # values present in the body are still written.
        if "dictSort" in body:
            dict_data.dict_sort = body['dictSort']
        if 'cssClass' in body:
            dict_data.css_class = body['cssClass']
        if 'listClass' in body:
            dict_data.list_class = body['listClass']

        # These are only overwritten by non-empty values.
        if form_data.dictLabel:
            dict_data.dict_label = form_data.dictLabel
        if form_data.dictValue:
            dict_data.dict_value = form_data.dictValue
        if form_data.dictType:
            dict_data.dict_type = form_data.dictType
        if form_data.remark:
            dict_data.remark = form_data.remark
        if form_data.isDefault:
            dict_data.is_default = form_data.isDefault

        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        # Keep the intended status code (e.g. the 404 above).
        raise
    except Exception as e:
        # Discard the partially applied changes before reporting the error.
        db.rollback()
        raise HTTPException(status_code=500, detail=str(e))
@router.delete("/data/delete/{dictCode}")  # the row is identified by its code
async def delete_dict_data(
    dictCode: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete one dictionary data row by setting ``del_flag`` to '2'.

    Args:
        dictCode: value compared against ``SysDictData.dict_code``.
        db: SQLAlchemy session supplied by the ``get_db`` dependency.
        body: result of the ``remove_xss_json`` dependency (not read here).
        user_id: result of the ``valid_access_token`` dependency (not read
            here).

    Returns:
        A dict with ``code``, ``msg`` and ``data: None``.

    Raises:
        HTTPException: 404 when no row with this code and
            ``del_flag != '2'`` exists; 500 carrying the error text on any
            other failure.
    """
    try:
        dict_data = (
            db.query(SysDictData)
            .filter(SysDictData.dict_code == dictCode)
            .filter(SysDictData.del_flag != '2')
            .first()
        )
        if dict_data is None:
            raise HTTPException(status_code=404, detail="dict_data不存在")

        # The row is kept; only the flag changes.
        dict_data.del_flag = '2'
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except HTTPException:
        # Keep the intended status code (e.g. the 404 above).
        raise
    except Exception as e:
        # Discard the pending change before reporting the error.
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
@router.delete("/data/{dictCode}")  # accepts one code or a comma-separated list
async def delete_dict_data(
    dictCode: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete several dictionary data rows in one request.

    ``dictCode`` is split on commas and every row whose ``dict_code`` is in
    that list and whose ``del_flag`` is not '2' gets ``del_flag`` set to '2'.
    When no row matches, a 404 JSON response with a ``msg`` field is returned.
    Any failure prints the traceback and is re-raised as an HTTP 500 whose
    detail is the error text.
    """
    try:
        codes = dictCode.split(',')
        matches = (
            db.query(SysDictData)
            .filter(SysDictData.del_flag != '2')
            .filter(SysDictData.dict_code.in_(codes))
            .all()
        )

        if not matches:
            return JSONResponse(content={"msg":"dict不存在"},status_code=404)

        # Rows are kept; only the flag changes.
        for row in matches:
            row.del_flag = '2'
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))
|