#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends,Query,HTTPException from database import get_db from sqlalchemy.orm import Session from models import * from utils import * from utils.ry_system_util import * from common.security import valid_access_token import traceback router = APIRouter() @router.get('/getInfo') async def getInfo(request: Request,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)): # 用户信息 info = db.query(SysUser).filter(SysUser.user_id == user_id).first() # 部门信息 dept_name = "" dept_info = db.query(SysDept).filter(SysDept.dept_id == info.dept_id).first() if dept_name is not None: dept_name = dept_info.dept_name # 角色信息 roles = [] role_ids = db.query(SysUserRole).filter(SysUserRole.user_id == user_id).all() for role in role_ids: role_info = db.query(SysRole).filter(SysRole.role_id == role.role_id).first() roles.append( { "roleId": role_info.role_id, "roleName": role_info.role_name, "roleKey": role_info.role_key, "roleSort": role_info.role_sort, "dataScope": role_info.data_scope, "menuCheckStrictly": role_info.menu_check_strictly, "deptCheckStrictly": role_info.dept_check_strictly, "status": role_info.status, "remark": role_info.remark, "createTime": get_datetime_str(role_info.create_time), "flag": False, "superAdmin": True } ) role_keys = [ n['roleKey'] for n in roles ] return { "code": 200, "msg": "操作成功", "data": { "user": { "userId": info.user_id, "tenantId": info.tenant_id, "deptId": info.dept_id, "userName": info.user_name, "nickName": info.nick_name, "userType": info.user_type, "email": info.email, "phonenumber": info.phonenumber, "sex": info.sex, "avatar": info.avatar, "status": info.status, "loginIp": info.login_ip, "loginDate": get_datetime_str(info.login_date), "remark": info.remark, "createTime": get_datetime_str(info.create_time), "deptName": dept_name, "roles": roles, "roleIds": None, "postIds": None, "roleId": None }, "permissions": [ "*:*:*" ], "roles": role_keys } } @router.get('/deptTree') async def deptTree(request: Request,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)): def build_dept_tree(depts, parent_dept): dept_tree = [] for dept_info in depts: dept = { "id": dept_info.dept_id, "label": dept_info.dept_name, "parentId": dept_info.parent_id, "weight": dept_info.order_num } # print(dept_info.dept_id) children = parent_id_get_dept_info(db, dept_info.dept_id) if len(children) > 0: children_depts = build_dept_tree(children, dept) dept["children"] = children_depts dept_tree.append(dept) return dept_tree result = build_dept_tree(parent_id_get_dept_info(db, 0),None) return { "code": 200, "msg": "操作成功", "data": result } # def get_query_params(params: dict): # return params # def get_time_params(params: dict = Depends(get_query_params)): # try: # begin_time = params.get("beginTime") # end_time = params.get("endTime") # return begin_time, end_time # except: # return None,None @router.get('/list') async def userlist( deptId: int = Query(None ,description='部门id'), userName: str = Query(None, description='用户名'), status: int = Query(None, description='用户状态'), phonenumber : str = Query(None, description='手机号'), # params:dict = Query(None, description='创建日期'), # beginTime: str = Depends(get_time_params), # endTime: str = Depends(get_time_params), page: int = Query(1, gt=0, description='页码'), pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)): try: # 构建查询 query = db.query(SysUser) query = query.filter(SysUser.del_flag != '2') # 应用查询条件 # if beginTime: # query = query.filter(SysUser.create_time >= beginTime) # if endTime: # query = query.filter(SysUser.create_time <= endTime) # if params: # query = query.filter(SysUser.create_time >= params.get("beginTime")) # query = query.filter(SysUser.create_time <= params.get("endTime")) if userName: query =query.filter(SysUser.user_name.like(f'%{userName}%')) if status: query =query.filter(SysUser.status==status) if phonenumber: query =query.filter(SysUser.phonenumber.like(f'%{phonenumber}%')) def get_dept_chli(dept_list : list,parent_id : int): depts = parent_id_get_dept_info(db,parent_id) if depts: for dept in depts: dept_list.append(dept.dept_id) get_dept_chli(dept_list, dept.dept_id) return dept_list if deptId: query = query.filter(SysUser.dept_id.in_(get_dept_chli([],deptId))) # 计算总条目数 total_items = query.count() # 排序 query = query.order_by(SysUser.create_time.desc()) # 执行分页查询 users = query.offset((page - 1) * pageSize).limit(pageSize).all() # 将查询结果转换为列表形式的字典 user_list = [ { "userId": user.user_id, "tenantId": user.tenant_id, "deptId": user.dept_id, "userName": user.user_name, "nickName": user.nick_name, "userType": user.user_type, "email": user.email, "phonenumber": user.phonenumber, "sex": user.sex, "avatar": user.avatar, "status": user.status, "loginIp": user.login_ip, "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '', "remark": user.remark, "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '', "deptName": user.dept_name, "roles": None, "roleIds": None, "postIds": None, "roleId": None } for user in users ] # 返回结果 return { "code": 200, "msg": "成功用户列表", "rows": user_list, "total": total_items, "page": page, "pageSize": pageSize, "totalPages": (total_items + pageSize - 1) // pageSize } except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get('/{user_id_1}') async def userlist( user_id_1: int = Query(None ,description='用户id'), db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)): try: user = user_id_get_user_info(db,user_id_1) # 将查询结果转换为列表形式的字典 roleIds = user_id_get_user_roleIds(db, user_id_1) user_roles = role_id_list_get_user_role(db,roleIds) user_list = { "userId": user.user_id, "tenantId": user.tenant_id, "deptId": user.dept_id, "userName": user.user_name, "nickName": user.nick_name, "userType": user.user_type, "email": user.email, "phonenumber": user.phonenumber, "sex": user.sex, "avatar": user.avatar, "status": user.status, "loginIp": user.login_ip, "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '', "remark": user.remark, "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '', "deptName": user.dept_name, "roles": user_roles, "roleIds": None, "postIds": None, "roleId": None } roles = get_role(db) # 返回结果 return { "code": 200, "msg": "成功用户列表", "data": { "user":user_list, "roleIds":roleIds, "roles":roles, "postIds":[], "posts":[] } } except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get('/list/dept/{dept_id}') async def get_dept_user_list( # request: Request, dept_id: int, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): query = db.query(SysUser) query = query.filter(SysUser.del_flag != '2') if dept_id: query = query.filter(SysUser.dept_id == dept_id) user_list = query.all() # 将模型实例转换为字典 user_list_dict = [{ "userId": user.user_id, "tenantId": user.tenant_id, "deptId": user.dept_id, "userName": user.user_name, "nickName": user.nick_name, "userType": user.user_type, "email": user.email, "phonenumber": user.phonenumber, "sex": user.sex, "avatar": user.avatar, "status": user.status, "loginIp": user.login_ip, "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '', "remark": user.remark, "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '', "deptName": user.dept_name, "roles": None, "roleIds": None, "postIds": None, "roleId": None } for user in user_list] return { "code": 200, "data": user_list_dict, "msg": "操作成功" } @router.put('/changeStatus') async def change_user_status( db: Session = Depends(get_db), body=Depends(remove_xss_json), user_id=Depends(valid_access_token) ): try: userId = body['userId'] status = body['status'] query = db.query(SysUser) query = query.filter(SysUser.del_flag != '2') query = query.filter(SysUser.user_id == userId) user = query.first() user.status= status user.update_by=user_id db.commit() return { "code": 200, "msg": "操作成功" } except Exception as e: # 处理异常 db.rollback() traceback.print_exc() raise HTTPException(status_code=500, detail=str(e)) @router.delete('/{user_id1}') async def delete_dept( user_id1:int, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): user = user_id_get_user_info(db,user_id1) # 将模型实例转换为字典 print(user) user.del_flag = '2' user.update_by = user_id db.commit() return { "code": 200, "data": None, "msg": "操作成功" }