|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends,Query,HTTPException
- from database import get_db
- from sqlalchemy.orm import Session
- from fastapi.responses import JSONResponse
- from models import *
- from utils import *
- from utils.ry_system_util import *
- from common.security import valid_access_token
- import traceback
- router = APIRouter()
- @router.get('/')
- async def usercreate01(
- db: Session = Depends(get_db),
- user_id: int = Depends(valid_access_token)):
- try:
- return {
- "code": 200,
- "msg": "成功",
- "data": {
- "postIds":None,
- "posts":None,
- "roleIds":None,
- "roles":get_role(db),
- "user":None
- }
- }
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.post('')
- async def usercreate(
- db: Session = Depends(get_db),
- user_id: int = Depends(valid_access_token),
- body = Depends(remove_xss_json)
- ):
- try:
- if 'deptId' in body:
- deptId = body['deptId']
- else:
- deptId = None
- if 'phonenumber' in body:
- phonenumber = body['phonenumber']
- else:
- phonenumber = None
- if 'email' in body:
- email = body['email']
- else:
- email = None
- if 'sex' in body:
- sex = body['sex']
- else:
- sex = None
- userName = body['userName']
- user = user_name_get_user_info(db,userName)
- if user:
- return JSONResponse(status_code=404, content={"code": 404, "msg": "用户名称已存在"})
- pattern = r'^[a-zA-Z0-9_]+$'
- if re.match(pattern, userName) == False:
- return JSONResponse(status_code=404, content={"code": 404, "msg": "用户名称又字母大小写、阿拉伯数字和下划线组成"})
- nickName = body['nickName']
- roleIds = body['roleIds']
- postIds = body['postIds']
- remark = body['remark']
- status = body['status']
- password = '$2a$10$b8yUzN0C71sbz.PhNOCgJe.Tu1yWC3RNrTyjSQ8p1W0.aaUXUJ.Ne'
- new_user = SysUser(
- user_name = userName,
- nick_name=nickName,
- dept_id=deptId,
- phonenumber=phonenumber,
- email=email,
- sex=sex,
- remark=remark,
- status=status,
- password=password
- )
- db.add(new_user)
- db.commit()
- db.refresh(new_user)
- # 创建新的用户角色关联
- new_roles = [SysUserRole(user_id=new_user.user_id, role_id=role_id) for role_id in roleIds]
- db.add_all(new_roles)
- # 创建新的用户角色关联
- new_posts = [SysUserPost(user_id=new_user.user_id, post_id=post_id) for post_id in postIds]
- db.add_all(new_posts)
- db.commit()
- return {"code": 200, "msg": "创建成功", "data": None}
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.put('')
- async def userupdate(
- db: Session = Depends(get_db),
- user_id: int = Depends(valid_access_token),
- body = Depends(remove_xss_json)
- ):
- try:
- user_id_1 = body['userId']
- user = user_id_get_user_info(db,user_id_1)
- nickName = body['nickName']
- deptId = body['deptId']
- phonenumber = body['phonenumber']
- email = body['email']
- sex = body['sex']
- roleIds = body['roleIds']
- postIds = body['postIds']
- remark = body['remark']
- user.nick_name=nickName
- user.dept_id=deptId
- user.phonenumber=phonenumber
- user.email=email
- user.sex=sex
- user.remark=remark
- # 解析角色ID列表
- # role_ids_list = [int(id) for id in roleIds.split(',')]
- # 清除当前用户的所有角色关联
- db.query(SysUserRole).filter(SysUserRole.user_id == user_id_1).delete()
- # 创建新的用户角色关联
- new_roles = [SysUserRole(user_id=user_id_1, role_id=role_id) for role_id in roleIds]
- db.add_all(new_roles)
- # 解析角色ID列表
- # post_ids_list = [int(id) for id in postIds.split(',')]
- # 清除当前用户的所有角色关联
- db.query(SysUserPost).filter(SysUserPost.user_id == user_id_1).delete()
- # 创建新的用户角色关联
- new_posts = [SysUserPost(user_id=user_id_1, post_id=post_id) for post_id in postIds]
- db.add_all(new_posts)
- db.commit()
- return {"code": 200, "msg": "更新成功", "data": None}
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.put('/resetPwd')
- async def userupdate(
- db: Session = Depends(get_db),
- user_id: int = Depends(valid_access_token),
- body = Depends(remove_xss_json)
- ):
- try:
- user_id_1 = body['userId']
- user = user_id_get_user_info(db,user_id_1)
- password = body['password']
- user.password='$2a$10$b8yUzN0C71sbz.PhNOCgJe.Tu1yWC3RNrTyjSQ8p1W0.aaUXUJ.Ne'
- user.update_by=user_id
- db.commit()
- return {"code": 200, "msg": "更新成功", "data": None}
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.get('/getInfo')
- async def getInfo(request: Request,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
- try:
- # 用户信息
- info = db.query(SysUser).filter(SysUser.user_id == user_id).first()
- if info is None:
- return {
- "code": 200,
- "msg": "操作成功",
- "data": None}
- # 部门信息
- dept_name = ""
- dept_info = db.query(SysDept).filter(SysDept.dept_id == info.dept_id).first()
- if dept_name is not None:
- dept_name = dept_info.dept_name
- # 角色信息
- roles = []
- role_ids = db.query(SysUserRole).filter(SysUserRole.user_id == user_id).all()
- for role in role_ids:
- role_info = db.query(SysRole).filter(SysRole.role_id == role.role_id).first()
- roles.append(
- {
- "roleId": role_info.role_id,
- "roleName": role_info.role_name,
- "roleKey": role_info.role_key,
- "roleSort": role_info.role_sort,
- "dataScope": role_info.data_scope,
- "menuCheckStrictly": role_info.menu_check_strictly,
- "deptCheckStrictly": role_info.dept_check_strictly,
- "status": role_info.status,
- "remark": role_info.remark,
- "createTime": get_datetime_str(role_info.create_time),
- "flag": False,
- "superAdmin": True
- }
- )
- role_keys = [
- n['roleKey']
- for n in roles
- ]
- return {
- "code": 200,
- "msg": "操作成功",
- "data": {
- "user": {
- "userId": info.user_id,
- "tenantId": info.tenant_id,
- "deptId": info.dept_id,
- "userName": info.user_name,
- "nickName": info.nick_name,
- "userType": info.user_type,
- "email": info.email,
- "phonenumber": info.phonenumber,
- "sex": info.sex,
- "avatar": info.avatar,
- "status": info.status,
- "loginIp": info.login_ip,
- "loginDate": get_datetime_str(info.login_date),
- "remark": info.remark,
- "createTime": get_datetime_str(info.create_time),
- "deptName": dept_name,
- "roles": roles,
- "roleIds": None,
- "postIds": None,
- "roleId": None
- },
- "permissions": [
- "*:*:*"
- ],
- "roles": role_keys
- }
- }
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.get('/deptTree')
- async def deptTree(request: Request,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)):
- def build_dept_tree(depts, parent_dept):
- dept_tree = []
- for dept_info in depts:
- dept = {
- "id": dept_info.dept_id,
- "label": dept_info.dept_name,
- "parentId": dept_info.parent_id,
- "weight": dept_info.order_num
- }
- # print(dept_info.dept_id)
- children = parent_id_get_dept_info(db, dept_info.dept_id)
- if len(children) > 0:
- children_depts = build_dept_tree(children, dept)
- dept["children"] = children_depts
- dept_tree.append(dept)
- return dept_tree
- result = build_dept_tree(parent_id_get_dept_info(db, 0),None)
- return {
- "code": 200,
- "msg": "操作成功",
- "data": result
- }
- # def get_query_params(params: dict):
- # return params
- # def get_time_params(params: dict = Depends(get_query_params)):
- # try:
- # begin_time = params.get("beginTime")
- # end_time = params.get("endTime")
- # return begin_time, end_time
- # except:
- # return None,None
- @router.get('/list')
- async def userlist( deptId: int = Query(None ,description='部门id'),
- userName: str = Query(None, description='用户名'),
- status: int = Query(None, description='用户状态'),
- phonenumber : str = Query(None, description='手机号'),
- # params:dict = Query(None, description='创建日期'),
- # beginTime: str = Depends(get_time_params),
- # endTime: str = Depends(get_time_params),
- page: int = Query(1, gt=0, description='页码'),
- pageSize: int = Query(10, gt=0, description='每页条目数量'),
- db: Session = Depends(get_db),
- user_id: int = Depends(valid_access_token)):
- try:
- # 构建查询
- query = db.query(SysUser)
- query = query.filter(SysUser.del_flag != '2')
- # 应用查询条件
- # if beginTime:
- # query = query.filter(SysUser.create_time >= beginTime)
- # if endTime:
- # query = query.filter(SysUser.create_time <= endTime)
- # if params:
- # query = query.filter(SysUser.create_time >= params.get("beginTime"))
- # query = query.filter(SysUser.create_time <= params.get("endTime"))
- if userName:
- query =query.filter(SysUser.user_name.like(f'%{userName}%'))
- if status:
- query =query.filter(SysUser.status==status)
- if phonenumber:
- query =query.filter(SysUser.phonenumber.like(f'%{phonenumber}%'))
- def get_dept_chli(dept_list : list,parent_id : int):
- depts = parent_id_get_dept_info(db,parent_id)
- if depts:
- for dept in depts:
- dept_list.append(dept.dept_id)
- get_dept_chli(dept_list, dept.dept_id)
- return dept_list
- if deptId:
- query = query.filter(SysUser.dept_id.in_(get_dept_chli([],deptId)))
- # 计算总条目数
- total_items = query.count()
- # 排序
- query = query.order_by(SysUser.create_time.desc())
- # 执行分页查询
- users = query.offset((page - 1) * pageSize).limit(pageSize).all()
- # 将查询结果转换为列表形式的字典
- user_list = []
- for user in users:
- roleIds = user_id_get_user_roleIds(db, user.user_id)
- user_roles = role_id_list_get_user_role(db,roleIds)
- user_info = {
- "userId": user.user_id,
- "tenantId": user.tenant_id,
- "deptId": user.dept_id,
- "userName": user.user_name,
- "nickName": user.nick_name,
- "userType": user.user_type,
- "email": user.email,
- "phonenumber": user.phonenumber,
- "sex": user.sex,
- "avatar": user.avatar,
- "status": user.status,
- "loginIp": user.login_ip,
- "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
- "remark": user.remark,
- "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
- "deptName": user.dept_name,
- "roles": user_roles,
- "roleIds": None,
- "postIds": None,
- "roleId": None
- }
- user_list.append(user_info)
- # 返回结果
- return {
- "code": 200,
- "msg": "成功用户列表",
- "rows": user_list,
- "total": total_items,
- "page": page,
- "pageSize": pageSize,
- "totalPages": (total_items + pageSize - 1) // pageSize
- }
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.get('/list/dept/{dept_id}')
- async def get_dept_user_list(
- # request: Request,
- dept_id: int,
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)
- ):
- query = db.query(SysUser)
- query = query.filter(SysUser.del_flag != '2')
- if dept_id:
- query = query.filter(SysUser.dept_id == dept_id)
- user_list = query.all()
- # 将模型实例转换为字典
- user_list_dict = [{
- "userId": user.user_id,
- "tenantId": user.tenant_id,
- "deptId": user.dept_id,
- "userName": user.user_name,
- "nickName": user.nick_name,
- "userType": user.user_type,
- "email": user.email,
- "phonenumber": user.phonenumber,
- "sex": user.sex,
- "avatar": user.avatar,
- "status": user.status,
- "loginIp": user.login_ip,
- "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
- "remark": user.remark,
- "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
- "deptName": user.dept_name,
- "roles": None,
- "roleIds": None,
- "postIds": None,
- "roleId": None
- } for user in user_list]
- return {
- "code": 200,
- "data": user_list_dict,
- "msg": "操作成功"
- }
- @router.put('/changeStatus')
- async def change_user_status(
- db: Session = Depends(get_db),
- body=Depends(remove_xss_json),
- user_id=Depends(valid_access_token)
- ):
- try:
- userId = body['userId']
- status = body['status']
- query = db.query(SysUser)
- query = query.filter(SysUser.del_flag != '2')
- query = query.filter(SysUser.user_id == userId)
- user = query.first()
- user.status= status
- user.update_by=user_id
- db.commit()
- return {
- "code": 200,
- "msg": "操作成功"
- }
- except Exception as e:
- # 处理异常
- db.rollback()
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=str(e))
- @router.delete('/{user_id1}')
- async def delete_dept(
- user_id1:int,
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)
- ):
- user = user_id_get_user_info(db,user_id1)
- # 将模型实例转换为字典
- print(user)
- user.del_flag = '2'
- user.update_by = user_id
- db.commit()
- return {
- "code": 200,
- "data": None,
- "msg": "操作成功"
- }
- @router.get('/videoPoints')
- async def get_user_video_points(
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)):
- try:
- videoIds = user_id_get_user_videoIds(db,user_id)
- video_list = [i.video_code_int for i in videoIds]
- query = db.query(TpVideoLog)
- query = query.filter(TpVideoLog.video_code_int.in_(video_list))
- video_info = query.all()
- return {
- "code": 200,
- "msg": "成功",
- "data": {"videoIds":[i.video_code_int for i in videoIds],
- "videoInfos":[{"name":info.name,"video_code_int":info.video_code_int} for info in video_info]}
- }
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.put('/videoPoints')
- async def get_user_video_points(
- db: Session = Depends(get_db),
- body = Depends(remove_xss_json),
- user_id = Depends(valid_access_token)):
- try:
- query = db.query(SysUserVideo)
- query = query.filter(SysUserVideo.user_id == user_id)
- query.delete(synchronize_session=False)
- db.commit()
- for video in body:
- new_user_video = SysUserVideo(
- user_id=user_id,
- video_code_int = video
- )
- db.add(new_user_video)
- db.commit()
- return {
- "code": 200,
- "msg": "成功",
- "data":None
- }
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.put('/authRole')
- async def authRoleUpdate( userId: int = Query(None ,description='用户id'),
- roleIds: str = Query(None ,description='权限id'),
- db: Session = Depends(get_db),
- user_id: int = Depends(valid_access_token)):
- try:
- # 解析角色ID列表
- role_ids_list = [int(id) for id in roleIds.split(',')]
- # 清除当前用户的所有角色关联
- db.query(SysUserRole).filter(SysUserRole.user_id == userId).delete()
- # 创建新的用户角色关联
- new_roles = [SysUserRole(user_id=userId, role_id=role_id) for role_id in role_ids_list]
- db.add_all(new_roles)
- db.commit()
- return {"code": 200, "msg": "更新成功", "data": None}
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
- @router.get('/authRole/{user_id_1}')
- @router.get('/{user_id_1}')
- async def userlist( user_id_1: int = Query(None ,description='用户id'),
- db: Session = Depends(get_db),
- user_id: int = Depends(valid_access_token)):
- try:
- user = user_id_get_user_info(db,user_id_1)
- # 将查询结果转换为列表形式的字典
- roleIds = user_id_get_user_roleIds(db, user_id_1)
- user_roles = role_id_list_get_user_role(db,roleIds)
- user_list = {
- "userId": user.user_id,
- "tenantId": user.tenant_id,
- "deptId": user.dept_id,
- "userName": user.user_name,
- "nickName": user.nick_name,
- "userType": user.user_type,
- "email": user.email,
- "phonenumber": user.phonenumber,
- "sex": user.sex,
- "avatar": user.avatar,
- "status": user.status,
- "loginIp": user.login_ip,
- "loginDate": user.login_date.strftime('%Y-%m-%d %H:%M:%S') if user.login_date else '',
- "remark": user.remark,
- "createTime": user.create_time.strftime('%Y-%m-%d %H:%M:%S') if user.create_time else '',
- "deptName": user.dept_name,
- "roles": user_roles,
- "roleIds": roleIds,
- "postIds": None,
- "roleId": None
- }
- roles = get_role(db,roleIds)
- # 返回结果
- return {
- "code": 200,
- "msg": "成功用户列表",
- "data": {
- "user":user_list,
- "roleIds":roleIds,
- "roles":roles,
- "postIds":[],
- "posts":[]
- }
- }
- except Exception as e:
- traceback.print_exc()
- raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")
|