#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Menu management routes.

Exposes the menu tree used by tree-select widgets, the front-end router
definitions, and list/detail/create/update/soft-delete endpoints for
``SysMenu`` rows (plus a read-only list of ``SysMenuLayer`` rows).
"""
from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from common.security import valid_access_token
from pydantic import BaseModel
from database import get_db
from sqlalchemy.orm import Session
from typing import List
from models import *
from utils import *
from utils.ry_system_util import *
import json
from sqlalchemy.sql import func
from common.auth_user import *
import traceback

# Imported after the wildcard imports above so that none of them can shadow
# these names (e.g. a ``Path`` re-exported from pathlib by a utils module).
from fastapi import Path as PathParam
from typing import Optional

router = APIRouter()

# Menu id of the root entry served by /qydt/getRouters; the same id is
# excluded from the regular /getRouters response.
_QYDT_ROOT_MENU_ID = 11655

# Request-body key -> SysMenu attribute, in the order the update endpoint
# applies them.
_UPDATE_FIELD_MAP = (
    ('component', 'component'),
    ('icon', 'icon'),
    ('isCache', 'is_cache'),
    ('isFrame', 'is_frame'),
    ('menuName', 'menu_name'),
    ('menuType', 'menu_type'),
    ('orderNum', 'order_num'),
    ('parentId', 'parent_id'),
    ('path', 'path'),
    ('perms', 'perms'),
    ('queryParam', 'query_param'),
    ('status', 'status'),
    ('visible', 'visible'),
)


# Meta and Child models describe the nested router structure.
class Meta(BaseModel):
    title: str
    icon: str
    noCache: bool
    link: Optional[str] = None


class Child(BaseModel):
    name: str
    path: str
    hidden: bool
    component: str
    meta: Meta
    children: List['Child'] = []


class Router(BaseModel):
    name: str
    path: str
    hidden: bool
    redirect: str = 'noRedirect'
    component: str
    alwaysShow: bool = True
    meta: Meta
    children: List[Child] = []


class Router_frame(BaseModel):
    component: str
    hidden: bool
    meta: Meta
    name: str
    path: str
    redirect: str = 'noRedirect'
    children: List[Child] = []
    alwaysShow: bool = True


def _build_menu_select_tree(db, menus):
    """Convert menu rows into nested ``id``/``label`` nodes for a tree select.

    Children of each row are fetched with ``parent_id_get_menu_info`` and
    attached under ``"children"`` only when at least one exists.
    """
    tree = []
    for menu_info in menus:
        node = {
            "id": menu_info.menu_id,
            "label": menu_info.menu_name,
            "parentId": menu_info.parent_id,
            "weight": menu_info.order_num,
        }
        children = parent_id_get_menu_info(db, menu_info.menu_id)
        if len(children) > 0:
            node["children"] = _build_menu_select_tree(db, children)
        tree.append(node)
    return tree


def _menu_to_dict(menu):
    """Serialise one menu row into the camelCase dict returned by the API."""
    return {
        "menuId": menu.menu_id,
        "menuName": menu.menu_name,
        "parentId": menu.parent_id,
        "orderNum": menu.order_num,
        "path": menu.path,
        "component": menu.component,
        "queryParam": menu.query_param,
        "isFrame": str(menu.is_frame),
        "isCache": str(menu.is_cache),
        "menuType": menu.menu_type,
        "visible": menu.visible,
        "status": menu.status,
        "perms": menu.perms,
        "icon": menu.icon,
        "createDept": menu.create_dept,
        "remark": menu.remark,
        "createTime": menu.create_time.strftime('%Y-%m-%d %H:%M:%S') if menu.create_time else '',
        # Always empty here; the list endpoints return a flat list.
        "children": [],
    }


def _route_entry(menu, component):
    """Build the router dict for one menu row, without children."""
    # NOTE(review): ``is_cache`` is compared with the string '1' while
    # ``is_frame`` is compared with the integer 0 -- confirm column types.
    entry = {
        "component": component,
        "hidden": menu.visible == '1',
        "name": menu.path,
        "path": '/' + menu.path,
        "meta": {
            "title": menu.menu_name,
            'icon': menu.icon,
            'noCache': menu.is_cache == '1',
        },
    }
    if menu.is_frame == 0:
        entry['meta']['link'] = menu.path
    return entry


# checkedKeys roleMenuTreeselect
@router.get('/roleMenuTreeselect/{roleId}')
async def getmunutreeselect(request: Request, roleId: int, db: Session = Depends(get_db),
                            user_id: int = Depends(valid_access_token)):
    """Return the full menu tree plus the menu keys checked for ``roleId``."""
    checkedKeys = role_id_get_role_menus(db, roleId)
    menus = _build_menu_select_tree(db, parent_id_get_menu_info(db, 0))

    return {
        "code": 200,
        "msg": "操作成功",
        "data": {"menus": menus, "checkedKeys": checkedKeys}
    }


# NOTE: this rebinds the module-level name used by the handler above; the
# earlier handler stays registered because the decorator already captured it.
@router.get('/treeselect')
async def getmunutreeselect(request: Request, db: Session = Depends(get_db),
                            user_id: int = Depends(valid_access_token)):
    """Return the full menu tree starting from parent id 0."""
    result = _build_menu_select_tree(db, parent_id_get_menu_info(db, 0))

    return {
        "code": 200,
        "msg": "操作成功",
        "data": result
    }


@router.get('/getRouters')
async def getRouters(request: Request, db: Session = Depends(get_db), body=Depends(remove_xss_json),
                     user_id=Depends(valid_access_token)):
    """Return front-end router definitions for every top-level menu except
    the one with id ``_QYDT_ROOT_MENU_ID``.

    Raises:
        HTTPException: 500 carrying the error text when anything fails.
    """
    try:
        # Top-level, non-deleted menus ordered by order_num.
        query = db.query(SysMenu)
        query = query.filter_by(parent_id=0)
        query = query.filter(SysMenu.menu_id != _QYDT_ROOT_MENU_ID)
        query = query.filter(SysMenu.del_flag != '2')
        query = query.order_by(SysMenu.order_num.asc())
        menus = query.all()

        def build_menu_tree(menus, parent_menu):
            """Recursively turn menu rows into router dicts."""
            menu_tree = []
            for menu in menus:
                # Default to 'Layout', use 'ParentView' for a nested 'M'
                # menu; an explicit component on the row always wins.
                component = 'Layout'
                if menu.menu_type == 'M' and parent_menu:
                    component = 'ParentView'
                if menu.component:
                    component = menu.component

                menu_data = _route_entry(menu, component)

                # Menus of type 'M' get redirect/alwaysShow and their children.
                if menu.menu_type == 'M':
                    menu_data['redirect'] = 'noRedirect'
                    menu_data['alwaysShow'] = True
                    children_menus = parent_id_get_menu_info(db, menu.menu_id)
                    if len(children_menus) > 0:
                        menu_data['children'] = build_menu_tree(children_menus, menu)

                # NOTE(review): nesting reconstructed from a collapsed source
                # file -- confirm this applies to every non-top-level menu.
                if menu.parent_id != 0:
                    # Drop the leading '/' for non-top-level entries.
                    menu_data['path'] = menu_data['path'][1:]

                menu_tree.append(menu_data)
            return menu_tree

        routers = build_menu_tree(menus, None)

        return {
            "code": 200,
            "msg": "操作成功",
            "data": routers
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


# NOTE: same function name as the handler above; both routes stay registered.
@router.get('/qydt/getRouters')
async def getRouters(request: Request, db: Session = Depends(get_db), body=Depends(remove_xss_json),
                     user_id=Depends(valid_access_token)):
    """Return front-end router definitions rooted at ``_QYDT_ROOT_MENU_ID``.

    Raises:
        HTTPException: 500 carrying the error text when anything fails.
    """
    try:
        # The single top-level, non-deleted menu with the QYDT root id.
        query = db.query(SysMenu)
        query = query.filter_by(parent_id=0)
        query = query.filter(SysMenu.menu_id == _QYDT_ROOT_MENU_ID)
        query = query.filter(SysMenu.del_flag != '2')
        menus = query.all()

        def build_menu_tree(menus, parent_menu):
            """Recursively turn menu rows into router dicts."""
            menu_tree = []
            for menu in menus:
                menu_data = _route_entry(menu, menu.component or 'Layout')

                # NOTE(review): the if/elif nesting below was reconstructed
                # from a collapsed source file -- confirm against history.
                if menu.menu_type == 'M':
                    child_query = db.query(SysMenu)
                    child_query = child_query.filter_by(parent_id=menu.menu_id)
                    child_query = child_query.filter(SysMenu.del_flag != '2')
                    children_menus = child_query.all()
                    menu_data['redirect'] = 'noRedirect'
                    menu_data['alwaysShow'] = True
                    if len(children_menus) > 0:
                        menu_data['children'] = build_menu_tree(children_menus, menu)
                elif menu.parent_id != 0:
                    # Drop the leading '/' for nested non-'M' entries.
                    menu_data['path'] = menu_data['path'][1:]

                menu_tree.append(menu_data)
            return menu_tree

        routers = build_menu_tree(menus, None)

        return {
            "code": 200,
            "msg": "操作成功",
            "data": routers
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=f"Internal server error: {str(e)}")


@router.get('/list')
async def get_list(
    menuName: str = Query(None, max_length=100),
    status: str = Query(None, max_length=100),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return all non-deleted menus as a flat list ordered by ``order_num``.

    ``menuName`` and ``status`` are optional substring filters.
    """
    query = db.query(SysMenu)
    query = query.filter(SysMenu.del_flag != '2')
    if menuName:
        query = query.filter(SysMenu.menu_name.like(f'%{menuName}%'))
    if status:
        query = query.filter(SysMenu.status.like(f'%{status}%'))
    query = query.order_by(SysMenu.order_num.asc())

    return {
        "code": 200,
        "data": [_menu_to_dict(menu) for menu in query.all()],
        "msg": "操作成功"
    }


@router.get('/layer_list')
async def get_list(
    menuName: str = Query(None, max_length=100),
    status: str = Query(None, max_length=100),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return all non-deleted ``SysMenuLayer`` rows as a flat list.

    ``menuName`` and ``status`` are optional substring filters.
    """
    query = db.query(SysMenuLayer)
    query = query.filter(SysMenuLayer.del_flag != '2')
    if menuName:
        query = query.filter(SysMenuLayer.menu_name.like(f'%{menuName}%'))
    if status:
        query = query.filter(SysMenuLayer.status.like(f'%{status}%'))

    return {
        "code": 200,
        "data": [_menu_to_dict(menu) for menu in query.all()],
        "msg": "操作成功"
    }


@router.get('/{menuid}')
async def get_list(
    menuid: str = PathParam(..., max_length=100),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return a single non-deleted menu by id.

    Raises:
        HTTPException: 404 when the id is not an integer or no menu matches.
    """
    # Compare by exact integer id; a LIKE match would let '%' in the URL
    # select arbitrary rows.
    try:
        menu_pk = int(menuid)
    except ValueError:
        raise HTTPException(status_code=404, detail="菜单不存在")

    query = db.query(SysMenu)
    query = query.filter(SysMenu.del_flag != '2')
    query = query.filter(SysMenu.menu_id == menu_pk)
    menu = query.first()
    if menu is None:
        raise HTTPException(status_code=404, detail="菜单不存在")

    return {
        "code": 200,
        "data": _menu_to_dict(menu),
        "msg": "操作成功"
    }


class SysMuneCreateForm(BaseModel):
    component: Optional[str] = None
    icon: str
    isCache: str
    isFrame: str
    menuName: str
    menuType: str
    orderNum: int
    parentId: int
    path: str
    perms: Optional[str] = None
    queryParam: Optional[str] = None
    status: str
    visible: str


@router.post('/create')
async def create(
    form_data: SysMuneCreateForm,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Insert a new menu row built from ``form_data``.

    Raises:
        HTTPException: 500 carrying the error text; the session is rolled
            back first.
    """
    try:
        new_menu = SysMenu(
            menu_name=form_data.menuName,
            parent_id=form_data.parentId,
            order_num=form_data.orderNum,
            path=form_data.path,
            is_frame=int(form_data.isFrame),
            is_cache=int(form_data.isCache),
            menu_type=form_data.menuType,
            visible=form_data.visible,
            status=form_data.status,
            icon=form_data.icon,
            component=form_data.component,
            perms=form_data.perms,
            query_param=form_data.queryParam,
            create_by=user_id
        )
        db.add(new_menu)
        db.commit()
        db.refresh(new_menu)
        return {
            "code": 200,
            "data": None,
            "msg": "操作成功"
        }
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))


class SysMuneUpdateForm(BaseModel):
    menuId: str
    component: Optional[str] = None
    icon: Optional[str] = None
    isCache: Optional[str] = None
    isFrame: Optional[str] = None
    menuName: Optional[str] = None
    menuType: Optional[str] = None
    orderNum: Optional[int] = None
    parentId: Optional[int] = None
    path: Optional[str] = None
    perms: Optional[str] = None
    queryParam: Optional[str] = None
    status: Optional[str] = None
    visible: Optional[str] = None


@router.put('')
async def update(
    request: Request,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Update the menu identified by ``body['menuId']``.

    Only keys present in ``body`` are written to the row.

    Raises:
        HTTPException: 404 when the menu does not exist or is deleted;
            500 carrying the error text for any other failure.
    """
    try:
        query = db.query(SysMenu)
        query = query.filter(SysMenu.menu_id == body['menuId'])
        query = query.filter(SysMenu.del_flag != '2')
        menu = query.first()
        if not menu:
            raise HTTPException(status_code=404, detail="菜单不存在")

        # Copy over only the fields that were actually sent.
        for body_key, attr_name in _UPDATE_FIELD_MAP:
            if body_key in body:
                setattr(menu, attr_name, body[body_key])
        if user_id:
            menu.create_by = user_id

        db.commit()
        return {
            "code": 200,
            "msg": "菜单更新成功"
        }
    except HTTPException:
        # Let the 404 above reach the client instead of becoming a 500.
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                            detail=str(e) or type(e).__name__)


@router.delete('/{menu_id}')
async def delete(
    menu_id: int,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Soft-delete a menu by setting its ``del_flag`` to ``'2'``.

    Raises:
        HTTPException: 404 when the menu does not exist or is already
            deleted; 500 carrying the error text for any other failure.
    """
    try:
        query = db.query(SysMenu)
        query = query.filter(SysMenu.menu_id == menu_id)
        query = query.filter(SysMenu.del_flag != '2')
        menu_to_delete = query.first()
        if not menu_to_delete:
            raise HTTPException(status_code=404, detail="菜单不存在")

        menu_to_delete.create_by = user_id
        menu_to_delete.del_flag = '2'
        db.commit()
        return {
            "code": 200,
            "msg": "菜单删除成功"
        }
    except HTTPException:
        # Let the 404 above reach the client instead of becoming a 500.
        raise
    except Exception as e:
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
                            detail=str(e) or type(e).__name__)