#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""CRUD endpoints for duty positions (``DutyPosition`` rows).

Rows are soft-deleted: the delete endpoint sets ``del_flag = '2'`` and every
query in this module filters those rows out.
"""

from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from fastapi.responses import JSONResponse
from sqlalchemy.orm import Session
from sqlalchemy import and_, or_
from sqlalchemy.sql import func
from common.auth_user import *
from pydantic import BaseModel
from database import get_db
from typing import List
from models import *
from utils import *
from utils.ry_system_util import *
from utils.video_util import *
import traceback
import json


router = APIRouter()

# Value written to ``del_flag`` by the delete endpoint to mark a row as removed.
_DELETED_FLAG = '2'

# Timestamp layout used for ``createTime`` in responses.
_TIME_FORMAT = '%Y-%m-%d %H:%M:%S'


def _active_positions(db):
    """Return a query over ``DutyPosition`` rows that are not soft-deleted."""
    return db.query(DutyPosition).filter(DutyPosition.del_flag != _DELETED_FLAG)


def _position_to_dict(position):
    """Serialize one ``DutyPosition`` row into the response dictionary."""
    created = position.create_time
    return {
        "id": position.id,
        "sort_number": position.sort_number,
        "position_name": position.position_name,
        "type": position.type,
        # A missing creation time is reported as an empty string.
        "createTime": created.strftime(_TIME_FORMAT) if created else ''
    }


def _error_response(message, status_code=404):
    """Build the ``{'code', 'msg'}`` JSON error payload used by this module.

    NOTE(review): unexpected exceptions are also reported with 404 because
    that is the existing contract of these endpoints; 500 would describe
    them more accurately, but changing it may affect clients - confirm.
    """
    return JSONResponse(status_code=status_code, content={
        'code': status_code,
        'msg': message
    })


@router.get('/list')
async def get_dict_data_by_type(
    keywords: str = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return one page of duty positions, optionally filtered by name.

    Args:
        keywords: Optional substring matched against ``position_name``.
        page: 1-based page number.
        pageSize: Number of rows per page.
        db: Database session.
        body: Result of the ``remove_xss_json`` dependency (unused here).
        user_id: Result of the ``valid_access_token`` dependency.

    Returns:
        A dict with ``total``, ``page``, ``pageSize``, ``totalPages``,
        ``data``, ``code`` and ``msg``; on any exception a 404 JSON response
        carrying the exception text.
    """
    try:
        query = _active_positions(db)

        if keywords:
            query = query.filter(DutyPosition.position_name.like(f'%{keywords}%'))

        total_count = query.count()

        # OFFSET/LIMIT without ORDER BY gives no guarantee about which rows
        # land on which page, so order by the id column to keep pages stable.
        offset = (page - 1) * pageSize
        position_rows = (
            query.order_by(DutyPosition.id)
            .offset(offset)
            .limit(pageSize)
            .all()
        )

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            # Ceiling division: a partially filled last page still counts.
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": [_position_to_dict(row) for row in position_rows],
            "code": 200,
            "msg": "查询成功"
        }

    except Exception as e:
        traceback.print_exc()
        return _error_response(str(e))


# NOTE(review): this handler reuses the name of the '/list' handler above, so
# the module-level name ends up bound to this function. Both routes are still
# registered because each decorator runs on its own function object. The name
# is kept unchanged in case other modules refer to it.
@router.get('/info/{id}')
async def get_dict_data_by_type(
    id: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return a single duty position by id.

    Args:
        id: Identifier of the duty position (path parameter).
        db: Database session.
        body: Result of the ``remove_xss_json`` dependency (unused here).
        user_id: Result of the ``valid_access_token`` dependency.

    Returns:
        A dict with ``data``, ``code`` and ``msg``; a 404 JSON response when
        no matching, non-deleted row exists or when an exception occurs.
    """
    try:
        position = _active_positions(db).filter(DutyPosition.id == id).first()

        # Without this guard a missing row raised AttributeError on ``None``
        # and the client received that internal message instead of a clear
        # "not found" answer.
        if position is None:
            return _error_response('值班岗位不存在')

        return {
            "data": _position_to_dict(position),
            "code": 200,
            "msg": "查询成功"
        }

    except Exception as e:
        traceback.print_exc()
        return _error_response(str(e))


@router.post('/create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create a duty position from the request body.

    Reads ``sort_number``, ``position_name`` and ``type`` from ``body`` and
    records ``user_id`` as ``create_by``.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success; a 404 JSON
        response carrying the exception text on failure (including a missing
        key in ``body``).
    """
    try:
        new_position = DutyPosition(
            sort_number=body['sort_number'],
            position_name=body['position_name'],
            type=body['type'],
            create_by=user_id
        )

        db.add(new_position)
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }

    except Exception as e:
        traceback.print_exc()
        # Discard the pending insert so the session is not left holding a
        # failed transaction.
        db.rollback()
        return _error_response(str(e))


@router.put("/update")
async def updata_dict_type(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Update an existing duty position identified by ``body['id']``.

    Overwrites ``sort_number``, ``position_name`` and ``type`` from ``body``
    and records ``user_id`` as ``update_by``.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success; a 404 JSON
        response when the row does not exist or an exception occurs.
    """
    try:
        position = (
            _active_positions(db)
            .filter(DutyPosition.id == body['id'])
            .first()
        )

        if not position:
            # Every other response in this module uses 'code'/'msg'; the
            # original 'errcode'/'errmsg' keys are kept alongside them so
            # existing clients keep working.
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '岗位不存在',
                'errcode': 404,
                'errmsg': '岗位不存在'
            })

        position.sort_number = body['sort_number']
        position.position_name = body['position_name']
        position.type = body['type']
        position.update_by = user_id

        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }

    except Exception as e:
        traceback.print_exc()
        # Drop the uncommitted attribute changes after a failure.
        db.rollback()
        return _error_response(str(e))


@router.delete("/delete/{id}")
async def delete_dict_data(
    id: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete a duty position by setting its ``del_flag`` to ``'2'``.

    Args:
        id: Identifier of the duty position (path parameter).
        db: Database session.
        body: Result of the ``remove_xss_json`` dependency (unused here).
        user_id: Result of the ``valid_access_token`` dependency.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success; a 404 JSON
        response when the row does not exist or an exception occurs.
    """
    try:
        position = _active_positions(db).filter(DutyPosition.id == id).first()

        if not position:
            return _error_response('值班岗位不存在')

        # The row is only flagged, not physically removed.
        position.del_flag = _DELETED_FLAG
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }

    except Exception as e:
        traceback.print_exc()
        # Drop the uncommitted flag change after a failure.
        db.rollback()
        return _error_response(str(e))