#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status from common.security import valid_access_token from pydantic import BaseModel from database import get_db from sqlalchemy import text from sqlalchemy.orm import Session from typing import List from models import * from utils import * from utils.ry_system_util import * import json from sqlalchemy.sql import func from common.enc import mpfun, sys_menu_data, sys_menu_layer_data from common.auth_user import * from common.db import db_czrz import traceback from exceptions import HmacException router = APIRouter() # 定义 Meta 和 Child 模型用于嵌套结构 class Meta(BaseModel): title: str icon: str noCache: bool link: str = None class Child(BaseModel): name: str path: str hidden: bool component: str meta: Meta children: List['Child'] = [] class Router(BaseModel): name: str path: str hidden: bool redirect: str = 'noRedirect' component: str alwaysShow: bool = True meta: Meta children: List[Child] = [] class Router_frame(BaseModel): component:str hidden: bool meta:Meta name: str path: str redirect: str = 'noRedirect' children: List[Child] = [] alwaysShow: bool = True # checkedKeys roleMenuTreeselect @router.get('/roleMenuTreeselect/{roleId}') async def getmunutreeselect(request: Request,roleId:int,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)): def build_dept_tree(menus, parent_dept): menu_tree = [] for menu_info in menus: menu = { "id": menu_info.menu_id, "label": menu_info.menu_name, "parentId": menu_info.parent_id, "weight": menu_info.order_num } # print(dept_info.dept_id) children = parent_id_get_menu_info(db, menu_info.menu_id) if len(children) > 0: children_depts = build_dept_tree(children, menu) menu["children"] = children_depts menu_tree.append(menu) return menu_tree checkedKeys = role_id_get_role_menus(db,roleId) menus = build_dept_tree(parent_id_get_menu_info(db, 0), None) return { "code": 200, "msg": "操作成功", "data": {"menus":menus,"checkedKeys":checkedKeys} } @router.get('/treeselect') async def getmunutreeselect(request: Request,db: Session = Depends(get_db), user_id: int = Depends(valid_access_token)): def build_dept_tree(menus, parent_dept): menu_tree = [] for menu_info in menus: menu = { "id": menu_info.menu_id, "label": menu_info.menu_name, "parentId": menu_info.parent_id, "weight": menu_info.order_num } # print(dept_info.dept_id) children = parent_id_get_menu_info(db, menu_info.menu_id) if len(children) > 0: children_depts = build_dept_tree(children, menu) menu["children"] = children_depts menu_tree.append(menu) return menu_tree result = build_dept_tree(parent_id_get_menu_info(db, 0), None) return { "code": 200, "msg": "操作成功", "data": result } @router.get('/getRouters') async def getRouters(request: Request, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token)): try: # 查询数据库中的所有菜单项,根据 parent_id 排序以构建树形结构 query = db.query(SysMenu) query = query.filter_by(parent_id=0) query = query.filter(SysMenu.menu_id!=11655) query = query.filter(SysMenu.del_flag != '2') query = query.order_by(SysMenu.order_num.asc()) menus = query.all() # 顶级菜单 # 递归函数用于构建树形结构 def build_menu_tree(menus, parent_menu): menu_tree = [] # 初始化一个列表来存储菜单树结构 for menu in menus: # 是否我所在角色的菜单 if is_role_menu(menu.menu_id, user_id, db) != True: continue if sys_menu_data.sign_valid_row(menu) == False: raise HmacException(500, "系统菜单表验证异常,已被非法篡改") component = 'Layout' if menu.menu_type=='M' and parent_menu: component = 'ParentView' if menu.component: component= menu.component # print(menu.menu_name) menu_data = { "component":component, "hidden":menu.visible == '1', "name":menu.path, "path":'/'+menu.path, # "redirect":'noRedirect', # "alwaysShow":True, "meta":{ "title":menu.menu_name, 'icon' : menu.icon, 'noCache' : menu.is_cache == '1', } } if menu.is_frame==0: menu_data['meta']['link']=menu.path # 如果菜单有子菜单,则递归构建子菜单 if menu.menu_type == 'M': # 假设 'M' 表示目录类型 # query = db.query(SysMenu) # query = query.filter_by(parent_id=menu.menu_id) # query = query.filter(SysMenu.del_flag != '2') # children_menus = db.query(SysMenu).filter_by(parent_id=menu.menu_id).all() # children_menus = query.all() menu_data['redirect'] ='noRedirect' menu_data['alwaysShow'] =True children_menus = parent_id_get_menu_info(db,menu.menu_id) if len(children_menus)>0: menu_data['children'] = build_menu_tree(children_menus, menu) if menu.parent_id==0: pass else: menu_data['path'] = menu_data['path'][1:] menu_tree.append(menu_data) # 将当前菜单数据添加到菜单树列表 return menu_tree # 是否我所在角色的菜单 def is_role_menu(menu_id: int, user_id: int, db: Session): # 是否超级管理员 sql = text("select * from sys_user_role where user_id = :user_id and role_id = 1") rows = db.execute(sql, {"user_id": user_id}).fetchall() if len(rows) > 0: return True sql = text("select menu_id from sys_role_menu inner join sys_user_role on sys_role_menu.role_id = sys_user_role.role_id where sys_role_menu.menu_id = :menu_id and sys_user_role.user_id = :user_id") rows = db.execute(sql, {"menu_id": menu_id, "user_id": user_id}).fetchall() return len(rows) > 0 # 构建顶级菜单的树形结构 routers = build_menu_tree(menus, None) # routers_dict = [router.dict() for router in routers] # routers_dict = [] # for router in routers: # router_info = router.dict() # if len(router_info['children'])==0: # del router_info['children'] # del router_info['redirect'] # del router_info['alwaysShow'] # router_info['path'] = router_info['path'] #[1:] # # routers_dict.append(router_info) # 返回构建好的路由数据 return { "code": 200, "msg": "操作成功", "data": routers #[router.dict() for router in routers] # 如果没有顶级菜单,返回空列表 } except HmacException as e: return { "code": e.code, "msg": e.msg } except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") def get_video_routers(db:Session): from utils.video_util import tag_get_video_tag_list query = db.query(SysDictData) query = query.filter(SysDictData.dict_type == 'video_type') query = query.filter(SysDictData.del_flag != '2') query = query.order_by(SysDictData.dict_sort) # dict_data = db.query(SysDictData).filter_by(dict_type==dict_type and del_flag != '2').all() dict_data = query.all() # 将模型转换为字典 dict_data_list = [{ "component": '4', "isVideo":False, "hidden": True, "name": '4', "path": '2', # "redirect":'noRedirect', # "alwaysShow":True, "meta": { "title": '附近视频', 'icon': 'icon6', 'noCache': False, } }] for d in dict_data: li = tag_get_video_tag_list(db, d.dict_value) if len(li) > 0: dict_data_list.append({ "component": d.dict_value or 'Layout', "isVideo":True, "hidden": True, "name": d.dict_value, "path": '2', # "redirect":'noRedirect', # "alwaysShow":True, "meta": { "title": d.dict_label, 'icon': 'icon6', 'noCache': False, } }) return dict_data_list @router.get('/qydt/getRouters') async def getRouters(request: Request, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token)): try: # 查询数据库中的所有菜单项,根据 parent_id 排序以构建树形结构 query = db.query(SysMenu) query = query.filter_by(parent_id=0) query = query.filter(SysMenu.menu_id==11655) query = query.filter(SysMenu.del_flag != '2') query = query.order_by(SysMenu.order_num.asc()) menus =query.all() # 顶级菜单 # 递归函数用于构建树形结构 def build_menu_tree(menus, parent_menu): menu_tree = [] # 初始化一个列表来存储菜单树结构 for menu in menus: # if sys_menu_layer_data.sign_valid_row(menu) == False: # raise HmacException(500, "系统图层菜单表验证异常,已被非法篡改") menu_data = { "component": menu.component or 'Layout', "isVideo":False, "hidden": menu.visible == '1', "name": menu.path, "path": '/' + menu.path, # "redirect":'noRedirect', # "alwaysShow":True, "meta": { "title": menu.menu_name, 'icon': menu.icon, 'noCache': menu.is_cache == '1', } } if menu.is_frame == 0: menu_data['meta']['link'] = menu.path # 如果菜单有子菜单,则递归构建子菜单 if menu.menu_type == 'M': # 假设 'M' 表示目录类型 menu_data['redirect'] = 'noRedirect' menu_data['alwaysShow'] = True if menu.menu_id==11684: menu_data['children']= get_video_routers(db) menu_tree.append(menu_data) continue query = db.query(SysMenu) query = query.filter_by(parent_id=menu.menu_id) query = query.filter(SysMenu.del_flag != '2') query = query.order_by(SysMenu.order_num.asc()) # children_menus = db.query(SysMenu).filter_by(parent_id=menu.menu_id).all() children_menus = query.all() # print( menu.menu_id) # children_menus = parent_id_get_menu_info(db, menu.menu_id) if len(children_menus) > 0: menu_data['children'] = build_menu_tree(children_menus, menu) elif menu.parent_id == 0: pass else: menu_data['path'] = menu_data['path'][1:] menu_tree.append(menu_data) # 将当前菜单数据添加到菜单树列表 return menu_tree # 构建顶级菜单的树形结构 routers = build_menu_tree(menus, None) # routers_dict = [router.dict() for router in routers] # routers_dict = [] # for router in routers: # router_info = router.dict() # if len(router_info['children'])==0: # del router_info['children'] # del router_info['redirect'] # del router_info['alwaysShow'] # router_info['path'] = router_info['path'] #[1:] # # routers_dict.append(router_info) # 返回构建好的路由数据 return { "code": 200, "msg": "操作成功", "data": routers # [router.dict() for router in routers] # 如果没有顶级菜单,返回空列表 } except HmacException as e: return { "code": e.code, "msg": e.msg } except Exception as e: traceback.print_exc() raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}") @router.get('/list') async def get_list( # request: Request, menuName: str = Query(None, max_length=100), status: str = Query(None, max_length=100), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): query = db.query(SysMenu) query = query.filter(SysMenu.del_flag != '2') if menuName: query = query.filter(SysMenu.menu_name.like(f'%{menuName}%')) if status: query = query.filter(SysMenu.status.like(f'%{status}%')) # 应用查询 query = query.order_by(SysMenu.order_num.asc()) menu_list = query.all() # 将模型实例转换为字典 menu_list_dict = [] def menu_parent_life(menu_id): query = db.query(SysMenu) query = query.filter(SysMenu.menu_id==menu_id) data = query.filter(SysMenu.del_flag != '2').first() if data is None: return False if data.parent_id==0 or data.parent_id=='0': return True else: return menu_parent_life(data.parent_id) for menu in menu_list: if menu_parent_life(menu.menu_id): menu_list_dict.append({ "menuId": menu.menu_id, "menuName": menu.menu_name, "parentId": menu.parent_id, "orderNum": menu.order_num, "path": menu.path, "component": menu.component, "queryParam": menu.query_param, "isFrame": str(menu.is_frame), "isCache": str(menu.is_cache), "menuType": menu.menu_type, "visible": menu.visible, "status": menu.status, "perms": menu.perms, "icon": menu.icon, "createDept": menu.create_dept, "remark": menu.remark, "createTime": menu.create_time.strftime('%Y-%m-%d %H:%M:%S') if menu.create_time else '', "children": [] # 递归调用以获取子菜单 }) # 构建分页响应 # pagination_info = { # "total": total_count, # "page_num": page_num, # "page_size": page_size, # "total_pages": (total_count + page_size - 1) // page_size # 计算总页数 # } return { "code": 200, "data": menu_list_dict, # 'pages': page_num, # 总页数 # 'currentPage': page_num, # 当前页数 # # 'current':current, # # 'total' : total, # 'total': total_count, # 总数据量 # # 'size':size, # 'pageSize': page_size, # 页码 # { # "items": menu_list_dict, # "pagination": pagination_info # }, "msg": "操作成功" } @router.get('/layer_list') async def get_list( # request: Request, menuName: str = Query(None, max_length=100), status: str = Query(None, max_length=100), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): query = db.query(SysMenuLayer) query = query.filter(SysMenuLayer.del_flag != '2') if menuName: query = query.filter(SysMenuLayer.menu_name.like(f'%{menuName}%')) if status: query = query.filter(SysMenuLayer.status.like(f'%{status}%')) # 应用查询 # menu_list = db.query(SysMenu).filter( # (SysMenu.menu_name.like(f'%{menu_name}%')) , # (SysMenu.status.like(f'%{status}%')) # ).all() menu_list = query.all() # 将模型实例转换为字典 menu_list_dict = [{ "menuId": menu.menu_id, "menuName": menu.menu_name, "parentId": menu.parent_id, "orderNum": menu.order_num, "path": menu.path, "component": menu.component, "queryParam": menu.query_param, "isFrame": str(menu.is_frame), "isCache": str(menu.is_cache), "menuType": menu.menu_type, "visible": menu.visible, "status": menu.status, "perms": menu.perms, "icon": menu.icon, "createDept": menu.create_dept, "remark": menu.remark, "createTime": menu.create_time.strftime('%Y-%m-%d %H:%M:%S') if menu.create_time else '', "children": [] # 递归调用以获取子菜单 } for menu in menu_list] # 构建分页响应 # pagination_info = { # "total": total_count, # "page_num": page_num, # "page_size": page_size, # "total_pages": (total_count + page_size - 1) // page_size # 计算总页数 # } return { "code": 200, "data": menu_list_dict, # 'pages': page_num, # 总页数 # 'currentPage': page_num, # 当前页数 # # 'current':current, # # 'total' : total, # 'total': total_count, # 总数据量 # # 'size':size, # 'pageSize': page_size, # 页码 # { # "items": menu_list_dict, # "pagination": pagination_info # }, "msg": "操作成功" } @router.get('/{menuid}') async def get_list( # request: Request, menuid: str = Query(None, max_length=100), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): query = db.query(SysMenu) query = query.filter(SysMenu.del_flag != '2') if menuid: query = query.filter(SysMenu.menu_id.like(f'{menuid}')) menu= query.first() # 将模型实例转换为字典 menu_list_dict = { "menuId": menu.menu_id, "menuName": menu.menu_name, "parentId": menu.parent_id, "orderNum": menu.order_num, "path": menu.path, "component": menu.component, "queryParam": menu.query_param, "isFrame": str(menu.is_frame), "isCache": str(menu.is_cache), "menuType": menu.menu_type, "visible": menu.visible, "status": menu.status, "perms": menu.perms, "icon": menu.icon, "createDept": menu.create_dept, "remark": menu.remark, "createTime": menu.create_time.strftime('%Y-%m-%d %H:%M:%S') if menu.create_time else '', "children": [] # 递归调用以获取子菜单 } return { "code": 200, "data": menu_list_dict, "msg": "操作成功" } class SysMuneCreateForm(BaseModel): component:str = None icon : str isCache: str isFrame:str menuName: str menuType:str orderNum:int parentId:int path:str perms:str=None queryParam:str=None status:str visible:str @router.post('/create') async def create( request: Request, form_data: SysMuneCreateForm, db: Session = Depends(get_db), body = Depends(remove_xss_json), auth_user: AuthUser = Depends(find_auth_user), user_id = Depends(valid_access_token) ): try: new_menu = SysMenu( menu_name = form_data.menuName, parent_id = form_data.parentId, order_num = form_data.orderNum, path = form_data.path, is_frame = int(form_data.isFrame), is_cache = int(form_data.isCache), menu_type = form_data.menuType, visible = form_data.visible, status = form_data.status, icon = form_data.icon, component= form_data.component, perms=form_data.perms, query_param=form_data.queryParam, create_by = user_id ) db.add(new_menu) db.commit() db.refresh(new_menu) sys_menu_data.sign_table() db_czrz.log(db, auth_user, "系统管理", f"后台管理新建菜单【{form_data.menuName}】成功", request.client.host) return { "code": 200, "data": None, "msg": "操作成功" } except Exception as e: traceback.print_exc() db.rollback() raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) class SysMuneUpdateForm(BaseModel): menuId : str component:str = None icon : str = None isCache: str = None isFrame:str = None menuName: str = None menuType:str = None orderNum:int = None parentId:int = None path:str = None perms:str=None queryParam:str=None status:str = None visible:str = None @router.put('') async def update( request: Request, # form_data: SysMuneUpdateForm, db: Session = Depends(get_db), body = Depends(remove_xss_json), auth_user: AuthUser = Depends(find_auth_user), user_id = Depends(valid_access_token) ): try: query = db.query(SysMenu) query = query.filter(SysMenu.menu_id == body['menuId']) query = query.filter(SysMenu.del_flag != '2') menu = query.first() if not menu: detail = "菜单不存在" raise HTTPException(status_code=404, detail="菜单不存在") # 更新字段,排除主键和不可更新的字段 if 'component' in body: menu.component=body['component'] if 'icon' in body: menu.icon=body['icon'] if 'isCache' in body: menu.is_cache=body['isCache'] if 'isFrame' in body: menu.is_frame=body['isFrame'] if 'menuName' in body: menu.menu_name=body['menuName'] if 'menuType' in body: menu.menu_type=body['menuType'] if 'orderNum' in body: menu.order_num=body['orderNum'] if 'parentId' in body: menu.parent_id=body['parentId'] if 'path' in body: menu.path=body['path'] if 'perms' in body: menu.perms=body['perms'] if 'queryParam' in body: menu.query_param=body['queryParam'] if 'status' in body: menu.status=body['status'] if 'visible' in body: menu.visible=body['visible'] if user_id: menu.update_by = user_id menu.update_time = datetime.now() menu.sign = sys_menu_data.get_sign_hmac(menu) # for field, value in menu_data.items(): # if field != 'menu_id' and field in menu_to_update.__dict__: # setattr(menu_to_update, field, value) # # db.add(menu_to_update) db.commit() db_czrz.log(db, auth_user, "系统管理", f"后台管理更新菜单【{body['menuName']}】成功", request.client.host) return { "code": 200, "msg": "菜单更新成功" } except Exception as e: # db.rollback() traceback.print_exc() if str(e)=='': e = detail raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) @router.delete('/{menu_id}') async def delete( request: Request, menu_id: int, db: Session = Depends(get_db), body = Depends(remove_xss_json), auth_user: AuthUser = Depends(find_auth_user), user_id = Depends(valid_access_token) ): try: query = db.query(SysMenu) query = query.filter(SysMenu.menu_id == menu_id) query = query.filter(SysMenu.del_flag != '2') menu_to_delete =query.first() if not menu_to_delete: detail = "菜单不存在" raise HTTPException(status_code=404, detail="菜单不存在") menu_to_delete.del_flag='2' menu_to_delete.update_by = user_id menu_to_delete.update_time = datetime.now() menu_to_delete.sign = sys_menu_data.get_sign_hmac(menu_to_delete) # db.delete(menu_to_delete) db.commit() db_czrz.log(db, auth_user, "系统管理", f"后台管理删除菜单【{menu_to_delete.menu_name}】成功", request.client.host) return { "code": 200, "msg": "菜单删除成功" } except Exception as e: traceback.print_exc() db.rollback() if str(e)=='': e = detail raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))