#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""User-management routes: create/update/delete users, listing, profile,
role assignment and per-user video links.

All handlers return the project's JSON envelope (``code`` / ``msg`` /
``data`` or ``rows``).  Unexpected errors are printed with ``traceback`` and
re-raised as HTTP 500.
"""
from fastapi import APIRouter, Request, Depends,Query,HTTPException
from database import get_db
from sqlalchemy.orm import Session
from fastapi.responses import JSONResponse
from models import *
from utils import *
from utils.ry_system_util import *
from common.security import valid_access_token
import traceback
# Imported last on purpose: guarantees ``re`` is the stdlib module even if one
# of the star imports above happens to export a different ``re``.
import re

router = APIRouter()

_DATETIME_FORMAT = '%Y-%m-%d %H:%M:%S'

# Allowed user names: ASCII letters, digits and underscore.
_USERNAME_PATTERN = re.compile(r'^[a-zA-Z0-9_]+$')

# Initial password assigned to every newly created user (bcrypt-formatted
# string; the plaintext it corresponds to is not visible in this module).
_DEFAULT_PASSWORD_HASH = '$2a$10$b8yUzN0C71sbz.PhNOCgJe.Tu1yWC3RNrTyjSQ8p1W0.aaUXUJ.Ne'


def _format_dt(value, default=''):
    """Format a datetime as ``YYYY-MM-DD HH:MM:SS``; return *default* if falsy."""
    return value.strftime(_DATETIME_FORMAT) if value else default


def _user_not_found():
    """Build the JSON response used when a requested user does not exist."""
    return JSONResponse(status_code=404, content={"code": 404, "msg": "用户不存在"})


def _user_name_or_none(db, uid):
    """Return the ``user_name`` of user *uid*, or None if *uid* is empty or unknown."""
    if not uid:
        return None
    found = user_id_get_user_info(db, uid)
    return found.user_name if found else None


def _serialize_user(user, roles=None, role_ids=None):
    """Convert a ``SysUser`` row into the dict shape shared by the list/detail endpoints."""
    return {
        "userId": user.user_id,
        "tenantId": user.tenant_id,
        "deptId": user.dept_id,
        "userName": user.user_name,
        "nickName": user.nick_name,
        "userType": user.user_type,
        "email": user.email,
        "phonenumber": user.phonenumber,
        "sex": user.sex,
        "avatar": user.avatar,
        "status": user.status,
        "loginIp": user.login_ip,
        "loginDate": _format_dt(user.login_date),
        "remark": user.remark,
        "createTime": _format_dt(user.create_time),
        "deptName": user.dept_name,
        "roles": roles,
        "roleIds": role_ids,
        "postIds": None,
        "roleId": None
    }


@router.get('/')
async def usercreate01(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """Return the option data for the "create user" form (only roles are filled)."""
    try:
        return {
            "code": 200,
            "msg": "成功",
            "data": {
                "postIds": None,
                "posts": None,
                "roleIds": None,
                "roles": get_role(db),
                "user": None
            }
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.post('')
async def usercreate(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Create a user together with its role and post links.

    Required body keys: ``userName``, ``nickName``, ``roleIds``, ``postIds``,
    ``remark``, ``status``.  Optional: ``deptId``, ``phonenumber``, ``email``,
    ``sex``.  Responds with a 404-coded JSON body when the user name is
    already taken or contains characters outside ``[a-zA-Z0-9_]``.
    """
    try:
        userName = body['userName']
        if user_name_get_user_info(db, userName):
            return JSONResponse(status_code=404, content={"code": 404, "msg": "用户名称已存在"})
        # re.match returns None (never False) on a miss, so test truthiness.
        if not _USERNAME_PATTERN.match(userName):
            return JSONResponse(status_code=404, content={"code": 404, "msg": "用户名称又字母大小写、阿拉伯数字和下划线组成"})

        roleIds = body['roleIds']
        postIds = body['postIds']
        new_user = SysUser(
            user_name=userName,
            nick_name=body['nickName'],
            dept_id=body.get('deptId'),
            phonenumber=body.get('phonenumber'),
            email=body.get('email'),
            sex=body.get('sex'),
            remark=body['remark'],
            status=body['status'],
            password=_DEFAULT_PASSWORD_HASH
        )
        db.add(new_user)
        # flush() assigns the primary key without committing, so the user and
        # its links are persisted atomically by the single commit below.
        db.flush()

        # Link the new user to its roles and posts.
        db.add_all([SysUserRole(user_id=new_user.user_id, role_id=role_id) for role_id in roleIds])
        db.add_all([SysUserPost(user_id=new_user.user_id, post_id=post_id) for post_id in postIds])
        db.commit()
        return {"code": 200, "msg": "创建成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('')
async def userupdate(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Update a user's basic fields and replace its role and post links.

    Required body keys: ``userId``, ``nickName``, ``deptId``, ``phonenumber``,
    ``email``, ``sex``, ``roleIds``, ``postIds``, ``remark``.
    """
    try:
        user_id_1 = body['userId']
        user = user_id_get_user_info(db, user_id_1)
        if user is None:
            return _user_not_found()

        roleIds = body['roleIds']
        postIds = body['postIds']
        user.nick_name = body['nickName']
        user.dept_id = body['deptId']
        user.phonenumber = body['phonenumber']
        user.email = body['email']
        user.sex = body['sex']
        user.remark = body['remark']
        # Record the acting user, as the other write endpoints do.
        user.update_by = user_id

        # Replace all role links of the user.
        db.query(SysUserRole).filter(SysUserRole.user_id == user_id_1).delete()
        db.add_all([SysUserRole(user_id=user_id_1, role_id=role_id) for role_id in roleIds])

        # Replace all post links of the user.
        db.query(SysUserPost).filter(SysUserPost.user_id == user_id_1).delete()
        db.add_all([SysUserPost(user_id=user_id_1, post_id=post_id) for post_id in postIds])

        db.commit()
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('/resetPwd')
async def userupdate(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body = Depends(remove_xss_json)
):
    """Overwrite the password of the user given by ``body['userId']``.

    NOTE(review): the value of ``body['password']`` is stored as received;
    confirm whether the client sends an already-hashed value.
    """
    try:
        user = user_id_get_user_info(db, body['userId'])
        if user is None:
            return _user_not_found()
        user.password = body['password']
        user.update_by = user_id
        db.commit()
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/getInfo')
async def getInfo(request: Request,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
    """Return the authenticated user's info, roles and permission list.

    ``data`` is None when the user row cannot be found.
    """
    try:
        # User row of the caller.
        info = db.query(SysUser).filter(SysUser.user_id == user_id).first()
        if info is None:
            return {
                "code": 200,
                "msg": "操作成功",
                "data": None}

        # Department name; stays empty when the user has no matching department.
        dept_name = ""
        dept_info = db.query(SysDept).filter(SysDept.dept_id == info.dept_id).first()
        if dept_info is not None:
            dept_name = dept_info.dept_name

        # Roles linked to the user.
        roles = []
        role_links = db.query(SysUserRole).filter(SysUserRole.user_id == user_id).all()
        for link in role_links:
            role_info = db.query(SysRole).filter(SysRole.role_id == link.role_id).first()
            if role_info is None:
                # Link points to a role that no longer exists; skip it.
                continue
            roles.append(
                {
                    "roleId": role_info.role_id,
                    "roleName": role_info.role_name,
                    "roleKey": role_info.role_key,
                    "roleSort": role_info.role_sort,
                    "dataScope": role_info.data_scope,
                    "menuCheckStrictly": role_info.menu_check_strictly,
                    "deptCheckStrictly": role_info.dept_check_strictly,
                    "status": role_info.status,
                    "remark": role_info.remark,
                    "createTime": get_datetime_str(role_info.create_time),
                    "flag": False,
                    "superAdmin": True
                }
            )
        role_keys = [n['roleKey'] for n in roles]

        return {
            "code": 200,
            "msg": "操作成功",
            "data": {
                "user": {
                    "userId": info.user_id,
                    "tenantId": info.tenant_id,
                    "deptId": info.dept_id,
                    "userName": info.user_name,
                    "nickName": info.nick_name,
                    "userType": info.user_type,
                    "email": info.email,
                    "phonenumber": info.phonenumber,
                    "sex": info.sex,
                    "avatar": info.avatar,
                    "status": info.status,
                    "loginIp": info.login_ip,
                    "loginDate": get_datetime_str(info.login_date),
                    "remark": info.remark,
                    "createTime": get_datetime_str(info.create_time),
                    "deptName": dept_name,
                    "roles": roles,
                    "roleIds": None,
                    "postIds": None,
                    "roleId": None
                },
                "permissions": [
                    "*:*:*"
                ],
                "roles": role_keys
            }
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/deptTree')
async def deptTree(request: Request,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
    """Return the department hierarchy as a nested tree rooted at parent id 0."""

    def build_dept_tree(depts, parent_dept):
        # parent_dept is not used by the traversal; kept to preserve the call shape.
        dept_tree = []
        for dept_info in depts:
            dept = {
                "id": dept_info.dept_id,
                "label": dept_info.dept_name,
                "parentId": dept_info.parent_id,
                "weight": dept_info.order_num
            }
            children = parent_id_get_dept_info(db, dept_info.dept_id)
            if len(children) > 0:
                dept["children"] = build_dept_tree(children, dept)
            dept_tree.append(dept)
        return dept_tree

    result = build_dept_tree(parent_id_get_dept_info(db, 0), None)
    return {
        "code": 200,
        "msg": "操作成功",
        "data": result
    }


@router.get('/list')
async def userlist(
    deptId: int = Query(None ,description='部门id'),
    userName: str = Query(None, description='用户名'),
    status: int = Query(None, description='用户状态'),
    phonenumber : str = Query(None, description='手机号'),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """Return a filtered, paginated list of users not flagged as deleted.

    Filters: ``deptId`` (the department and all its descendants), ``userName``
    and ``phonenumber`` (substring match), ``status`` (exact match).
    Results are ordered by creation time, newest first.
    """
    try:
        query = db.query(SysUser)
        query = query.filter(SysUser.del_flag != '2')

        if userName:
            query = query.filter(SysUser.user_name.like(f'%{userName}%'))
        # Compare against None so that status=0 is a usable filter value.
        if status is not None:
            query = query.filter(SysUser.status == status)
        if phonenumber:
            query = query.filter(SysUser.phonenumber.like(f'%{phonenumber}%'))

        def collect_descendant_ids(dept_ids: list, parent_id: int):
            """Append the ids of all departments below *parent_id* to *dept_ids*."""
            children = parent_id_get_dept_info(db, parent_id)
            if children:
                for child in children:
                    dept_ids.append(child.dept_id)
                    collect_descendant_ids(dept_ids, child.dept_id)
            return dept_ids

        if deptId:
            # Include the selected department itself; otherwise users that
            # belong directly to it (and every leaf department) are lost.
            query = query.filter(SysUser.dept_id.in_(collect_descendant_ids([deptId], deptId)))

        # Count before pagination is applied.
        total_items = query.count()

        query = query.order_by(SysUser.create_time.desc())
        users = query.offset((page - 1) * pageSize).limit(pageSize).all()

        user_list = []
        for user in users:
            roleIds = user_id_get_user_roleIds(db, user.user_id)
            user_roles = role_id_list_get_user_role(db, roleIds)
            user_list.append(_serialize_user(user, roles=user_roles))

        return {
            "code": 200,
            "msg": "成功用户列表",
            "rows": user_list,
            "total": total_items,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total_items + pageSize - 1) // pageSize
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/list/dept/{dept_id}')
async def get_dept_user_list(
    dept_id: int,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return all non-deleted users whose department is exactly *dept_id*.

    A falsy *dept_id* (0) disables the department filter.
    """
    query = db.query(SysUser)
    query = query.filter(SysUser.del_flag != '2')
    if dept_id:
        query = query.filter(SysUser.dept_id == dept_id)

    user_list_dict = [_serialize_user(user) for user in query.all()]

    return {
        "code": 200,
        "data": user_list_dict,
        "msg": "操作成功"
    }


@router.put('/changeStatus')
async def change_user_status(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Set ``status`` on the non-deleted user given by ``body['userId']``."""
    try:
        userId = body['userId']
        status = body['status']

        query = db.query(SysUser)
        query = query.filter(SysUser.del_flag != '2')
        query = query.filter(SysUser.user_id == userId)
        user = query.first()
        if user is None:
            return _user_not_found()
        user.status = status
        user.update_by = user_id
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功"
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@router.delete('/{user_id1}')
async def delete_dept(
    user_id1:int,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete user *user_id1* by setting its ``del_flag`` to ``'2'``."""
    user = user_id_get_user_info(db, user_id1)
    if user is None:
        return _user_not_found()
    user.del_flag = '2'
    user.update_by = user_id
    db.commit()
    return {
        "code": 200,
        "data": None,
        "msg": "操作成功"
    }


@router.get('/videoPoints')
async def get_user_video_points(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)):
    """Return the video codes linked to the caller and the matching ``TpVideoLog`` names."""
    try:
        videoIds = user_id_get_user_videoIds(db, user_id)
        video_list = [i.video_code_int for i in videoIds]
        query = db.query(TpVideoLog)
        query = query.filter(TpVideoLog.video_code_int.in_(video_list))
        video_info = query.all()
        return {
            "code": 200,
            "msg": "成功",
            "data": {"videoIds": video_list,
                     "videoInfos": [{"name": info.name, "video_code_int": info.video_code_int} for info in video_info]}
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('/videoPoints')
async def get_user_video_points(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)):
    """Replace the caller's video links with the video codes iterated from *body*."""
    try:
        query = db.query(SysUserVideo)
        query = query.filter(SysUserVideo.user_id == user_id)
        query.delete(synchronize_session=False)
        # Delete and inserts share one commit so a failed insert cannot
        # leave the user with no links at all.
        db.add_all([SysUserVideo(user_id=user_id, video_code_int=video) for video in body])
        db.commit()
        return {
            "code": 200,
            "msg": "成功",
            "data": None
        }
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('/authRole')
async def authRoleUpdate(
    userId: int = Query(None ,description='用户id'),
    roleIds: str = Query(None ,description='权限id'),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """Replace the role links of *userId* with the comma-separated ids in *roleIds*.

    A missing or empty *roleIds* removes all role links of the user.
    """
    try:
        # Tolerate None, '' and stray separators such as "1,,2".
        role_ids_list = [int(part) for part in roleIds.split(',') if part.strip()] if roleIds else []

        db.query(SysUserRole).filter(SysUserRole.user_id == userId).delete()
        db.add_all([SysUserRole(user_id=userId, role_id=role_id) for role_id in role_ids_list])

        db.commit()
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/profile')
async def userlist(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """Return the caller's full profile, including department and roles.

    ``dept`` and ``deptName`` are None when the user has no matching department.
    """
    try:
        user = user_id_get_user_info(db, user_id)
        if user is None:
            return _user_not_found()
        roleIds = user_id_get_user_roleIds(db, user_id)
        user_roles = role_id_list_get_user_role(db, roleIds)
        dept = dept_id_get_dept_info(db, user.dept_id)

        dept_data = None
        if dept is not None:
            dept_data = {
                "createBy": _user_name_or_none(db, dept.create_by),
                "createTime": _format_dt(dept.create_time, None),
                "updateBy": _user_name_or_none(db, dept.update_by),
                # NOTE(review): reports create_time as the update time, as
                # before — confirm whether the model has an update_time column.
                "updateTime": _format_dt(dept.create_time, None),
                "remark": '',
                "deptId": dept.dept_id,
                "parentId": dept.parent_id,
                "ancestors": dept.ancestors,
                "deptName": dept.dept_name,
                "orderNum": dept.order_num,
                "leader": dept.leader_name,
                "phone": dept.phone,
                "email": dept.email,
                "status": dept.status,
                "delFlag": dept.del_flag,
                "parentName": dept.parent_name,
                "children": []
            }

        user_list = {
            "admin": 1 in roleIds,
            "avatar": user.avatar,
            "createBy": _user_name_or_none(db, user.create_by),
            "createTime": _format_dt(user.create_time),
            "delFlag": user.del_flag,
            "dept": dept_data,
            "deptId": user.dept_id,
            "email": user.email,
            "loginDate": _format_dt(user.login_date),
            "loginIp": user.login_ip,
            "nickName": user.nick_name,
            "password": "",
            "phonenumber": user.phonenumber,
            "postIds": None,
            "remark": user.remark,
            "roleId": None,
            "roleIds": roleIds,
            "roles": user_roles,
            "sex": user.sex,
            "status": user.status,
            "updateBy": user.update_by,
            # NOTE(review): reports create_time as the update time, as
            # before — confirm whether the model has an update_time column.
            "updateTime": _format_dt(user.create_time, None),
            "userId": user.user_id,
            "userName": user.user_name,
            "tenantId": user.tenant_id,
            "userType": user.user_type,
            "deptName": dept.dept_name if dept is not None else None,
        }
        return {
            "code": 200,
            "msg": "成功用户列表",
            "data": user_list,
            "postGroup": "工作人员",
            "roleGroup": '/'.join([i["roleName"] for i in user_roles])
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('/profile')
async def userlist(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id: int = Depends(valid_access_token)):
    """Update the caller's own ``email``, ``nickName``, ``phonenumber`` and/or ``sex``.

    Only keys present in *body* are written.
    """
    try:
        user = user_id_get_user_info(db, user_id)
        if user is None:
            return _user_not_found()
        if "email" in body:
            user.email = body['email']
            user.update_by = user_id
        if "nickName" in body:
            user.nick_name = body['nickName']
            user.update_by = user_id
        if "phonenumber" in body:
            user.phonenumber = body['phonenumber']
            user.update_by = user_id
        if "sex" in body:
            # Write to the sex column (was overwriting nick_name).
            user.sex = body['sex']
            user.update_by = user_id
        db.commit()
        return {"code": 200, "msg": "成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.put('/profile/updatePwd')
async def userlist(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id: int = Depends(valid_access_token)):
    """Change the caller's password after checking ``body['oldPassword']``.

    NOTE(review): the old password is compared to the stored value with plain
    equality and the new one is stored as received; confirm whether the
    client sends already-hashed values.
    """
    try:
        user = user_id_get_user_info(db, user_id)
        if user is None:
            return _user_not_found()
        if user.password != body['oldPassword']:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "密码错误"})
        if "newPassword" in body:
            # Write to the password column (was overwriting nick_name).
            user.password = body['newPassword']
            user.update_by = user_id
        db.commit()
        return {"code": 200, "msg": "成功", "data": None}
    except Exception as e:
        db.rollback()
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/authRole/{user_id_1}')
@router.get('/{user_id_1}')
async def userlist(
    user_id_1: int,
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)):
    """Return user *user_id_1* with its role ids and the role options from ``get_role``.

    *user_id_1* is a path parameter, declared like the other path parameters
    in this module rather than through ``Query``.
    """
    try:
        user = user_id_get_user_info(db, user_id_1)
        if user is None:
            return _user_not_found()
        roleIds = user_id_get_user_roleIds(db, user_id_1)
        user_roles = role_id_list_get_user_role(db, roleIds)
        user_list = _serialize_user(user, roles=user_roles, role_ids=roleIds)
        roles = get_role(db, roleIds)
        return {
            "code": 200,
            "msg": "成功用户列表",
            "data": {
                "user": user_list,
                "roleIds": roleIds,
                "roles": roles,
                "postIds": [],
                "posts": []
            }
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")