#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Role management routes.

Exposes CRUD-style endpoints for roles plus the role<->menu, role<->dept and
role<->user association endpoints.  All handlers require a valid access token
(``valid_access_token`` dependency) and talk to the database through the
SQLAlchemy session provided by ``get_db``.

NOTE(review): several handlers intentionally share a function name
(``roleupdate``, ``userlist``, ``cancel_user_role``, ``rolelist``).  FastAPI
registers each route at decoration time, so all routes work; the names are
kept unchanged because they feed the generated route names / operation ids.
"""

import traceback

from fastapi import APIRouter, Request, Depends, Query, HTTPException
from fastapi.responses import JSONResponse
from sqlalchemy import not_
from sqlalchemy.orm import Session

from database import get_db
from models import *
from utils import *
from utils.ry_system_util import *
from common.security import valid_access_token

router = APIRouter()

# Format used for every datetime rendered in a response payload.
_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _internal_error(exc):
    """Print the active traceback and build the 500 response for *exc*.

    Must be called from inside an ``except`` block so that
    ``traceback.print_exc()`` can see the exception being handled.
    """
    traceback.print_exc()
    return HTTPException(status_code=500, detail=f"Internal server error: {str(exc)}")


def _role_not_found(role_id):
    """Build the 410 JSON response used when a role cannot be found."""
    return JSONResponse(status_code=410, content={
        'errcode': 410,
        'errmsg': f'角色{role_id}不存在'
    })


def _get_active_role(db, role_id):
    """Return the role with *role_id* whose ``del_flag`` is not '2', or None.

    NOTE(review): '2' is presumably the soft-delete marker -- confirm against
    the SysRole model.
    """
    return (
        db.query(SysRole)
        .filter(SysRole.del_flag != '2')
        .filter(SysRole.role_id == role_id)
        .first()
    )


def _format_datetime(value):
    """Render *value* with ``_DATETIME_FORMAT``; falsy values become ''."""
    return value.strftime(_DATETIME_FORMAT) if value else ''


def _serialize_user(user):
    """Convert a ``SysUser`` row into the dict shape returned to the client."""
    return {
        "userId": user.user_id,
        "tenantId": user.tenant_id,
        "deptId": user.dept_id,
        "userName": user.user_name,
        "nickName": user.nick_name,
        "userType": user.user_type,
        "email": user.email,
        "phonenumber": user.phonenumber,
        "sex": user.sex,
        "avatar": user.avatar,
        "status": user.status,
        "loginIp": user.login_ip,
        "loginDate": _format_datetime(user.login_date),
        "remark": user.remark,
        "createTime": _format_datetime(user.create_time),
        "deptName": user.dept_name,
        # Role details are not resolved for these listings; the keys are kept
        # (as empty lists) so the response shape stays stable.
        "roles": [],
        "roleIds": [],
        "postIds": None,
        "roleId": None
    }


def _list_role_users(db, role_id, allocated, user_name, phonenumber, page, page_size):
    """Return one page of users that do (or do not) hold *role_id*.

    Args:
        db: SQLAlchemy session.
        role_id: Role whose membership is inspected.
        allocated: True to list users holding the role, False to list users
            that do not hold it.
        user_name: Optional substring filter on ``SysUser.user_name``.
        phonenumber: Optional substring filter on ``SysUser.phonenumber``.
        page: 1-based page number.
        page_size: Number of rows per page.

    Returns:
        The paginated response dict (``rows``, ``total``, ``page``,
        ``pageSize``, ``totalPages`` plus ``code``/``msg``).
    """
    links = db.query(SysUserRole).filter(SysUserRole.role_id == role_id).all()
    member_ids = [link.user_id for link in links]

    membership = SysUser.user_id.in_(member_ids)
    if not allocated:
        membership = not_(membership)

    query = db.query(SysUser).filter(SysUser.del_flag != '2').filter(membership)
    if user_name is not None:
        query = query.filter(SysUser.user_name.like(f'%{user_name}%'))
    if phonenumber is not None:
        query = query.filter(SysUser.phonenumber.like(f'%{phonenumber}%'))

    # Count before ordering/paging so the total reflects every matching row.
    total_items = query.count()
    users = (
        query.order_by(SysUser.create_time.desc())
        .offset((page - 1) * page_size)
        .limit(page_size)
        .all()
    )

    return {
        "code": 200,
        "msg": "成功用户列表",
        "rows": [_serialize_user(user) for user in users],
        "total": total_items,
        "page": page,
        "pageSize": page_size,
        "totalPages": (total_items + page_size - 1) // page_size
    }


def _parse_id_list(raw):
    """Split a comma-separated id string into a list of non-empty tokens."""
    if not raw:
        return []
    return [token.strip() for token in raw.split(',') if token.strip()]


def _missing_role_or_users():
    """Build the 400 response for a request lacking roleId or userIds."""
    return JSONResponse(status_code=400, content={
        'errcode': 400,
        'errmsg': 'roleId和userIds不能为空'
    })


# ---------------------------------------------------------------------------
# Role create / update
# ---------------------------------------------------------------------------

@router.post('')
async def rolecreate(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Create a role and its role-menu links in a single transaction.

    Reads ``menuCheckStrictly``, ``roleKey``, ``roleName``, ``roleSort``,
    ``status``, ``remark`` and ``menuIds`` from the request body.

    Raises:
        HTTPException: 500 on any failure (including a missing body key).
    """
    try:
        new_role = SysRole(
            menu_check_strictly=1 if body['menuCheckStrictly'] else 0,
            role_key=body['roleKey'],
            role_name=body['roleName'],
            role_sort=body['roleSort'],
            status=body['status'],
            remark=body['remark']
        )
        menu_ids = body['menuIds']

        db.add(new_role)
        # flush() sends the INSERT so role_id is populated, but keeps the
        # transaction open: the role and its menu links commit (or fail)
        # together instead of leaving a role without menus behind.
        db.flush()

        db.add_all([
            SysRoleMenu(role_id=new_role.role_id, menu_id=menu_id)
            for menu_id in menu_ids
        ])
        db.commit()
        return {"code": 200, "msg": "创建成功", "data": None}
    except Exception as e:
        db.rollback()
        raise _internal_error(e) from e


@router.put('')
async def roleupdate(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Update a role's basic fields and replace its role-menu links.

    Returns a 410 JSON response when the role does not exist.

    Raises:
        HTTPException: 500 on any failure (including a missing body key).
    """
    try:
        roleId = body['roleId']
        role = _get_active_role(db, roleId)
        if not role:
            return _role_not_found(roleId)

        role.role_key = body['roleKey']
        role.role_name = body['roleName']
        role.role_sort = body['roleSort']
        role.status = body['status']
        role.remark = body['remark']
        role.menu_check_strictly = 1 if body['menuCheckStrictly'] else 0
        menu_ids = body['menuIds']

        # Replace the role's menu links wholesale: drop the old, add the new.
        db.query(SysRoleMenu).filter(SysRoleMenu.role_id == roleId).delete()
        db.add_all([
            SysRoleMenu(role_id=roleId, menu_id=menu_id) for menu_id in menu_ids
        ])
        db.commit()
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception as e:
        db.rollback()
        raise _internal_error(e) from e


@router.put('/dataScope')
async def roleupdate(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Update a role's data scope and replace its role-dept links.

    Returns a 410 JSON response when the role does not exist.

    Raises:
        HTTPException: 500 on any failure (including a missing body key).
    """
    try:
        roleId = body['roleId']
        role = _get_active_role(db, roleId)
        if not role:
            return _role_not_found(roleId)

        role.data_scope = body['dataScope']
        dept_ids = body['deptIds']

        # Replace the role's department links.  (The previous code deleted
        # SysRoleMenu rows here, wiping the role's menu grants and leaving
        # stale department links in place.)
        db.query(SysRoleDept).filter(SysRoleDept.role_id == roleId).delete()
        db.add_all([
            SysRoleDept(role_id=roleId, dept_id=dept_id) for dept_id in dept_ids
        ])
        db.commit()
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception as e:
        db.rollback()
        raise _internal_error(e) from e


@router.put('/changeStatus')
async def roleupdate(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Set a role's ``status`` from the request body.

    Returns a 410 JSON response when the role does not exist.

    Raises:
        HTTPException: 500 on any failure (including a missing body key).
    """
    try:
        roleId = body['roleId']
        role = _get_active_role(db, roleId)
        if not role:
            return _role_not_found(roleId)

        role.status = body['status']
        db.commit()
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception as e:
        db.rollback()
        raise _internal_error(e) from e


# ---------------------------------------------------------------------------
# Department tree
# ---------------------------------------------------------------------------

@router.get('/deptTree/{roleId}')
async def getmunutreeselect(request: Request, roleId: int, db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
    """Return the full department tree plus the dept keys checked for a role.

    The tree is built recursively starting from departments whose parent id
    is 0; ``checkedKeys`` comes from ``role_id_get_role_depts``.
    """

    def build_dept_tree(depts):
        # One lookup per department to fetch its children (depth-first).
        nodes = []
        for dept_info in depts:
            node = {
                "id": dept_info.dept_id,
                "label": dept_info.dept_name,
                "parentId": dept_info.parent_id,
                "weight": dept_info.order_num
            }
            children = parent_id_get_dept_info(db, dept_info.dept_id)
            if children:
                # "children" is only present on non-leaf nodes.
                node["children"] = build_dept_tree(children)
            nodes.append(node)
        return nodes

    checkedKeys = role_id_get_role_depts(db, roleId)
    result = build_dept_tree(parent_id_get_dept_info(db, 0))
    return {
        "code": 200,
        "msg": "操作成功",
        "data": {"depts": result, "checkedKeys": checkedKeys}
    }


# ---------------------------------------------------------------------------
# Role <-> user association
# ---------------------------------------------------------------------------

@router.get('/authUser/allocatedList')
async def userlist(
    userName: str = Query(None, description='用户名称'),
    phonenumber: str = Query(None, description='用户手机号'),
    roleId: int = Query(None, description='角色id'),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """List (paginated) the users that currently hold the given role.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        return _list_role_users(db, roleId, True, userName, phonenumber, page, pageSize)
    except Exception as e:
        raise _internal_error(e) from e


@router.get('/authUser/unallocatedList')
async def userlist(
    userName: str = Query(None, description='用户名称'),
    phonenumber: str = Query(None, description='用户手机号'),
    roleId: int = Query(None, description='角色id'),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """List (paginated) the users that do NOT hold the given role.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        return _list_role_users(db, roleId, False, userName, phonenumber, page, pageSize)
    except Exception as e:
        raise _internal_error(e) from e


@router.put('/authUser/selectAll')
async def cancel_user_role(
    roleId: str = Query(None),
    userIds: str = Query(None),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Grant a role to every user in the comma-separated ``userIds`` list.

    Returns a 400 JSON response when ``roleId`` or ``userIds`` is missing or
    contains no usable id.

    Raises:
        HTTPException: 500 on any other failure.
    """
    try:
        target_user_ids = _parse_id_list(userIds)
        if not roleId or not target_user_ids:
            return _missing_role_or_users()

        db.add_all([
            SysUserRole(user_id=target_id, role_id=roleId)
            for target_id in target_user_ids
        ])
        db.commit()
        return {"code": 200, "msg": "成功", "data": None}
    except Exception as e:
        db.rollback()
        raise _internal_error(e) from e


@router.put('/authUser/cancel')
async def cancel_user_role(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Revoke a role from a single user (``roleId`` / ``userId`` in the body).

    Raises:
        HTTPException: 500 on any failure (including a missing body key).
    """
    try:
        roleId = body['roleId']
        userId = body['userId']
        (
            db.query(SysUserRole)
            .filter(SysUserRole.role_id == roleId)
            .filter(SysUserRole.user_id == userId)
            .delete()
        )
        db.commit()
        return {"code": 200, "msg": "成功", "data": None}
    except Exception as e:
        db.rollback()
        raise _internal_error(e) from e


@router.put('/authUser/cancelAll')
async def cancel_user_role(
    roleId: str = Query(None),
    userIds: str = Query(None),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Revoke a role from every user in the comma-separated ``userIds`` list.

    Returns a 400 JSON response when ``roleId`` or ``userIds`` is missing or
    contains no usable id.

    Raises:
        HTTPException: 500 on any other failure.
    """
    try:
        target_user_ids = _parse_id_list(userIds)
        if not roleId or not target_user_ids:
            return _missing_role_or_users()

        # synchronize_session=False: the in-session state is not reused after
        # this bulk delete, so no synchronisation of loaded objects is needed.
        (
            db.query(SysUserRole)
            .filter(SysUserRole.role_id == roleId)
            .filter(SysUserRole.user_id.in_(target_user_ids))
            .delete(synchronize_session=False)
        )
        db.commit()
        return {"code": 200, "msg": "成功", "data": None}
    except Exception as e:
        db.rollback()
        raise _internal_error(e) from e


# ---------------------------------------------------------------------------
# Role listing / detail
# ---------------------------------------------------------------------------
# NOTE: '/list' must stay registered before '/{roleId}' so that the literal
# path is matched first.

@router.get('/list')
async def rolelist(
    roleName: str = Query(None, description='角色名称'),
    roleKey: str = Query(None, description='权限字符'),
    status: int = Query(None, description='角色状态'),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """List roles (paginated), optionally filtered by name, key and status.

    ``roleName`` and ``roleKey`` are substring matches; ``status`` is an exact
    match and is applied whenever it is supplied, including ``0``.

    Raises:
        HTTPException: 500 on any failure.
    """
    try:
        query = db.query(SysRole).filter(SysRole.del_flag != '2')

        if roleName:
            query = query.filter(SysRole.role_name.like(f'%{roleName}%'))
        # Explicit None check: a plain truthiness test would drop status=0.
        if status is not None:
            query = query.filter(SysRole.status == status)
        if roleKey:
            query = query.filter(SysRole.role_key.like(f'%{roleKey}%'))

        # Count before ordering/paging so the total reflects every match.
        total_items = query.count()
        roles = (
            query.order_by(SysRole.create_time.desc())
            .offset((page - 1) * pageSize)
            .limit(pageSize)
            .all()
        )
        role_list = role_list_to_dict(roles, [])

        return {
            "code": 200,
            "msg": "成功",
            "rows": role_list,
            "total": total_items,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total_items + pageSize - 1) // pageSize
        }
    except Exception as e:
        raise _internal_error(e) from e


@router.get('/{roleId}')
async def rolelist(
    roleId: int,
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """Return the details of a single role.

    Returns a 410 JSON response when the role does not exist (previously this
    surfaced as a 500 caused by indexing an empty list).

    Raises:
        HTTPException: 500 on any other failure.
    """
    try:
        role = _get_active_role(db, roleId)
        if not role:
            return _role_not_found(roleId)

        role_list = role_list_to_dict([role], [])
        return {
            "code": 200,
            "msg": "成功",
            "data": role_list[0]
        }
    except Exception as e:
        raise _internal_error(e) from e