#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status from common.security import valid_access_token from pydantic import BaseModel from database import get_db from sqlalchemy.orm import Session from typing import List from models import * from utils import * import json from sqlalchemy.sql import func from common.auth_user import * router = APIRouter() # 定义 Meta 和 Child 模型用于嵌套结构 class Meta(BaseModel): title: str icon: str noCache: bool link: str = None class Child(BaseModel): name: str path: str hidden: bool component: str meta: Meta children: List['Child'] = [] class Router(BaseModel): name: str path: str hidden: bool redirect: str = 'noRedirect' component: str alwaysShow: bool = True meta: Meta children: List[Child] = [] class Router_frame(BaseModel): component:str hidden: bool meta:Meta name: str path: str redirect: str = 'noRedirect' children: List[Child] = [] alwaysShow: bool = True @router.get('/getRouters') async def getRouters(request: Request, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token)): try: # 查询数据库中的所有菜单项,根据 parent_id 排序以构建树形结构 query = db.query(SysMenu) query = query.filter_by(parent_id=0) query = query.filter(SysMenu.menu_id!=11655) query = query.filter(SysMenu.del_flag != '2') menus =query.all() # 顶级菜单 # 递归函数用于构建树形结构 def build_menu_tree(menus, parent_menu): menu_tree = [] # 初始化一个列表来存储菜单树结构 for menu in menus: if menu.is_frame==0: menu_data = Router_frame( component=menu.component or 'Layout', hidden=menu.visible == '1', name=menu.path, path='/'+menu.path, meta = Meta( title=menu.menu_name, icon=menu.icon, noCache=menu.is_cache == '1', link = menu.path ), children=[] ) else: menu_data = Router( name=menu.path,#menu.menu_name, path='/'+menu.path, hidden=menu.visible == '1', component=menu.component or 'Layout', meta=Meta( title=menu.menu_name, icon=menu.icon, noCache=menu.is_cache == '1' ), children=[] # 初始化 children 列表,即使没有子菜单 ) # 如果菜单有子菜单,则递归构建子菜单 if menu.menu_type == 'M': # 假设 'M' 表示目录类型 query = db.query(SysMenu) query = query.filter_by(parent_id=menu.menu_id) query = query.filter(SysMenu.del_flag != '2') # children_menus = db.query(SysMenu).filter_by(parent_id=menu.menu_id).all() children_menus = query.all() menu_data.children = build_menu_tree(children_menus, menu) else: del menu_data.children #没有子菜单,删除children 列表 del menu_data.redirect del menu_data.alwaysShow menu_data.path = menu_data.path[1:] menu_tree.append(menu_data) # 将当前菜单数据添加到菜单树列表 return menu_tree # 构建顶级菜单的树形结构 routers = build_menu_tree(menus, None) # routers_dict = [router.dict() for router in routers] routers_dict = [] for router in routers: router_info = router.dict() if len(router_info['children'])==0: del router_info['children'] del router_info['redirect'] del router_info['alwaysShow'] router_info['path'] = router_info['path'][1:] routers_dict.append(router_info) # 返回构建好的路由数据 return { "code": 200, "msg": "操作成功", "data": routers_dict #[router.dict() for router in routers] # 如果没有顶级菜单,返回空列表 } except Exception as e: # 处理异常,返回错误信息 raise HTTPException(status_code=500, detail=str(e)) @router.get('/qydt/getRouters') async def getRouters(request: Request, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token)): try: # 查询数据库中的所有菜单项,根据 parent_id 排序以构建树形结构 query = db.query(SysMenu) query = query.filter_by(parent_id=0) query = query.filter(SysMenu.menu_id==11655) query = query.filter(SysMenu.del_flag != '2') menus =query.all() # 顶级菜单 # 递归函数用于构建树形结构 def build_menu_tree(menus, parent_menu): menu_tree = [] # 初始化一个列表来存储菜单树结构 for menu in menus: if menu.is_frame==0: menu_data = Router_frame( component=menu.component or 'Layout', hidden=menu.visible == '1', name=menu.path, path='/'+menu.path, meta = Meta( title=menu.menu_name, icon=menu.icon, noCache=menu.is_cache == '1', link = menu.path ), children=[] ) else: menu_data = Router( name=menu.path,#menu.menu_name, path='/'+menu.path, hidden=menu.visible == '1', component=menu.component or 'Layout', meta=Meta( title=menu.menu_name, icon=menu.icon, noCache=menu.is_cache == '1' ), children=[] # 初始化 children 列表,即使没有子菜单 ) # 如果菜单有子菜单,则递归构建子菜单 if menu.menu_type == 'M': # 假设 'M' 表示目录类型 query = db.query(SysMenu) query = query.filter_by(parent_id=menu.menu_id) query = query.filter(SysMenu.del_flag != '2') # children_menus = db.query(SysMenu).filter_by(parent_id=menu.menu_id).all() children_menus = query.all() menu_data.children = build_menu_tree(children_menus, menu) else: del menu_data.children #没有子菜单,删除children 列表 del menu_data.redirect del menu_data.alwaysShow menu_data.path = menu_data.path[1:] menu_tree.append(menu_data) # 将当前菜单数据添加到菜单树列表 return menu_tree # 构建顶级菜单的树形结构 routers = build_menu_tree(menus, None) # routers_dict = [router.dict() for router in routers] routers_dict = [] for router in routers: router_info = router.dict() if len(router_info['children'])==0: del router_info['children'] del router_info['redirect'] del router_info['alwaysShow'] router_info['path'] = router_info['path'][1:] routers_dict.append(router_info) # 返回构建好的路由数据 return { "code": 200, "msg": "操作成功", "data": routers_dict #[router.dict() for router in routers] # 如果没有顶级菜单,返回空列表 } except Exception as e: # 处理异常,返回错误信息 raise HTTPException(status_code=500, detail=str(e)) @router.get('/list') async def get_list( # request: Request, menuName: str = Query(None, max_length=100), status: str = Query(None, max_length=100), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): query = db.query(SysMenu) query = query.filter(SysMenu.del_flag != '2') if menuName: query = query.filter(SysMenu.menu_name.like(f'%{menuName}%')) if status: query = query.filter(SysMenu.status.like(f'%{status}%')) # 应用查询 # menu_list = db.query(SysMenu).filter( # (SysMenu.menu_name.like(f'%{menu_name}%')) , # (SysMenu.status.like(f'%{status}%')) # ).all() menu_list = query.all() # 将模型实例转换为字典 menu_list_dict = [{ "menuId": menu.menu_id, "menuName": menu.menu_name, "parentId": menu.parent_id, "orderNum": menu.order_num, "path": menu.path, "component": menu.component, "queryParam": menu.query_param, "isFrame": str(menu.is_frame), "isCache": str(menu.is_cache), "menuType": menu.menu_type, "visible": menu.visible, "status": menu.status, "perms": menu.perms, "icon": menu.icon, "createDept": menu.create_dept, "remark": menu.remark, "createTime": menu.create_time.strftime('%Y-%m-%d %H:%M:%S') if menu.create_time else '', "children": [] # 递归调用以获取子菜单 } for menu in menu_list] # 构建分页响应 # pagination_info = { # "total": total_count, # "page_num": page_num, # "page_size": page_size, # "total_pages": (total_count + page_size - 1) // page_size # 计算总页数 # } return { "code": 200, "data": menu_list_dict, # 'pages': page_num, # 总页数 # 'currentPage': page_num, # 当前页数 # # 'current':current, # # 'total' : total, # 'total': total_count, # 总数据量 # # 'size':size, # 'pageSize': page_size, # 页码 # { # "items": menu_list_dict, # "pagination": pagination_info # }, "msg": "操作成功" } @router.get('/{menuid}') async def get_list( # request: Request, menuid: str = Query(None, max_length=100), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): query = db.query(SysMenu) query = query.filter(SysMenu.del_flag != '2') if menuid: query = query.filter(SysMenu.menu_id.like(f'{menuid}')) menu= query.first() # 将模型实例转换为字典 menu_list_dict = { "menuId": menu.menu_id, "menuName": menu.menu_name, "parentId": menu.parent_id, "orderNum": menu.order_num, "path": menu.path, "component": menu.component, "queryParam": menu.query_param, "isFrame": str(menu.is_frame), "isCache": str(menu.is_cache), "menuType": menu.menu_type, "visible": menu.visible, "status": menu.status, "perms": menu.perms, "icon": menu.icon, "createDept": menu.create_dept, "remark": menu.remark, "createTime": menu.create_time.strftime('%Y-%m-%d %H:%M:%S') if menu.create_time else '', "children": [] # 递归调用以获取子菜单 } return { "code": 200, "data": menu_list_dict, "msg": "操作成功" } class SysMuneCreateForm(BaseModel): component:str = None icon : str isCache: str isFrame:str menuName: str menuType:str orderNum:int parentId:int path:str perms:str=None queryParam:str=None status:str visible:str @router.post('/create') async def create( form_data: SysMuneCreateForm, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: new_menu = SysMenu( menu_name = form_data.menuName, parent_id = form_data.parentId, order_num = form_data.orderNum, path = form_data.path, is_frame = int(form_data.isFrame), is_cache = int(form_data.isCache), menu_type = form_data.menuType, visible = form_data.visible, status = form_data.status, icon = form_data.icon, component= form_data.component, perms=form_data.perms, query_param=form_data.queryParam, create_by = user_id ) db.add(new_menu) db.commit() db.refresh(new_menu) return { "code": 200, "data": None, "msg": "操作成功" } except Exception as e: db.rollback() raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) class SysMuneUpdateForm(BaseModel): menuId : str component:str = None icon : str = None isCache: str = None isFrame:str = None menuName: str = None menuType:str = None orderNum:int = None parentId:int = None path:str = None perms:str=None queryParam:str=None status:str = None visible:str = None @router.put('') async def update( request: Request, form_data: SysMuneUpdateForm, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: query = db.query(SysMenu) query = query.filter(SysMenu.menu_id == form_data.menuId) query = query.filter(SysMenu.del_flag != '2') menu = query.first() if not menu: detail = "菜单不存在" raise HTTPException(status_code=404, detail="菜单不存在") # 更新字段,排除主键和不可更新的字段 if form_data.component: menu.component=form_data.component if form_data.icon: menu.icon=form_data.icon if form_data.isCache: menu.is_cache=form_data.isCache if form_data.isFrame: menu.is_frame=form_data.isFrame if form_data.menuName: menu.menu_name=form_data.menuName if form_data.menuType: menu.menu_type=form_data.menuType if form_data.orderNum: menu.order_num=form_data.orderNum if form_data.parentId: menu.parent_id=form_data.parentId if form_data.path: menu.path=form_data.path if form_data.perms: menu.perms=form_data.perms if form_data.queryParam: menu.query_param=form_data.queryParam if form_data.status: menu.status=form_data.status if form_data.visible: menu.visible=form_data.visible if user_id: menu.create_by = user_id # for field, value in menu_data.items(): # if field != 'menu_id' and field in menu_to_update.__dict__: # setattr(menu_to_update, field, value) # # db.add(menu_to_update) db.commit() return { "code": 200, "msg": "菜单更新成功" } except Exception as e: # db.rollback() if str(e)=='': e = detail raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e)) @router.delete('/delete/{menu_id}') async def delete( menu_id: int, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: query = db.query(SysMenu) query = query.filter(SysMenu.menu_id == menu_id) query = query.filter(SysMenu.del_flag != '2') menu_to_delete =query.first() if not menu_to_delete: detail = "菜单不存在" raise HTTPException(status_code=404, detail="菜单不存在") menu_to_delete.create_by = user_id menu_to_delete.del_flag='2' # db.delete(menu_to_delete) db.commit() return { "code": 200, "msg": "菜单删除成功" } except Exception as e: db.rollback() if str(e)=='': e = detail raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))