#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Admin API routes for the map-layer menu table (``SysMenuLayer``).

Exposes the front-end router tree (``/getRouters``) and list / detail /
create / update / soft-delete endpoints for menu-layer rows.
"""

from datetime import datetime

# NOTE: the imports below keep their original relative order on purpose.
# Several of them are star imports, so reordering could change which module
# wins when two of them export the same name.
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
# Aliased so that a ``Path`` exported by one of the star imports below
# (e.g. ``pathlib.Path``) cannot shadow the FastAPI parameter marker.
from fastapi import Path as PathParam
from fastapi.responses import JSONResponse
from common.security import valid_access_token
from pydantic import BaseModel
from database import get_db
from sqlalchemy.orm import Session
from typing import List, Optional
from models import *
from utils import *
from utils.ry_system_util import *
import json
from sqlalchemy.sql import func
from common.auth_user import *
from common.enc import mpfun, sys_menu_data, sys_menu_layer_data
import traceback
from common.db import db_czrz
from exceptions import HmacException

router = APIRouter()


# Meta and Child models describe the nested structure of a route entry.
class Meta(BaseModel):
    """Display metadata attached to a route entry."""

    title: str
    icon: str
    noCache: bool
    link: Optional[str] = None


class Child(BaseModel):
    """Nested route entry (self-referencing through ``children``)."""

    name: str
    path: str
    hidden: bool
    component: str
    meta: Meta
    children: List['Child'] = []


class Router(BaseModel):
    """Route entry built for menus whose ``is_frame`` is not 0."""

    name: str
    path: str
    hidden: bool
    redirect: str = 'noRedirect'
    component: str
    alwaysShow: bool = True
    meta: Meta
    children: List[Child] = []


class Router_frame(BaseModel):
    """Route entry built for menus whose ``is_frame`` is 0 (carries a link)."""

    component: str
    hidden: bool
    meta: Meta
    name: str
    path: str
    redirect: str = 'noRedirect'
    children: List[Child] = []
    alwaysShow: bool = True


# Request-body keys accepted by ``/update`` mapped to SysMenuLayer attributes.
_UPDATABLE_FIELDS = (
    ('component', 'component'),
    ('icon', 'icon'),
    ('menuName', 'menu_name'),
    ('menuType', 'menu_type'),
    ('orderNum', 'order_num'),
    ('parentId', 'parent_id'),
    ('path', 'path'),
    ('status', 'status'),
)

# Menu types whose children are loaded when building the router tree.
_CONTAINER_MENU_TYPES = ('D', 'Z')


def _query_visible_menus(db, parent_id):
    """Return the menus under *parent_id* that are not deleted or disabled.

    Rows with ``del_flag == '2'`` or ``status == '1'`` are excluded and the
    result is ordered by ``order_num``.
    """
    query = db.query(SysMenuLayer)
    query = query.filter_by(parent_id=parent_id)
    query = query.filter(SysMenuLayer.del_flag != '2')
    query = query.filter(SysMenuLayer.status != '1')
    query = query.order_by(SysMenuLayer.order_num)
    return query.all()


def _build_route(menu):
    """Validate one menu row through the route models and return it as a dict."""
    common = dict(
        name=menu.path,
        path='/' + menu.path,
        hidden=menu.layer_visible == '1',
        component=menu.component or 'Layout',
        children=[],
    )
    no_cache = menu.is_cache == '1'
    if menu.is_frame == 0:
        meta = Meta(
            title=menu.menu_name,
            icon=menu.icon,
            noCache=no_cache,
            link=menu.path,
        )
        return Router_frame(meta=meta, **common).dict()
    meta = Meta(title=menu.menu_name, icon=menu.icon, noCache=no_cache)
    return Router(meta=meta, **common).dict()


def _as_leaf(route):
    """Strip the container-only keys and the leading '/' from *route* in place."""
    for key in ('children', 'redirect', 'alwaysShow'):
        route.pop(key, None)
    route['path'] = route['path'][1:]
    return route


def _build_menu_tree(db, menus, collapse_empty=False):
    """Recursively convert *menus* into a list of route dicts.

    Menus whose type is in ``_CONTAINER_MENU_TYPES`` get their children
    loaded from the database; every other menu is emitted as a leaf.  When
    *collapse_empty* is true (used for the top level only) a container that
    turns out to have no children is emitted as a leaf as well.
    """
    tree = []
    for menu in menus:
        route = _build_route(menu)
        if menu.menu_type in _CONTAINER_MENU_TYPES:
            children = _query_visible_menus(db, menu.menu_id)
            route['children'] = _build_menu_tree(db, children)
            if collapse_empty and not route['children']:
                _as_leaf(route)
        else:
            _as_leaf(route)
        tree.append(route)
    return tree


def _menu_to_dict(menu):
    """Serialise a SysMenuLayer row into the camelCase dict sent to clients."""
    if menu.create_time:
        create_time = menu.create_time.strftime('%Y-%m-%d %H:%M:%S')
    else:
        create_time = ''
    return {
        "menuId": menu.menu_id,
        "menuName": menu.menu_name,
        "parentId": menu.parent_id,
        "orderNum": menu.order_num,
        "path": menu.path,
        "component": menu.component,
        "menuType": menu.menu_type,
        "status": menu.status,
        "icon": menu.icon,
        "remark": menu.remark,
        "createTime": create_time,
        # Always empty here: these endpoints do not load child menus.
        "children": [],
    }


@router.get('/getRouters')
async def getRouters(request: Request,
                     db: Session = Depends(get_db),
                     body=Depends(remove_xss_json),
                     user_id=Depends(valid_access_token)):
    """Return the router tree built from the visible menu layers.

    Starts from the menus whose ``parent_id`` is 0 and descends into every
    container menu.  Leaf entries carry no ``children`` / ``redirect`` /
    ``alwaysShow`` keys and their ``path`` has no leading slash.

    Raises:
        HTTPException: 500 with the error text if anything fails.
    """
    try:
        top_level_menus = _query_visible_menus(db, 0)
        # The tree is assembled from plain dicts: removing keys from a dict
        # is well defined, whereas deleting attributes from a model instance
        # made the later ``['children']`` lookup fail for top-level leaves.
        routers_dict = _build_menu_tree(db, top_level_menus,
                                        collapse_empty=True)
        return {
            "code": 200,
            "msg": "操作成功",
            "data": routers_dict,  # empty list when there are no top-level menus
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@router.get('/list')
async def get_list(
    menuName: str = Query(None, max_length=100),
    status: str = Query(None, max_length=100),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """List non-deleted menu layers of type 'T', ordered by ``order_num``.

    Args:
        menuName: optional substring to match against ``menu_name``.
        status: optional exact ``status`` value to filter on.
    """
    query = db.query(SysMenuLayer)
    query = query.filter(SysMenuLayer.del_flag != '2')
    if menuName:
        query = query.filter(SysMenuLayer.menu_name.like(f'%{menuName}%'))
    # NOTE(review): the source formatting was lost; this filter is assumed to
    # apply unconditionally rather than only when menuName is given - confirm.
    query = query.filter(SysMenuLayer.menu_type == 'T')
    if status:
        query = query.filter(SysMenuLayer.status == status)
    query = query.order_by(SysMenuLayer.order_num)

    menu_list_dict = [_menu_to_dict(menu) for menu in query.all()]
    return {
        "code": 200,
        "data": menu_list_dict,
        "msg": "操作成功"
    }


# NOTE(review): this handler reuses the name ``get_list`` of the ``/list``
# handler above.  Both routes are registered when their decorator runs, so
# both work; the name is kept so the generated operation id does not change.
@router.get('/info/{menuid}')
async def get_list(
    menuid: str = PathParam(..., max_length=100),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return a single non-deleted menu layer by its id.

    Raises:
        HTTPException: 404 when no matching menu exists.
    """
    query = db.query(SysMenuLayer)
    query = query.filter(SysMenuLayer.del_flag != '2')
    query = query.filter(SysMenuLayer.menu_id == menuid)
    menu = query.first()
    if menu is None:
        raise HTTPException(status_code=404, detail="菜单不存在")

    return {
        "code": 200,
        "data": _menu_to_dict(menu),
        "msg": "操作成功"
    }


@router.post('/create')
async def create(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Create a menu layer from the request body and write an audit log entry.

    Raises:
        HTTPException: 500 with the error text if anything fails; the
            session is rolled back first.
    """
    try:
        new_menu_layer = SysMenuLayer(
            menu_name=body['menuName'],
            parent_id=body['parentId'],
            order_num=body['orderNum'],
            path=body['path'],
            menu_type=body['menuType'],
            status=body['status'],
            icon=body['icon'],
            component=body['component'],
            create_by=user_id,
        )
        db.add(new_menu_layer)
        db.commit()

        sys_menu_data.sign_table()
        sys_menu_layer_data.sign_table()

        db_czrz.log(db, auth_user, "系统管理",
                    f"后台管理新建地图菜单【{body['menuName']}】成功",
                    request.client.host)

        return {
            "code": 200,
            "data": None,
            "msg": "操作成功"
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                            detail=str(e))


@router.put('/update')
async def update(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Update the menu layer identified by ``body['menuId']``.

    Only the keys listed in ``_UPDATABLE_FIELDS`` that are present in the
    body are applied; the row's signature is recomputed before committing.

    Raises:
        HTTPException: 404 when the menu does not exist, 500 (after a
            rollback) for any other failure.
    """
    try:
        queryLayer = db.query(SysMenuLayer)
        queryLayer = queryLayer.filter(SysMenuLayer.menu_id == body['menuId'])
        queryLayer = queryLayer.filter(SysMenuLayer.del_flag != '2')
        menu = queryLayer.first()
        if not menu:
            raise HTTPException(status_code=404, detail="菜单不存在")

        for body_key, attr_name in _UPDATABLE_FIELDS:
            if body_key in body:
                setattr(menu, attr_name, body[body_key])

        menu.update_by = user_id
        menu.update_time = datetime.now()
        menu.sign = sys_menu_layer_data.get_sign_hmac(menu)
        # Read the name from the row: 'menuName' is optional in the body, so
        # indexing the body for the log message could fail after the commit.
        menu_name = menu.menu_name

        db.commit()

        db_czrz.log(db, auth_user, "系统管理",
                    f"后台管理更新地图菜单【{menu_name}】成功",
                    request.client.host)

        return {
            "code": 200,
            "msg": "菜单更新成功"
        }
    except HTTPException:
        # Let the deliberate 404 through instead of rewrapping it as a 500.
        raise
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                            detail=str(e))


@router.delete('/delete/{menu_id}')
async def delete(
    request: Request,
    menu_id: int,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Soft-delete a menu layer by setting ``del_flag`` to '2'.

    Returns a 404 JSON response when the menu does not exist and a 500 JSON
    response (after a rollback) when an unexpected error occurs.
    """
    try:
        queryLayer = db.query(SysMenuLayer)
        queryLayer = queryLayer.filter(SysMenuLayer.menu_id == menu_id)
        queryLayer = queryLayer.filter(SysMenuLayer.del_flag != '2')
        menu_to_delete = queryLayer.first()
        if not menu_to_delete:
            return JSONResponse(status_code=404,
                                content={"code": 404, "msg": "该id不存在"})

        menu_to_delete.update_by = user_id
        menu_to_delete.update_time = datetime.now()
        menu_to_delete.del_flag = '2'
        menu_to_delete.sign = sys_menu_layer_data.get_sign_hmac(menu_to_delete)

        db.commit()

        db_czrz.log(db, auth_user, "系统管理",
                    f"后台管理删除菜单【{menu_to_delete.menu_name}】成功",
                    request.client.host)
        return {
            "code": 200,
            "msg": "菜单删除成功"
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        # An unexpected failure is a server error, not "not found".
        return JSONResponse(status_code=500,
                            content={"code": 500, "msg": str(e)})