#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Department (SysDept) management endpoints.

Routes registered on ``router``:

* ``GET    /list``                    - filtered department list
* ``GET    /list/exclude/{dept_id}``  - list produced by
  ``parent_id_get_exclude_dept_info``
* ``GET    /{dept_id}``               - single department
* ``DELETE /{dept_id}``               - soft delete (``del_flag = '2'``)
* ``POST   ''``                       - create
* ``PUT    ''``                       - update
"""

# NOTE: the import order below is kept exactly as in the original file.
# Several of these are wildcard imports, which can shadow names imported
# earlier, so reordering them could silently change which object a name
# refers to.
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from pydantic import BaseModel
from database import get_db
from sqlalchemy.orm import Session
from typing import List
from models import *
from utils import *
from utils.ry_system_util import *
import json
from sqlalchemy.sql import func
from common.enc import mpfun, sys_dept_data
from common.auth_user import *
from common.db import db_czrz
import traceback

router = APIRouter()

# NOTE(review): dept_id_get_dept_info, dept_id_get_user_info,
# user_id_get_user_info, parent_id_get_exclude_dept_info, remove_xss_json,
# SysDept, AuthUser, find_auth_user and datetime are not defined in this
# file; they are presumably supplied by the wildcard imports above - confirm.


def _dept_to_dict(dept):
    """Serialize one SysDept row into the camelCase dict returned by the API.

    ``email`` and ``phone`` are passed through ``mpfun.dec_data`` before being
    returned; ``createTime`` is formatted as ``YYYY-mm-dd HH:MM:SS`` or ``''``
    when the row has no creation time.
    """
    create_time = dept.create_time
    return {
        "deptId": dept.dept_id,
        "deptName": dept.dept_name,
        "ancestors": dept.ancestors,
        "deptCategory": dept.dept_category,
        "leader": dept.leader,
        "email": mpfun.dec_data(dept.email),
        "leaderName": dept.leader_name,
        "orderNum": dept.order_num,
        "parentId": dept.parent_id,
        "parentName": dept.parent_name,
        "status": dept.status,
        "phone": mpfun.dec_data(dept.phone),
        "createTime": create_time.strftime('%Y-%m-%d %H:%M:%S') if create_time else '',
    }


def _require_fields(body, required_fields):
    """Raise HTTP 400 listing every name in *required_fields* absent from *body*."""
    missing_fields = [field for field in required_fields if field not in body]
    if missing_fields:
        # 400, not 401: the request is malformed, the caller is not
        # unauthenticated.
        raise HTTPException(
            status_code=400,
            detail=f"Missing required fields: {', '.join(missing_fields)}",
        )


def _child_ancestors(parent, parent_id):
    """Build the ``ancestors`` value for a child placed under *parent*.

    Falls back to just ``str(parent_id)`` when the parent has no ancestors
    recorded, instead of failing on ``None + ','``.
    """
    if parent.ancestors:
        return f"{parent.ancestors},{parent_id}"
    return str(parent_id)


@router.get('/list')
async def get_list(
    deptName: str = Query(None, max_length=100),
    deptCategory: str = Query(None, max_length=100),
    status: str = Query(None, max_length=100),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return all non-deleted departments, optionally filtered.

    Each of ``deptName``, ``deptCategory`` and ``status`` is applied as a
    substring (``LIKE %value%``) filter when it is non-empty.
    """
    query = db.query(SysDept).filter(SysDept.del_flag != '2')
    if deptName:
        query = query.filter(SysDept.dept_name.like(f'%{deptName}%'))
    if deptCategory:
        query = query.filter(SysDept.dept_category.like(f'%{deptCategory}%'))
    if status:
        query = query.filter(SysDept.status.like(f'%{status}%'))

    # Convert model instances to dicts.
    dept_list_dict = [_dept_to_dict(dept) for dept in query.all()]
    return {
        "code": 200,
        "data": dept_list_dict,
        "msg": "操作成功"
    }


# NOTE(review): this handler reuses the name ``get_list`` and therefore
# rebinds the module-level name; both routes are still registered because the
# decorator captures each function when it is applied. The name is kept
# unchanged here to avoid altering the module's interface.
@router.get('/list/exclude/{dept_id}')
async def get_list(
    dept_id: int,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return the departments yielded by ``parent_id_get_exclude_dept_info``
    for ``dept_id``, serialized like the ``/list`` endpoint."""
    dept_list = parent_id_get_exclude_dept_info(db, dept_id)

    # Convert model instances to dicts.
    dept_list_dict = [_dept_to_dict(dept) for dept in dept_list]
    return {
        "code": 200,
        "data": dept_list_dict,
        "msg": "操作成功"
    }


@router.get('/{dept_id}')
async def get_dept_info(
    dept_id: int,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return a single department.

    Raises:
        HTTPException: 404 when no department is found for ``dept_id``
            (previously this crashed with an AttributeError).
    """
    dept = dept_id_get_dept_info(db, dept_id)
    if dept is None:
        raise HTTPException(status_code=404, detail="部门不存在")

    return {
        "code": 200,
        "data": _dept_to_dict(dept),
        "msg": "操作成功"
    }


@router.delete('/{dept_id}')
async def delete_dept(
    request: Request,
    dept_id: int,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Soft-delete a department by setting ``del_flag`` to ``'2'``.

    The delete is refused (``code`` 500 in the response body) while
    ``dept_id_get_user_info`` still returns users for the department. When the
    department does not exist nothing is changed and success is returned.
    """
    dept = dept_id_get_dept_info(db, dept_id)

    if dept is not None:
        user_list = dept_id_get_user_info(db, dept.dept_id)
        user_count = len(user_list)
        if user_count > 0:
            return {
                "code": 500,
                "data": None,
                "msg": f"该部门还有{user_count}个用户,不能删除"
            }

        dept.del_flag = '2'
        dept.update_by = user_id
        dept.update_time = datetime.now()
        # The signature is recomputed after the fields above were modified.
        dept.sign = sys_dept_data.get_sign_hmac(dept)
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理删除部门(单位)【{dept.dept_name}】成功", request.client.host)

    return {
        "code": 200,
        "data": None,
        "msg": "操作成功"
    }


@router.post('')
async def create_dept(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Create a department under an existing parent.

    Required body fields: ``deptName``, ``orderNum``, ``parentId``,
    ``status``. Optional: ``deptCategory``, ``email``, ``phone``, ``leader``.

    Raises:
        HTTPException: 400 when required fields are missing, 404 when the
            parent department does not exist, 500 for any other failure
            (the session is rolled back in every case).
    """
    try:
        # Validate required fields.
        _require_fields(body, ['deptName', 'orderNum', 'parentId', 'status'])

        dept_name = body['deptName']
        parent_id = body['parentId']
        leader = body.get('leader')

        dept_parent = dept_id_get_dept_info(db, parent_id)
        if dept_parent is None:
            # Previously this surfaced as an AttributeError on None.
            raise HTTPException(status_code=404, detail="父部门不存在")

        creator = user_id_get_user_info(db, user_id)

        # Resolve the leader's display name the same way the update
        # endpoint does, so freshly created rows are not left without it.
        leader_name = None
        if leader:
            leader_user_info = user_id_get_user_info(db, leader)
            if leader_user_info is not None:
                leader_name = leader_user_info.nick_name

        # NOTE(review): email/phone are stored exactly as received, while the
        # read endpoints pass them through mpfun.dec_data - confirm whether
        # they should be encrypted here or are handled by sign_table().
        new_dept = SysDept(
            parent_id=parent_id,
            parent_name=dept_parent.dept_name,
            ancestors=_child_ancestors(dept_parent, parent_id),
            dept_name=dept_name,
            dept_category=body.get('deptCategory'),
            order_num=body['orderNum'],
            phone=body.get('phone'),
            email=body.get('email'),
            status=body['status'],
            leader=leader,
            leader_name=leader_name,
            create_by=user_id,
            create_dept=creator.dept_id if creator is not None else None,
            update_by=user_id,
            sign=''
        )
        db.add(new_dept)
        db.commit()
        # The row is inserted with an empty sign; sign_table() is called
        # after the commit, as in the original flow.
        sys_dept_data.sign_table()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理新建部门(单位)【{dept_name}】成功", request.client.host)

        return {
            "code": 200,
            "data": None,
            "msg": "操作成功"
        }
    except HTTPException:
        # Deliberate client errors must keep their own status code instead
        # of being rewrapped as a 500 by the generic handler below.
        db.rollback()
        raise
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('')
async def dept_info_update(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Update an existing department.

    Required body fields: ``deptId``, ``deptName``, ``orderNum``,
    ``parentId``, ``status``. Optional: ``deptCategory``, ``email``,
    ``phone``, ``leader``.

    Raises:
        HTTPException: 400 when required fields are missing, 404 when the
            department (or a newly chosen parent) does not exist, 500 for any
            other failure (the session is rolled back in every case).
    """
    try:
        _require_fields(body, ['deptId', 'deptName', 'orderNum', 'parentId', 'status'])

        dept = dept_id_get_dept_info(db, body['deptId'])
        if not dept:
            raise HTTPException(status_code=404, detail="部门不存在")

        # Assign to the mapped column attributes. The original wrote to
        # deptName / orderNum / parentId, which are not the attributes used
        # everywhere else for this model, so those edits were never saved.
        dept.dept_name = body['deptName']
        dept.order_num = body['orderNum']
        dept.status = body['status']

        if 'deptCategory' in body:
            dept.dept_category = body['deptCategory']
        if 'email' in body:
            dept.email = body['email']
        if 'phone' in body:
            dept.phone = body['phone']
        if 'leader' in body:
            dept.leader = body['leader']
            leader_user_info = user_id_get_user_info(db, dept.leader)
            if leader_user_info is not None:
                dept.leader_name = leader_user_info.nick_name

        parent_id = body['parentId']
        dept_parent = dept_id_get_dept_info(db, parent_id)
        if dept_parent is not None:
            dept.parent_id = parent_id
            # No trailing commas here: the original assigned 1-tuples.
            dept.parent_name = dept_parent.dept_name
            dept.ancestors = _child_ancestors(dept_parent, parent_id)
        elif str(parent_id) != str(dept.parent_id):
            # Moving under a parent that does not exist is an error. When the
            # parent is unchanged and simply has no row, the existing
            # parent_name / ancestors are left untouched so the department
            # can still be edited.
            raise HTTPException(status_code=404, detail="父部门不存在")
        # NOTE(review): the ancestors of this department's descendants are
        # not rewritten when it moves to another parent - confirm whether
        # that is handled elsewhere.

        dept.update_by = user_id
        dept.update_time = datetime.now()
        dept.sign = ''
        db.commit()
        sys_dept_data.sign_table()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理更新部门(单位)【{body['deptName']}】成功", request.client.host)
        return {
            "code": 200,
            "msg": "部门更新成功"
        }
    except HTTPException:
        # Keep 400/404 intact; the original turned them into 500s and could
        # raise NameError on an unbound ``detail`` while doing so.
        db.rollback()
        raise
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))