#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends,Query, HTTPException, status from common.security import valid_access_token from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from pydantic import BaseModel from datetime import datetime from database import get_db from typing import List from models import * from utils import * import json import traceback router = APIRouter() @router.get('/list') async def get_inspection_user_list( nickName: str = Query(None, description='姓名'), deptId :str = Query(None, description='部门id'), page: int = Query(1, gt=0, description='页码'), pageSize: int = Query(5, gt=0, description='每页条目数量'), db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 构建查询 query = db.query(RiskManagementInspectionUser) query = query.filter(RiskManagementInspectionUser.del_flag != '2') # 应用查询条件 if nickName: query = query.filter(RiskManagementInspectionUser.nick_name.like(f'%{nickName}%')) if deptId: query = query.filter(RiskManagementInspectionUser.dept_id == deptId) # 计算总条目数 total_items = query.count() # 排序 query = query.order_by(RiskManagementInspectionUser.create_time.desc()) # 执行分页查询 InspectionUsers = query.offset((page - 1) * pageSize).limit(pageSize).all() # 将查询结果转换为列表形式的字典 InspectionUsers_list = [ { "id": user.id, "user_id": user.user_id, "dept_id": user.dept_id, "dept_name": user.dept_name, "ancestors_names": user.ancestors_names, "user_name": user.user_name, "create_time": user.create_time.strftime('%Y-%m-%d'), "phonenumber": user.phonenumber, "nick_name": user.nick_name, "area_code": user.area_code, "area": user.area, "yzy_account":user.yzy_account } for user in InspectionUsers ] # 返回结果 return { "code": 200, "msg": "成功", "data": InspectionUsers_list, "total": total_items, "page": page, "pageSize": pageSize, "totalPages": (total_items + pageSize - 1) // pageSize } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.get('/{id}') async def get_inspection_user( id: str , db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 构建查询 query = db.query(RiskManagementInspectionUser) query = query.filter(RiskManagementInspectionUser.del_flag != '2') # 应用查询条件 if id: query = query.filter(RiskManagementInspectionUser.id == id) # 执行查询 user = query.first() if not user: detail = "用户不存在" raise HTTPException(status_code=404, detail="用户不存在") # 将查询结果转换为列表形式的字典 inspection_user_result = { "id": user.id, "user_id": user.user_id, "dept_id": user.dept_id, "dept_name": user.dept_name, "ancestors_names": user.ancestors_names, "user_name": user.user_name, "create_time": user.create_time.strftime('%Y-%m-%d'), "phonenumber": user.phonenumber, "nick_name": user.nick_name, "area_code": user.area_code, "area": user.area, "yzy_account":user.yzy_account } # 返回结果 return { "code": 200, "msg": "成功", "data": inspection_user_result } except Exception as e: # 处理异常 if str(e)=='': e = detail raise HTTPException(status_code=500, detail=str(e)) @router.post('/create') async def create_inspection_user( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 创建新的预案记录 new_user = RiskManagementInspectionUser( user_id=body['user_id'], dept_id = body['dept_id'], dept_name = body['dept_name'], ancestors_names = body['ancestors_names'], user_name = body['user_name'], nick_name = body['nick_name'], phonenumber = body['phonenumber'], area_code = body['area_code'], area = body['area'], yzy_account = body['yzy_account'], create_by = user_id ) # 添加到数据库会话并提交 db.add(new_user) db.commit() db.refresh(new_user) # 可选,如果需要刷新实例状态 # 返回创建成功的响应 return { "code": 200, "msg": "成功", "data": None } except Exception as e: # 处理异常 raise HTTPException(status_code=500, detail=str(e)) @router.put('/update') async def update_inspection_user( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(RiskManagementInspectionUser) query = query.filter(RiskManagementInspectionUser.id == body['id']) query = query.filter(RiskManagementInspectionUser.del_flag != '2') user = query.first() if not user: detail = "用户不存在" raise HTTPException(status_code=404, detail="用户不存在") if 'user_id' in body: user.user_id = body['user_id'] if 'dept_id' in body: user.dept_id = body['dept_id'] if 'dept_name' in body: user.dept_name = body['dept_name'] if 'ancestors_names' in body: user.ancestors_names = body['ancestors_names'] if 'user_name' in body: user.user_name = body['user_name'] if 'nick_name' in body: user.nick_name = body['nick_name'] if 'phonenumber' in body: user.phonenumber = body['phonenumber'] if 'area_code' in body: user.area_code = body['area_code'] if 'area' in body: user.area = body['area'] if user_id: user.update_by = user_id # 更新到数据库会话并提交 db.commit() db.refresh(user) # 可选,如果需要刷新实例状态 # 返回创建成功的响应 return { "code": 200, "msg": "成功", "data": None } except Exception as e: # 处理异常 if str(e)=='': e = detail raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete') async def delete_inspection_users( userIds: list, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(RiskManagementInspectionUser) query = query.filter(RiskManagementInspectionUser.del_flag != '2') query = query.filter(RiskManagementInspectionUser.id.in_(userIds)) users = query.all() if not users: detail = "用户不存在" raise HTTPException(status_code=404, detail="用户不存在") for user in users: user.del_flag = '2' user.update_by=user_id # 更新到数据库会话并提交 db.commit() # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 if str(e) == '': e = detail raise HTTPException(status_code=500, detail=str(e)) @router.delete('/delete/{userId}') async def delete_inspection_user( userId: str, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 提取请求数据 query = db.query(RiskManagementInspectionUser) query = query.filter(RiskManagementInspectionUser.del_flag != '2') query = query.filter(RiskManagementInspectionUser.id==userId) user = query.first() if not user: detail = "用户不存在" raise HTTPException(status_code=404, detail="用户不存在") user.del_flag = '2' user.update_by = user_id # 更新到数据库会话并提交 db.commit() db.refresh(user) # 可选,如果需要刷新实例状态 # 返回创建成功的响应 return { "code": 200, "msg": "删除成功", "data": None } except Exception as e: # 处理异常 if str(e) == '': e = detail raise HTTPException(status_code=500, detail=str(e))