#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Admin endpoints for layer menus (``SysMenuLayer``) and ZT dictionary entries.

Routes registered on ``router``:

* ``GET    /getRouters``         - menu tree for one ZT plus the list of ZTs
* ``GET    /list``               - flat, filterable list of layer menus
* ``GET    /info/{menuid}``      - one layer menu
* ``POST   /create``             - create a layer menu
* ``PUT    /update``             - update a layer menu
* ``DELETE /delete/{menu_id}``   - soft-delete a layer menu
* ``GET    /zt/dt/list``         - ``dt`` items stored on one ZT entry
* ``PUT    /zt/{id}/dt/update``  - replace the ``dt`` items of one ZT entry

Several handlers intentionally share a function name (``get_list``,
``update``): FastAPI registers each one when its decorator runs, and the names
are kept because route names / OpenAPI operation ids are derived from them.
"""

from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
from fastapi.responses import JSONResponse
from common.security import valid_access_token
from pydantic import BaseModel
from database import get_db
from sqlalchemy.orm import Session
from typing import List
from models import *
from utils import *
from utils.ry_system_util import *
import json
from sqlalchemy.sql import func
from common.auth_user import *
from common.enc import mpfun, sys_menu_data, sys_menu_layer_data
import traceback
from common.db import db_czrz
from exceptions import HmacException


router = APIRouter()

# Menu whose children come from the video dictionary instead of SysMenuLayer.
_VIDEO_MENU_ID = 11684


# Meta and Child models describe the nested route structure.
class Meta(BaseModel):
    title: str
    icon: str
    noCache: bool
    link: str = None


class Child(BaseModel):
    name: str
    path: str
    hidden: bool
    component: str
    meta: Meta
    children: List['Child'] = []


class Router(BaseModel):
    name: str
    path: str
    hidden: bool
    redirect: str = 'noRedirect'
    component: str
    alwaysShow: bool = True
    meta: Meta
    children: List[Child] = []


class Router_frame(BaseModel):
    component: str
    hidden: bool
    meta: Meta
    name: str
    path: str
    redirect: str = 'noRedirect'
    children: List[Child] = []
    alwaysShow: bool = True


def _split_template(value):
    """Return the non-empty items of a ';'-delimited string ([] if falsy)."""
    return [item for item in value.split(';') if item != ''] if value else []


def _join_template(items):
    """Join *items* into the stored ';a;b;' form (leading and trailing ';')."""
    return ';' + ';'.join(items) + ';'


def _layer_menu_query(db, parent_id, ZT):
    """Build the ordered query for active layer menus under *parent_id*.

    Rows with ``del_flag == '2'`` or ``status == '1'`` are excluded. When *ZT*
    is truthy, only rows whose ``layer_template`` contains ``;ZT;`` are kept.
    """
    query = db.query(SysMenuLayer)
    query = query.filter_by(parent_id=parent_id)
    query = query.filter(SysMenuLayer.del_flag != '2')
    query = query.filter(SysMenuLayer.status != '1')
    if ZT:
        query = query.filter(SysMenuLayer.layer_template.like(f"%;{ZT};%"))
    return query.order_by(SysMenuLayer.order_num)


def _menu_to_dict(menu):
    """Convert a ``SysMenuLayer`` row to the dict returned by /list and /info."""
    return {
        "menuId": menu.menu_id,
        "menuName": menu.menu_name,
        "parentId": menu.parent_id,
        "orderNum": menu.order_num,
        "path": menu.path,
        "component": menu.component,
        "menuType": menu.menu_type,
        "status": menu.status,
        "icon": menu.icon,
        "remark": menu.remark,
        "createTime": menu.create_time.strftime('%Y-%m-%d %H:%M:%S') if menu.create_time else '',
        # Always empty here: children are not resolved by these endpoints.
        "children": [],
        "ZT": _split_template(menu.layer_template),
    }


# checkedKeys roleMenuTreeselect
def get_video_routers(db: Session):
    """Return route dicts for the video menu.

    The list starts with one fixed entry, followed by one entry per
    ``video_type`` dictionary row (ordered by ``dict_sort``, not deleted) for
    which ``tag_get_video_tag_list`` returns a non-empty result.
    """
    # Imported locally, as in the original code (presumably to avoid a
    # circular import - TODO confirm).
    from utils.video_util import tag_get_video_tag_list

    query = db.query(SysDictData)
    query = query.filter(SysDictData.dict_type == 'video_type')
    query = query.filter(SysDictData.del_flag != '2')
    query = query.order_by(SysDictData.dict_sort)
    dict_data = query.all()

    # Fixed first entry.
    dict_data_list = [{
        "component": '4',
        "isVideo": False,
        "hidden": True,
        "name": '4',
        "path": '2',
        "meta": {
            "title": '附近视频',
            'icon': 'icon6',
            'noCache': False,
        }
    }]
    for d in dict_data:
        li = tag_get_video_tag_list(db, d.dict_value)
        if len(li) > 0:
            dict_data_list.append({
                "component": d.dict_value or 'Layout',
                "isVideo": True,
                "hidden": True,
                "name": d.dict_value,
                "path": '2',
                "meta": {
                    "title": d.dict_label,
                    'icon': 'icon6',
                    'noCache': False,
                }
            })
    return dict_data_list


def _build_menu_tree(db, menus, ZT):
    """Recursively convert ``SysMenuLayer`` rows into route models.

    Menus of type 'D' or 'Z' get their children resolved (from the video
    dictionary for ``_VIDEO_MENU_ID``, otherwise from their child rows). Any
    other menu is treated as a leaf: its ``children``, ``redirect`` and
    ``alwaysShow`` attributes are deleted so they are omitted from ``.dict()``,
    and the leading '/' is stripped from its path.
    """
    menu_tree = []
    for menu in menus:
        if menu.is_frame == 0:
            menu_data = Router_frame(
                component=menu.component or 'Layout',
                hidden=menu.layer_visible == '1',
                name=menu.path,
                path='/' + menu.path,
                meta=Meta(
                    title=menu.menu_name,
                    icon=menu.icon,
                    noCache=menu.is_cache == '1',
                    link=menu.path,
                ),
                children=[],
            )
        else:
            menu_data = Router(
                name=menu.path,
                path='/' + menu.path,
                hidden=menu.layer_visible == '1',
                component=menu.component or 'Layout',
                meta=Meta(
                    title=menu.menu_name,
                    icon=menu.icon,
                    noCache=menu.is_cache == '1',
                ),
                children=[],  # start empty even when there are no sub-menus
            )

        # 'D' / 'Z' are the container types that can hold sub-menus
        # (exact meaning of the codes is not visible here - TODO confirm).
        if menu.menu_type == 'D' or menu.menu_type == 'Z':
            if menu.menu_id == _VIDEO_MENU_ID:
                # Plain dicts are assigned on purpose; they carry an extra
                # "isVideo" key that the Child model does not declare.
                menu_data.children = get_video_routers(db)
            else:
                children_menus = _layer_menu_query(db, menu.menu_id, ZT).all()
                menu_data.children = _build_menu_tree(db, children_menus, ZT)
        else:
            # Leaf: drop the container-only attributes from the model.
            del menu_data.children
            del menu_data.redirect
            del menu_data.alwaysShow
            menu_data.path = menu_data.path[1:]

        menu_tree.append(menu_data)
    return menu_tree


@router.get('/getRouters')
async def getRouters(request: Request, db: Session = Depends(get_db), body = Depends(remove_xss_json), ZT: str = Query(None, max_length=100), user_id = Depends(valid_access_token)):
    """Return the route tree for *ZT* together with all configured ZT entries.

    When ``ZT`` is omitted the route list is empty (only ``zt`` is populated).
    Any failure is reported as HTTP 500 with the exception text as detail.
    """
    try:
        routers_dict = []
        # The original built the whole tree and then threw it away when ZT
        # was None; skip the per-menu queries in that case.
        if ZT is not None:
            top_menus = _layer_menu_query(db, 0, ZT).all()  # top-level menus
            for route in _build_menu_tree(db, top_menus, ZT):
                router_info = route.dict()
                # A container without children is flattened like a leaf.
                if 'children' in router_info and len(router_info['children']) == 0:
                    del router_info['children']
                    del router_info['redirect']
                    del router_info['alwaysShow']
                    router_info['path'] = router_info['path'][1:]
                routers_dict.append(router_info)

        query = db.query(SysDictData)
        query = query.filter(SysDictData.del_flag != '2')
        query = query.filter(SysDictData.dict_type == 'sys_menu_layer_zt')
        # NOTE(review): get_video_routers orders SysDictData by dict_sort;
        # confirm that the model really has an order_num column.
        query = query.order_by(SysDictData.order_num)
        zt = [
            {
                "data": _split_template(info.remark),
                'zt': info.dict_label,
                "zt_code": info.dict_value,
            }
            for info in query.all()
        ]

        return {
            "code": 200,
            "msg": "操作成功",
            "data": routers_dict,
            'zt': zt
        }
    except Exception as e:
        # Log the traceback and surface the error text to the client.
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@router.get('/list')
async def get_list(
    # request: Request,
    menuName: str = Query(None, max_length=100),
    status: str = Query(None, max_length=100),
    ZT: str = Query(None, max_length=100),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """List non-deleted layer menus, optionally filtered.

    Filters: ``menuName`` (substring match), ``status`` (exact match) and
    ``ZT`` (``layer_template`` contains ``;ZT;``). Ordered by ``order_num``.
    """
    query = db.query(SysMenuLayer)
    query = query.filter(SysMenuLayer.del_flag != '2')
    if menuName:
        query = query.filter(SysMenuLayer.menu_name.like(f'%{menuName}%'))
    if status:
        query = query.filter(SysMenuLayer.status == status)
    if ZT:
        query = query.filter(SysMenuLayer.layer_template.like(f"%;{ZT};%"))
    query = query.order_by(SysMenuLayer.order_num)

    menu_list_dict = [_menu_to_dict(menu) for menu in query.all()]
    return {
        "code": 200,
        "data": menu_list_dict,
        "msg": "操作成功"
    }


@router.get('/info/{menuid}')
async def get_list(
    # request: Request,
    # Plain path parameter: declaring it with Query(...) is rejected by
    # current FastAPI releases.
    menuid: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return one non-deleted layer menu, or a 404 JSON body if it is missing."""
    query = db.query(SysMenuLayer)
    query = query.filter(SysMenuLayer.del_flag != '2')
    query = query.filter(SysMenuLayer.menu_id == menuid)
    menu = query.first()
    if menu is None:
        # Previously this fell through to an AttributeError (HTTP 500).
        return JSONResponse(status_code=404, content={"code": 404, "msg": "菜单不存在"})

    return {
        "code": 200,
        "data": _menu_to_dict(menu),
        "msg": "操作成功"
    }


@router.post('/create')
async def create(
    request: Request,
    # form_data: SysMuneCreateForm,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Create a layer menu from the request body and write an audit-log entry.

    Reads ``ZT``, ``menuName``, ``parentId``, ``orderNum``, ``path``,
    ``menuType``, ``status``, ``icon`` and ``component`` from the body; a
    missing key or any other failure rolls back and yields HTTP 500.
    """
    try:
        layer_template = _join_template(body['ZT'])
        new_menu_layer = SysMenuLayer(
            menu_name=body['menuName'],
            parent_id=body['parentId'],
            order_num=body['orderNum'],
            path=body['path'],
            menu_type=body['menuType'],
            status=body['status'],
            icon=body['icon'],
            component=body['component'],
            create_by=user_id,
            layer_template=layer_template
        )
        db.add(new_menu_layer)
        db.commit()

        # Re-sign both menu tables after the insert.
        sys_menu_data.sign_table()
        sys_menu_layer_data.sign_table()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理新建地图菜单【{body['menuName']}】成功", request.client.host)

        return {
            "code": 200,
            "data": None,
            "msg": "操作成功"
        }
    except Exception as e:
        traceback.print_exc()
        # Roll back the transaction on any failure.
        db.rollback()
        raise HTTPException(status_code=status.HTTP_500_INTERNAL_SERVER_ERROR, detail=str(e))


@router.put('/update')
async def update(
    request: Request,
    auth_user: AuthUser = Depends(find_auth_user),
    # form_data: SysMuneUpdateForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Update the layer menu identified by ``body['menuId']``.

    Only the keys present in the body are applied. The row is re-signed,
    committed, and an audit-log entry is written.

    Raises:
        HTTPException: 404 when the menu does not exist; 500 (after rollback)
            on any other failure.
    """
    try:
        queryLayer = db.query(SysMenuLayer)
        queryLayer = queryLayer.filter(SysMenuLayer.menu_id == body['menuId'])
        queryLayer = queryLayer.filter(SysMenuLayer.del_flag != '2')
        menu = queryLayer.first()
        if not menu:
            raise HTTPException(status_code=404, detail="菜单不存在")

        # Apply only the updatable fields that were actually sent.
        if 'component' in body:
            menu.component = body['component']
        if 'icon' in body:
            menu.icon = body['icon']
        if 'menuName' in body:
            menu.menu_name = body['menuName']
        if 'menuType' in body:
            menu.menu_type = body['menuType']
        if 'orderNum' in body:
            menu.order_num = body['orderNum']
        if 'parentId' in body:
            menu.parent_id = body['parentId']
        if 'path' in body:
            menu.path = body['path']
        if 'status' in body:
            menu.status = body['status']
        if 'ZT' in body:
            menu.layer_template = _join_template(body['ZT'])

        menu.update_by = user_id
        menu.update_time = datetime.now()
        menu.sign = sys_menu_layer_data.get_sign_hmac(menu)
        # Captured before commit: 'menuName' is optional in the body, so the
        # log message must not index body directly.
        menu_name = menu.menu_name
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理更新地图菜单【{menu_name}】成功", request.client.host)
        return {
            "code": 200,
            "msg": "菜单更新成功"
        }
    except HTTPException:
        # Keep the deliberate 404 instead of converting it into a 500.
        raise
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        raise HTTPException(
            status_code=status.HTTP_500_INTERNAL_SERVER_ERROR,
            detail=str(e) or e.__class__.__name__,
        )


@router.delete('/delete/{menu_id}')
async def delete(
    request: Request,
    menu_id: int,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id = Depends(valid_access_token)
):
    """Soft-delete a layer menu by setting ``del_flag`` to '2' and re-signing it."""
    try:
        queryLayer = db.query(SysMenuLayer)
        queryLayer = queryLayer.filter(SysMenuLayer.menu_id == menu_id)
        queryLayer = queryLayer.filter(SysMenuLayer.del_flag != '2')
        menu_to_delete = queryLayer.first()
        if not menu_to_delete:
            return JSONResponse(status_code=404, content={"code": 404, "msg": "该id不存在"})

        menu_to_delete.update_by = user_id
        menu_to_delete.update_time = datetime.now()
        menu_to_delete.del_flag = '2'
        menu_to_delete.sign = sys_menu_layer_data.get_sign_hmac(menu_to_delete)
        db.commit()

        db_czrz.log(db, auth_user, "系统管理", f"后台管理删除菜单【{menu_to_delete.menu_name}】成功", request.client.host)
        return {
            "code": 200,
            "msg": "菜单删除成功"
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        # NOTE(review): unexpected errors are reported as 404, not 500; kept
        # unchanged because clients may rely on it.
        return JSONResponse(status_code=404, content={"code": 404, "msg": str(e)})


@router.get('/zt/dt/list')
async def get_list(
    ZT: str = Query(None, max_length=100),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return the ';'-separated ``remark`` items of the ZT entry with value *ZT*.

    The data list is empty when ``ZT`` is omitted or no matching entry exists.
    """
    if ZT is None:
        return {
            "code": 200,
            "data": [],
            "msg": "操作成功"
        }

    query = db.query(SysDictData)
    query = query.filter(SysDictData.del_flag != '2')
    query = query.filter(SysDictData.dict_type == 'sys_menu_layer_zt')
    query = query.filter(SysDictData.dict_value == ZT)
    zt = query.first()
    if zt is None:
        return {
            "code": 200,
            "data": [],
            "msg": "操作成功"
        }

    return {
        "code": 200,
        "data": _split_template(zt.remark),
        "msg": "操作成功"
    }


@router.put('/zt/{id}/dt/update')
async def update(
    request: Request,
    id: str,
    auth_user: AuthUser = Depends(find_auth_user),
    # form_data: SysMuneUpdateForm,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Replace the ``remark`` items of the ZT entry *id* with ``body['dt']``.

    Returns a 404 JSON body when the entry does not exist. When ``dt`` is
    absent from the body only ``update_by`` / ``update_time`` are refreshed.
    """
    try:
        query = db.query(SysDictData)
        query = query.filter(SysDictData.del_flag != '2')
        query = query.filter(SysDictData.dict_type == 'sys_menu_layer_zt')
        query = query.filter(SysDictData.dict_value == id)
        zt = query.first()
        if zt is None:
            return JSONResponse(status_code=404, content={"code": 404, "msg": '专题不存在'})

        if 'dt' in body:
            zt.remark = _join_template(body['dt'])
        zt.update_by = user_id
        zt.update_time = datetime.now()
        db.commit()
        return {"code": 200, "msg": "操作成功"}
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        # NOTE(review): unexpected errors are reported as 404, not 500; kept
        # unchanged because clients may rely on it.
        return JSONResponse(status_code=404, content={"code": 404, "msg": str(e)})