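#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""User-management API routes: create/update/delete users, passwords, roles, profile, export.

Security review notes for maintainers:

* Every route depends on ``valid_access_token``, but no per-route role or
  permission check is visible in this module.
  NOTE(review): confirm that ``find_auth_user`` or surrounding middleware
  restricts the administrative routes (create, update, resetPwd, changeStatus,
  delete, authRole, export) to administrators; otherwise any authenticated
  account could manage other accounts and their roles.
* Passwords are passed through ``mpfun.enc_data`` and ``userupdate`` calls
  ``mpfun.dec_data`` on ``user.password``, so they appear to be stored
  reversibly. NOTE(review): prefer a one-way password hash (argon2/bcrypt/scrypt).
* Unexpected errors are logged server-side and answered with a generic 500;
  exception text (which may contain SQL and bound parameters such as
  passwords) is deliberately not echoed to the client.
* Several handlers share a function name (``userupdate``, ``deptTree``,
  ``userlist``, ``get_user_video_points``). FastAPI registers each route when
  its decorator runs, so all routes stay reachable, but only the last
  definition remains bound to the module-level name.
"""
from fastapi import APIRouter, Request, Depends, Query, HTTPException
from fastapi.responses import StreamingResponse
from database import get_db
from sqlalchemy.orm import Session
from sqlalchemy import inspect, text
from fastapi.responses import JSONResponse
from models import *
from utils import *
from utils.ry_system_util import *
from common.security import valid_access_token
from common.enc import mpfun, sys_user_data, sys_user_role_data, sys_user_post_data
from common.db import db_czrz
from common.auth_user import *
import traceback
import re

router = APIRouter()


def _internal_error():
    """Log the exception being handled and build a generic 500 response.

    Must be called from inside an ``except`` block. The traceback goes to the
    server log only; the client never receives the exception text.
    """
    traceback.print_exc()
    return HTTPException(status_code=500, detail="Internal server error")


@router.get('/')
async def usercreate01(
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)
):
    """Return the selectable roles used to render the "create user" form."""
    try:
        return {
            "code": 200,
            "msg": "成功",
            "data": {
                "postIds": None,
                "posts": None,
                "roleIds": None,
                "roles": get_role(db),
                "user": None
            }
        }
    except Exception:
        raise _internal_error()


@router.post('')
async def usercreate(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body=Depends(remove_xss_json)
):
    """Create a user together with its role and post associations.

    Required body keys: userName, nickName, roleIds, postIds, remark, status,
    password. Optional: deptId, phonenumber, email, sex, yzyAccount.
    Returns a ``{"code", "msg"}`` dict; business-rule failures are reported
    with a non-200 ``code`` in the body rather than an HTTP error.
    """
    try:
        creator = user_id_get_user_info(db, user_id)
        create_dept = creator.dept_id

        if 'deptId' in body:
            deptId = body['deptId']
            dept_info = dept_id_get_dept_info(db, deptId)
            deptName = dept_info.dept_name if dept_info is not None else None
        else:
            deptId = None
            deptName = None

        phonenumber = body['phonenumber'] if 'phonenumber' in body else None
        email = body['email'] if 'email' in body else None
        sex = body['sex'] if 'sex' in body else None
        yzyAccount = body['yzyAccount'] if 'yzyAccount' in body else None

        userName = body['userName']
        if user_name_get_user_info(db, userName):
            return {"code": 500, "msg": "用户名称已存在"}

        # re.match() returns None on failure, never False, so the previous
        # ``== False`` comparison could not reject anything. fullmatch also
        # refuses a trailing newline, which ``$`` would have let through.
        if re.fullmatch(r'[a-zA-Z0-9_]+', userName) is None:
            return {"code": 404, "msg": "用户名称又字母大小写、阿拉伯数字和下划线组成"}

        nickName = body['nickName']
        roleIds = body['roleIds']
        postIds = body['postIds']
        remark = body['remark']
        status = body['status']
        # NOTE(review): unlike /profile/updatePwd, no complexity policy is
        # applied to the initial password here.
        password = body['password']

        new_user = SysUser(
            user_name=userName,
            nick_name=nickName,
            dept_id=deptId,
            dept_name=deptName,
            phonenumber=phonenumber,
            email=email,
            sex=sex,
            remark=remark,
            status=status,
            password=password,
            yzy_account=yzyAccount,
            create_dept=create_dept,
            create_by=user_id,
            update_time=datetime.now(),
            update_by=user_id,
            login=0,
            login_date=datetime.now(),
            sign=''
        )
        db.add(new_user)
        db.commit()
        db.refresh(new_user)
        # NOTE(review): the row is committed before sign_row() runs. If
        # sign_row() is what encrypts user_name/password (userupdate decrypts
        # them before calling it), plaintext is persisted briefly — confirm.
        sys_user_data.sign_row(db, new_user)

        # Create the user-role associations.
        # NOTE(review): roleIds comes straight from the request body; confirm
        # the caller is allowed to grant each of these roles.
        new_roles = [SysUserRole(user_id=new_user.user_id, role_id=role_id) for role_id in roleIds]
        db.add_all(new_roles)

        # Create the user-post associations.
        new_posts = [SysUserPost(user_id=new_user.user_id, post_id=post_id) for post_id in postIds]
        db.add_all(new_posts)

        sys_user_role_data.sign_table()
        sys_user_post_data.sign_table()
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理创建用户【{nickName}】成功", request.client.host)
        return {"code": 200, "msg": "创建成功", "data": None}
    except Exception:
        raise _internal_error()


@router.put('')
async def userupdate(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body=Depends(remove_xss_json)
):
    """Update another user's details and replace its role and post associations.

    The target is ``body['userId']``, not the caller.
    NOTE(review): no check is visible here that the caller may edit this user
    or assign the requested roles.
    """
    try:
        user_id_1 = body['userId']
        user = user_id_get_user_info(db, user_id_1)

        nickName = body['nickName']
        deptId = body['deptId']
        phonenumber = body['phonenumber']
        email = body['email']
        sex = body['sex']
        roleIds = body['roleIds']
        postIds = body['postIds']
        remark = body['remark']
        yzyAccount = body['yzyAccount']
        status = body['status']

        dept_info = dept_id_get_dept_info(db, deptId)
        # Same None guard as usercreate: an unknown deptId must not crash.
        deptName = dept_info.dept_name if dept_info is not None else None

        # Restore the encrypted fields this form does not edit, so that
        # sign_row() receives them in the same form as the edited ones.
        user.user_name = mpfun.dec_data(user.user_name)
        user.password = mpfun.dec_data(user.password)

        user.nick_name = nickName
        user.dept_id = deptId
        user.dept_name = deptName
        user.phonenumber = phonenumber
        user.email = email
        user.sex = sex
        user.remark = remark
        user.yzy_account = yzyAccount
        user.status = status
        user.sign = ''
        db.commit()
        db.refresh(user)
        sys_user_data.sign_row(db, user)

        # Replace the role associations: clear the current ones, then add the new set.
        db.query(SysUserRole).filter(SysUserRole.user_id == user_id_1).delete()
        new_roles = [SysUserRole(user_id=user_id_1, role_id=role_id) for role_id in roleIds]
        db.add_all(new_roles)

        # Replace the post associations the same way.
        db.query(SysUserPost).filter(SysUserPost.user_id == user_id_1).delete()
        new_posts = [SysUserPost(user_id=user_id_1, post_id=post_id) for post_id in postIds]
        db.add_all(new_posts)

        sys_user_role_data.sign_table()
        sys_user_post_data.sign_table()
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理更新用户【{nickName}】成功", request.client.host)
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception:
        raise _internal_error()


@router.put('/resetPwd')
async def userupdate(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token),
    body=Depends(remove_xss_json)
):
    """Set a new password for the user named by ``body['userId']``.

    NOTE(review): the old password is not required and the complexity policy
    of /profile/updatePwd is not applied, so this route relies entirely on
    the caller being an authorised administrator — confirm that is enforced.
    """
    try:
        user_id_1 = body['userId']
        user = user_id_get_user_info(db, user_id_1)
        password = body['password']

        user.password = mpfun.enc_data(password)
        user.sign = sys_user_data.get_sign_hmac(user)
        user.update_by = user_id
        user.update_time = datetime.now()
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理重置用户密码【{user.nick_name}】成功", request.client.host)
        return {"code": 200, "msg": "重置用户密码成功", "data": None}
    except Exception:
        raise _internal_error()


@router.post("/export")
async def export_to_excel(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    user_id: str = Depends(valid_access_token)
):
    """Export the non-deleted rows of ``sys_user`` as an .xlsx download."""
    data_table_name = 'sys_user'

    # Read the table structure; the column comments become the sheet headers.
    inspector = inspect(db.bind)
    columns = inspector.get_columns(data_table_name)

    exported_names = ['user_id', 'dept_id', 'user_name', 'nick_name', 'phonenumber']
    user_report_columns = [col for col in columns if col['name'] in exported_names]
    column_comments = [col.get('comment', '') for col in user_report_columns]

    # The SQL is assembled with an f-string, but the table name is a constant
    # and every column name passed the allow-list above, so no request data
    # reaches the statement. Keep it that way if this is ever extended.
    select_list = ', '.join([f'rd.{col["name"]}' for col in user_report_columns])
    query_sql = f"""
        SELECT {select_list}
        FROM {data_table_name} rd
        where del_flag<>'2'
    """
    result = db.execute(text(query_sql))
    rows = result.fetchall()

    import pandas as pd
    from io import BytesIO

    df = pd.DataFrame(rows, columns=column_comments)

    # NOTE(review): values such as nick_name are user-supplied. A cell that
    # starts with '=', '+', '-' or '@' may be interpreted as a formula when the
    # file is opened; consider neutralising such values before writing.
    output = BytesIO()
    with pd.ExcelWriter(output, engine='openpyxl') as writer:
        df.to_excel(writer, index=False, sheet_name='用户列表')

    output.seek(0)
    headers = {
        'Content-Disposition': 'attachment; filename="report_data.xlsx"',
        'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
    }

    db_czrz.log(db, auth_user, "系统管理", f"后台管理导出用户数据成功", request.client.host)

    return StreamingResponse(output, headers=headers)


@router.get('/getInfo')
async def getInfo(request: Request, db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
    """Return the caller's own user record, role list and permission strings."""
    try:
        info = db.query(SysUser).filter(SysUser.user_id == user_id).first()
        if info is None:
            return {"code": 200, "msg": "操作成功", "data": None}

        # Department name. The guard must test the query result; the previous
        # code tested dept_name (always "") and crashed when no department row
        # existed.
        dept_name = ""
        dept_info = db.query(SysDept).filter(SysDept.dept_id == info.dept_id).first()
        if dept_info is not None:
            dept_name = dept_info.dept_name

        # Roles held by the caller.
        roles = []
        role_ids = db.query(SysUserRole).filter(SysUserRole.user_id == user_id).all()
        for role in role_ids:
            role_info = db.query(SysRole).filter(SysRole.role_id == role.role_id).first()
            roles.append(
                {
                    "roleId": role_info.role_id,
                    "roleName": role_info.role_name,
                    "roleKey": role_info.role_key,
                    "roleSort": role_info.role_sort,
                    "dataScope": role_info.data_scope,
                    "menuCheckStrictly": role_info.menu_check_strictly,
                    "deptCheckStrictly": role_info.dept_check_strictly,
                    "status": role_info.status,
                    "remark": role_info.remark,
                    "createTime": get_datetime_str(role_info.create_time),
                    "flag": False,
                    # NOTE(review): hard-coded True for every role.
                    "superAdmin": True
                }
            )
        role_keys = [n['roleKey'] for n in roles]

        return {
            "code": 200,
            "msg": "操作成功",
            "data": {
                "user": {
                    "userId": info.user_id,
                    "tenantId": info.tenant_id,
                    "deptId": info.dept_id,
                    "userName": mpfun.dec_data(info.user_name),
                    "nickName": info.nick_name,
                    "userType": info.user_type,
                    "email": mpfun.dec_data(info.email),
                    "phonenumber": mpfun.dec_data(info.phonenumber),
                    "sex": info.sex,
                    "avatar": info.avatar,
                    "status": info.status,
                    "loginIp": info.login_ip,
                    "loginDate": get_datetime_str(info.login_date),
                    "remark": info.remark,
                    "createTime": get_datetime_str(info.create_time),
                    "deptName": dept_name,
                    "roles": roles,
                    "roleIds": None,
                    "postIds": None,
                    "roleId": None
                },
                # NOTE(review): every caller is told it holds the wildcard
                # permission. If the front end hides admin features based on
                # this list, nothing is hidden; enforcement has to happen on
                # the server for each route.
                "permissions": [
                    "*:*:*"
                ],
                "roles": role_keys
            }
        }
    except Exception:
        raise _internal_error()


@router.get('/deptTree')
async def deptTree(request: Request, db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
    """Return the department hierarchy as a nested id/label tree."""

    def build_dept_tree(depts, parent_dept):
        # parent_dept is passed along but not read.
        dept_tree = []
        for dept_info in depts:
            dept = {
                "id": dept_info.dept_id,
                "label": dept_info.dept_name,
                "parentId": dept_info.parent_id,
                "weight": dept_info.order_num
            }
            children = parent_id_get_dept_info(db, dept_info.dept_id)
            if len(children) > 0:
                dept["children"] = build_dept_tree(children, dept)
            dept_tree.append(dept)
        return dept_tree

    # 0 is used as the parent id of the top-level departments.
    result = build_dept_tree(parent_id_get_dept_info(db, 0), None)
    return {
        "code": 200,
        "msg": "操作成功",
        "data": result
    }


@router.get('/avcon/deptTree')
async def deptTree(request: Request, label: str = Query(None, description='部门名称'), db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
    """Return the fixed list of AV-conference groups.

    The list is hard-coded for now. An earlier implementation that built the
    list from the department table (optionally filtered by ``label``) was
    disabled, so ``label`` is currently accepted but not used.
    """
    result = []
    result.append({
        "id": "G2@mm.zw.yj",
        "label": "指挥终端",
        "isLeaf": True
    })
    result.append({
        "id": "G5@mm.zw.yj",
        "label": "移动视频",
        "isLeaf": True
    })
    result.append({
        "id": "G6@mm.zw.yj",
        "label": "H323会议",
        "isLeaf": True
    })

    return {
        "code": 200,
        "msg": "操作成功",
        "data": result
    }


# def get_query_params(params: dict):
#     return params
# def get_time_params(params: dict = Depends(get_query_params)):
#     try:
#         begin_time = params.get("beginTime")
#         end_time = params.get("endTime")
#         return begin_time, end_time
#     except:
#         return None,None


@router.get('/list')
async def userlist(
    deptId: int = Query(None, description='部门id'),
    userName: str = Query(None, description='用户名'),
    status: int = Query(None, description='用户状态'),
    phonenumber: str = Query(None, description='手机号'),
    page: int = Query(1, gt=0, description='页码'),
    pageSize: int = Query(10, gt=0, description='每页条目数量'),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)
):
    """Return one page of non-deleted users, optionally filtered.

    ``userName`` and ``phonenumber`` are matched exactly: the filter value is
    encrypted with ``mpfun.enc_data`` and compared with the stored column.
    ``deptId`` also matches users in every descendant department.
    """
    try:
        query = db.query(SysUser)
        query = query.filter(SysUser.del_flag != '2')

        if userName:
            query = query.filter(SysUser.user_name == mpfun.enc_data(userName))
        # ``is not None`` so that status=0 filters instead of being ignored.
        if status is not None:
            query = query.filter(SysUser.status == status)
        if phonenumber:
            query = query.filter(SysUser.phonenumber == mpfun.enc_data(phonenumber))

        def get_dept_chli(dept_list: list, parent_id: int):
            # Depth-first walk that appends every descendant department id.
            depts = parent_id_get_dept_info(db, parent_id)
            if depts:
                for dept in depts:
                    dept_list.append(dept.dept_id)
                    get_dept_chli(dept_list, dept.dept_id)
            return dept_list

        if deptId:
            query = query.filter(SysUser.dept_id.in_(get_dept_chli([deptId], deptId)))

        # Count before pagination is applied.
        total_items = query.count()

        query = query.order_by(SysUser.create_time.desc())
        users = query.offset((page - 1) * pageSize).limit(pageSize).all()

        user_list = []
        for user in users:
            # roleIds = user_id_get_user_roleIds(db, user.user_id)
            # user_roles = role_id_list_get_user_role(db,roleIds)
            user_info = {
                "userId": user.user_id,
                "tenantId": user.tenant_id,
                "deptId": user.dept_id,
                "userName": mpfun.dec_data(user.user_name),
                "nickName": user.nick_name,
                "userType": user.user_type,
                "email": mpfun.dec_data(user.email),
                "phonenumber": mpfun.dec_data(user.phonenumber),
                "sex": user.sex,
                "avatar": user.avatar,
                "status": user.status,
                "loginIp": user.login_ip,
                "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
                "remark": user.remark,
                "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
                "deptName": user.dept_name,
                # "roles": user_roles,
                # "roleIds": None,
                # "postIds": None,
                # "roleId": None
            }
            user_list.append(user_info)

        return {
            "code": 200,
            "msg": "成功用户列表",
            "rows": user_list,
            "total": total_items,
            "page": page,
            "pageSize": pageSize,
            # Ceiling division.
            "totalPages": (total_items + pageSize - 1) // pageSize
        }
    except Exception:
        raise _internal_error()


@router.get('/list/dept/{dept_id}')
async def get_dept_user_list(
    # request: Request,
    dept_id: int,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return every non-deleted user that belongs directly to ``dept_id``.

    A falsy ``dept_id`` (0) applies no department filter and returns all users.
    """
    query = db.query(SysUser)
    query = query.filter(SysUser.del_flag != '2')
    if dept_id:
        query = query.filter(SysUser.dept_id == dept_id)

    user_list = query.all()

    user_list_dict = [{
        "userId": user.user_id,
        "tenantId": user.tenant_id,
        "deptId": user.dept_id,
        "userName": mpfun.dec_data(user.user_name),
        "nickName": user.nick_name,
        "userType": user.user_type,
        "email": mpfun.dec_data(user.email),
        "phonenumber": mpfun.dec_data(user.phonenumber),
        "sex": user.sex,
        "avatar": user.avatar,
        "status": user.status,
        "loginIp": user.login_ip,
        "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
        "remark": user.remark,
        "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
        "deptName": user.dept_name,
        "roles": None,
        "roleIds": None,
        "postIds": None,
        "roleId": None
    } for user in user_list]

    return {
        "code": 200,
        "data": user_list_dict,
        "msg": "操作成功"
    }


@router.put('/changeStatus')
async def change_user_status(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Set ``status`` on the non-deleted user named by ``body['userId']``."""
    try:
        userId = body['userId']
        status = body['status']

        query = db.query(SysUser)
        query = query.filter(SysUser.del_flag != '2')
        query = query.filter(SysUser.user_id == userId)
        user = query.first()

        user.status = status
        user.sign = sys_user_data.get_sign_hmac(user)
        user.update_by = user_id
        user.update_time = datetime.now()
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理变更用户状态成功", request.client.host)
        return {
            "code": 200,
            "msg": "操作成功"
        }
    except Exception:
        raise _internal_error()


@router.delete('/{user_id1}')
async def delete_user(
    user_id1: str,
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Soft-delete one or more users (``del_flag = '2'``).

    ``user_id1`` is a comma-separated list of user ids. Every id is resolved
    before anything is modified, so an unknown id rejects the whole request
    with 404 instead of failing part-way through the list.
    """
    targets = []
    for target_id in user_id1.split(','):
        user = user_id_get_user_info(db, target_id)
        if user is None:
            raise HTTPException(status_code=404, detail="用户不存在")
        targets.append(user)

    for user in targets:
        user.del_flag = '2'
        user.sign = sys_user_data.get_sign_hmac(user)
        user.update_by = user_id
        user.update_time = datetime.now()
    db.commit()

    # One audit record per deleted account; previously only the last id in the
    # list was recorded.
    for user in targets:
        db_czrz.log(db, auth_user, "系统管理", f"后台管理删除用户{user.nick_name}记录成功", request.client.host)

    return {
        "code": 200,
        "data": None,
        "msg": "操作成功"
    }


@router.get('/videoPoints')
async def get_user_video_points(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return the caller's saved video points and their names."""
    try:
        videoIds = user_id_get_user_videoIds(db, user_id)
        video_list = [i.video_code_int for i in videoIds]

        query = db.query(TpVideoLog)
        query = query.filter(TpVideoLog.video_code_int.in_(video_list))
        video_info = query.all()

        return {
            "code": 200,
            "msg": "成功",
            "data": {
                "videoIds": [i.video_code_int for i in videoIds],
                "videoInfos": [{"name": info.name, "video_code_int": info.video_code_int} for info in video_info]
            }
        }
    except Exception:
        raise _internal_error()


@router.put('/videoPoints')
async def get_user_video_points(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Replace the caller's saved video points with the codes in the body.

    Assumes the sanitised body is an iterable of video codes — TODO confirm
    against ``remove_xss_json``.
    """
    try:
        query = db.query(SysUserVideo)
        query = query.filter(SysUserVideo.user_id == user_id)
        query.delete(synchronize_session=False)

        for video in body:
            db.add(SysUserVideo(user_id=user_id, video_code_int=video))

        # Single commit: the old set is only discarded if the new one is stored.
        db.commit()
        return {
            "code": 200,
            "msg": "成功",
            "data": None
        }
    except Exception:
        raise _internal_error()


@router.put('/authRole')
async def authRoleUpdate(
    request: Request,
    userId: int = Query(None, description='用户id'),
    roleIds: str = Query(None, description='权限id'),
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)
):
    """Replace the role set of ``userId`` with the comma-separated ``roleIds``.

    NOTE(review): this grants arbitrary roles to an arbitrary user; confirm
    the caller's right to do so is checked somewhere before this handler.
    """
    try:
        user = user_id_get_user_info(db, userId)

        role_ids_list = [int(id) for id in roleIds.split(',')]

        # Clear the current associations, then create the requested ones.
        db.query(SysUserRole).filter(SysUserRole.user_id == userId).delete()
        new_roles = [SysUserRole(user_id=userId, role_id=role_id) for role_id in role_ids_list]
        db.add_all(new_roles)
        db.commit()
        sys_user_role_data.sign_table()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理分配用户{user.nick_name}角色成功", request.client.host)
        return {"code": 200, "msg": "更新成功", "data": None}
    except Exception:
        raise _internal_error()


@router.get('/profile')
async def userlist(
    # user_id_1: int = Query(None ,description='用户id'),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)
):
    """Return the caller's own profile, including department and roles."""
    try:
        user = user_id_get_user_info(db, user_id)

        roleIds = user_id_get_user_roleIds(db, user_id)
        user_roles = role_id_list_get_user_role(db, roleIds)
        dept = dept_id_get_dept_info(db, user.dept_id)

        user_info = {
            # Role id 1 is treated as the administrator role.
            "admin": 1 in roleIds,
            "avatar": user.avatar,
            "createBy": mpfun.dec_data(user_id_get_user_info(db, user.create_by).user_name),
            "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
            "delFlag": user.del_flag,
            "dept": {
                # NOTE(review): unlike the user-level createBy above, these two
                # names are returned without mpfun.dec_data — confirm intended.
                "createBy": user_id_get_user_info(db, dept.create_by).user_name if dept.create_by else None,
                "createTime": dept.create_time.strftime('%Y-%m-%d %H:%M:%S') if dept.create_time else None,
                "updateBy": user_id_get_user_info(db, dept.update_by).user_name if dept.update_by else None,
                # NOTE(review): reports create_time as updateTime; probably
                # meant dept.update_time — confirm the column exists.
                "updateTime": dept.create_time.strftime('%Y-%m-%d %H:%M:%S') if dept.create_time else None,
                "remark": '',  # dept.remark,
                "deptId": dept.dept_id,
                "parentId": dept.parent_id,
                "ancestors": dept.ancestors,
                "deptName": dept.dept_name,
                "orderNum": dept.order_num,
                "leader": dept.leader_name,
                "phone": dept.phone,
                "email": dept.email,
                "status": dept.status,
                "delFlag": dept.del_flag,
                "parentName": dept.parent_name,
                "children": []
            },
            "deptId": user.dept_id,
            "email": mpfun.dec_data(user.email),
            "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
            "loginIp": user.login_ip,
            "nickName": user.nick_name,
            # The stored password is never returned.
            "password": "",
            "phonenumber": mpfun.dec_data(user.phonenumber),
            "postIds": None,
            "remark": user.remark,
            "roleId": None,
            "roleIds": roleIds,
            "roles": user_roles,
            "sex": user.sex,
            "status": user.status,
            "updateBy": user.update_by,
            # Previously formatted create_time here; update_time is the column
            # the write handlers in this module maintain.
            "updateTime": user.update_time.strftime('%Y-%m-%d %H:%M:%S') if user.update_time else None,
            "userId": user.user_id,
            "userName": mpfun.dec_data(user.user_name),
            "tenantId": user.tenant_id,
            "userType": user.user_type,
            "deptName": dept.dept_name,
            "postGroup": "工作人员",
            "roleGroup": '/'.join([i["roleName"] for i in user_roles])
        }
        # roles = get_role(db,roleIds)

        return {
            "code": 200,
            "msg": "成功用户列表",
            "data": user_info,
            # {
            #     "user":user_list,
            #     "roleIds":roleIds,
            #     "roles":roles,
            #     "postIds":[],
            #     "posts":[]
            # }
        }
    except Exception:
        raise _internal_error()


@router.post('/profile/avatar')
async def updateAvatar(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)
):
    """Record an avatar update in the audit log; no avatar data is stored here."""
    db_czrz.log(db, auth_user, "系统管理", f"后台管理更新用户头像成功", request.client.host)
    return {
        "code": 200,
        "msg": "更新头像成功"
    }


@router.put('/profile')
async def userlist(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id: int = Depends(valid_access_token)
):
    """Update the caller's own email, nickname, phone number and sex."""
    try:
        user = user_id_get_user_info(db, user_id)

        if "email" in body:
            user.email = mpfun.enc_data(body['email'])
        if "nickName" in body:
            user.nick_name = body['nickName']
        if "phonenumber" in body:
            user.phonenumber = mpfun.enc_data(body['phonenumber'])
        if "sex" in body:
            user.sex = body['sex']

        user.sign = sys_user_data.get_sign_hmac(user)
        user.update_by = user_id
        user.update_time = datetime.now()
        db.commit()

        # The debug print of auth_user was removed so that identity data does
        # not end up on stdout.
        db_czrz.log(db, auth_user, "系统管理", f"后台管理更新用户【{user.nick_name}】个人信息成功", request.client.host)
        return {"code": 200, "msg": "成功", "data": None}
    except Exception:
        raise _internal_error()


@router.put('/profile/updatePwd')
async def userlist(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id: int = Depends(valid_access_token)
):
    """Change the caller's own password after verifying the old one.

    The new password must pass three checks, in order: length/character
    classes, a deny-list of easily guessed substrings, and a scan for
    three-character keyboard/alphabet runs or repeats.
    """

    def check_password_base(pwd):
        # At least 12 characters with a digit, a lowercase and an uppercase
        # letter. Raw string so ``\d`` is not an invalid string escape.
        zz_str = r'^(?=.*\d)(?=.*[a-z])(?=.*[A-Z]).{12,}$'
        if not re.search(zz_str, pwd):
            raise ValueError('密码至少12位且必须包含大小写字母和数字')
        check_password_special(pwd)

    def check_password_special(pwd):
        # Reject passwords that contain an easily guessed word (case-insensitive).
        list_special = ['admin', 'root', 'crld', 'crland', 'test', 'hello', '147258', '147369', '258369']
        lowered = pwd.lower()
        for pwd_special in list_special:
            if pwd_special in lowered:
                raise ValueError('密码不能包含易猜解字符:' + str(pwd_special))
        check_password_adv(pwd)

    def check_password_adv(pwd):
        # Reject any 3-character window that is a keyboard/alphabet run
        # (forwards or backwards) or the same character three times.
        str_all = '1234567890-=' \
                  '=-0987654321' \
                  '!@#$%^&*()_+' \
                  '+_)(*&^%$#@!' \
                  'abcdefghijklmnopqrstuvwxyz' \
                  'zyxwvutsrqponmlkjihgfedcba' \
                  'qwertyuiopasdfghjklzxcvbnm' \
                  'mnbvcxzlkjhgfdsapoiuytrewq' \
                  '1qaz2wsx3edc4rfv5tgb6yhn7ujm8ik,9ol.0p;/'
        window_count = len(pwd) - 2
        for start in range(window_count):
            pwd_cut = pwd[start:start + 3]
            # Compare in lowercase so uppercase runs are caught as well.
            if pwd_cut.lower() in str_all:
                raise ValueError('密码不能包含3位以上连续字符:' + str(pwd_cut))
            if pwd_cut[0].lower() == pwd_cut[1].lower() == pwd_cut[2].lower():
                raise ValueError('密码不能包含3位以上重复字符:' + str(pwd_cut))
        if window_count > 0:
            print('密码复杂度合格')

    try:
        user = user_id_get_user_info(db, user_id)
        # NOTE(review): this compares the stored value with a fresh
        # mpfun.enc_data() of the supplied password, which only works if that
        # encryption is deterministic, and ``!=`` is not a constant-time
        # comparison. Consider a salted one-way hash verified with
        # hmac.compare_digest.
        if user.password != mpfun.enc_data(body['oldPassword']):
            return {"code": 500, "msg": "旧密码错误"}

        # NOTE(review): when newPassword is absent the request still reports
        # success without changing the password.
        if "newPassword" in body:
            try:
                check_password_base(body['newPassword'])
            except Exception as e:
                # Policy violations are returned to the user as the message.
                return {
                    'code': 500,
                    'msg': str(e)
                }
            user.password = mpfun.enc_data(body['newPassword'])

        user.sign = sys_user_data.get_sign_hmac(user)
        user.update_by = user_id
        user.update_time = datetime.now()
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理更新用户【{user.nick_name}】个人中心密码成功", request.client.host)
        return {"code": 200, "msg": "修改密码成功", "data": None}
    except Exception:
        raise _internal_error()


@router.get('/authRole/{user_id_1}')
@router.get('/{user_id_1}')
async def userlist(
    # Path parameter (用户id). It was declared with Query(), which recent
    # FastAPI releases reject for a name that appears in the route path.
    user_id_1: int,
    db: Session = Depends(get_db),
    user_id: int = Depends(valid_access_token)
):
    """Return one user's details plus current and selectable roles and posts.

    NOTE(review): any authenticated caller can read any user id here, including
    the decrypted email, phone number and yzyAccount — confirm that is intended.
    """
    try:
        user = user_id_get_user_info(db, user_id_1)

        roleIds = user_id_get_user_roleIds(db, user_id_1)
        user_roles = role_id_list_get_user_role(db, roleIds)

        user_list = {
            "userId": user.user_id,
            "tenantId": user.tenant_id,
            "deptId": user.dept_id,
            "userName": mpfun.dec_data(user.user_name),
            "nickName": user.nick_name,
            "userType": user.user_type,
            "email": mpfun.dec_data(user.email),
            "phonenumber": mpfun.dec_data(user.phonenumber),
            "sex": user.sex,
            "avatar": user.avatar,
            "status": user.status,
            "loginIp": user.login_ip,
            "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
            "remark": user.remark,
            "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
            "deptName": user.dept_name,
            "yzyAccount": mpfun.dec_data(user.yzy_account),
        }

        roles = get_role(db, roleIds)
        postIds = user_id_get_user_postIds(db, user_id_1)
        posts = dept_id_get_dept_post(db, user.dept_id)

        return {
            "code": 200,
            "msg": "成功用户列表",
            "data": {
                "user": user_list,
                # Role ids the user currently holds.
                "roleIds": roleIds,
                # Post ids the user currently holds in its department.
                "postIds": postIds,
                # Roles available for selection.
                "roles": roles,
                # Department posts available for selection.
                "posts": posts
            }
        }
    except Exception:
        raise _internal_error()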