"""Department / tag related API routes.

Exposes endpoints for:
  * the department tree (optionally filtered by tag ids),
  * the list of department tags,
  * users looked up by tag ids,
  * a tag tree built from a set of tag ids,
  * creating "common list" tags and counting the users attached to them.
"""
# NOTE: the original import order is kept on purpose. Several wildcard imports
# are involved, so reordering could change which name wins on a collision.
from fastapi import APIRouter, Request, Depends, HTTPException, Query, Body
from sqlalchemy.exc import IntegrityError
from fastapi.responses import HTMLResponse, FileResponse
from fastapi.responses import JSONResponse
from database import get_db
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
from models import *
import json
import random
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_event_management, db_user, db_area, db_emergency_plan
from common.security import valid_access_token
import traceback
from utils import *
from datetime import datetime, timedelta
from common.db import db_dict
from urllib.parse import quote
import base64
from config import settings
from pydantic import BaseModel
from typing import List, Dict, Set
import logging
from sqlalchemy import func as sa_func

router = APIRouter()

_logger = logging.getLogger(__name__)

# Department id used as the root of the tree returned by ``/alldepts``.
_ROOT_DEPT_ID = 3


def _collect_ancestor_names(db: Session, dept) -> List[str]:
    """Return department names from the top-most ancestor down to ``dept``."""
    names = []
    visited = set()
    current = dept
    # The visited set stops the walk if parent_id links ever form a cycle
    # (e.g. a department that is its own parent).
    while current and current.dept_id not in visited:
        visited.add(current.dept_id)
        names.append(current.dept_name)
        current = (
            db.query(SysDept)
            .filter(SysDept.dept_id == current.parent_id)
            .first()
        )
    names.reverse()
    return names


def _get_user_tags(db: Session, user_id) -> List[Dict]:
    """Return ``[{"tag_id", "tag_name"}, ...]`` for the tags linked to a user."""
    user_tags_subquery = (
        db.query(SysUserTag.tag_id)
        .filter(SysUserTag.user_id == user_id)
        .subquery()
    )
    tags = db.query(SysTag).filter(SysTag.tag_id.in_(user_tags_subquery)).all()
    return [{"tag_id": tag.tag_id, "tag_name": tag.tag_name} for tag in tags]


def get_dept_tree(db: Session, dept_id: int, tag_ids: Optional[Set[int]] = None) -> Optional[Dict]:
    """Build the department tree rooted at ``dept_id``.

    Args:
        db: Active SQLAlchemy session.
        dept_id: Id of the department to use as the root.
        tag_ids: Optional set of tag ids used as a filter. When given, a
            department is kept only if one of its own tags matches or at
            least one child subtree is kept; users are kept only if one of
            their tags matches.

    Returns:
        A dict with keys ``dept_id``, ``parent_id``, ``ancestors``,
        ``dept_name``, ``tags``, ``users`` and ``children``, or ``None`` when
        the department does not exist or is filtered out.
    """
    dept = db.query(SysDept).filter(SysDept.dept_id == dept_id).first()
    if not dept:
        return None

    ancestors_names = _collect_ancestor_names(db, dept)

    # Tags attached to this department (skip the lookup when there are none,
    # which avoids issuing an empty IN () query).
    dept_tag_ids = [
        link.tag_id
        for link in db.query(SysDeptTag).filter(SysDeptTag.dept_id == dept.dept_id).all()
    ]
    if dept_tag_ids:
        dept_tags_info = [
            {"tag_id": tag.tag_id, "tag_name": tag.tag_name}
            for tag in db.query(DeptTag).filter(DeptTag.tag_id.in_(dept_tag_ids)).all()
        ]
    else:
        dept_tags_info = []

    # Build each child subtree exactly once; the result is used both for the
    # "does any child match?" check and for the returned ``children`` list.
    children = db.query(SysDept).filter(SysDept.parent_id == dept.dept_id).all()
    child_trees = [
        tree
        for tree in (get_dept_tree(db, child.dept_id, tag_ids) for child in children)
        if tree
    ]

    # With a tag filter, drop the department when neither it nor any child
    # subtree matches.
    if tag_ids:
        dept_matches = any(tag["tag_id"] in tag_ids for tag in dept_tags_info)
        if not dept_matches and not child_trees:
            return None

    # Users that belong directly to this department.
    users_info = []
    for user in db.query(SysUser).filter(SysUser.dept_id == dept.dept_id).all():
        user_tags_info = _get_user_tags(db, user.user_id)
        # With a tag filter, skip users that have no matching tag.
        if tag_ids and not any(tag["tag_id"] in tag_ids for tag in user_tags_info):
            continue
        users_info.append({
            "user_id": user.user_id,
            "dept_id": user.dept_id,
            "position": "无",
            "user_name": user.user_name,
            "email": user.email,
            "user_tags": user_tags_info,
        })

    return {
        "dept_id": dept.dept_id,
        "parent_id": dept.parent_id,
        "ancestors": '/'.join(ancestors_names),
        "dept_name": dept.dept_name,
        "tags": dept_tags_info,
        "users": users_info,
        "children": child_trees,
    }


@router.get("/alldepts", response_model=Dict)
def read_all_depts(db: Session = Depends(get_db), tag_ids: Optional[List[int]] = Query(None, title="Tag IDs")):
    """Return the department tree under the root department.

    Raises:
        HTTPException: 404 when the root department is missing or the whole
            tree is removed by the ``tag_ids`` filter.
    """
    tag_ids_set = set(tag_ids) if tag_ids else None
    top_level_depts = get_dept_tree(db, _ROOT_DEPT_ID, tag_ids_set)
    if not top_level_depts:
        raise HTTPException(status_code=404, detail="Department not found")
    return {
        "code": 200,
        "msg": "成功",
        "data": top_level_depts,
    }


class DeptTagInfo(BaseModel):
    """Id and name of a department tag."""
    tag_id: int
    tag_name: str


@router.get("/depts", response_model=Dict)
def get_dept(db: Session = Depends(get_db)):
    """Return every department tag.

    Raises:
        HTTPException: 404 when no department tag exists.
    """
    dept_tags = db.query(DeptTag).all()
    if not dept_tags:
        raise HTTPException(status_code=404, detail="No dept tags found")

    dept_tags_data = [
        DeptTagInfo(tag_id=dept_tag.tag_id, tag_name=dept_tag.tag_name)
        for dept_tag in dept_tags
    ]
    return {
        "code": 200,
        "msg": "成功",
        "data": dept_tags_data,
    }


class User(BaseModel):
    """Minimal user representation returned by ``/users_by_tags``."""
    user_id: int
    user_name: str
    # nick_name: str


class TagIDs(BaseModel):
    """Request body carrying a list of tag ids."""
    tag_ids: List[int]


@router.post("/users_by_tags")
def read_users_by_tags(
    db: Session = Depends(get_db),
    tag_ids_data: TagIDs = Body(...)
):
    """Return the users that carry at least one of the given tags.

    Raises:
        HTTPException: 400 when ``tag_ids`` is empty, 404 when no user matches.
    """
    tag_ids = tag_ids_data.tag_ids
    if not tag_ids:
        raise HTTPException(status_code=400, detail="Tag IDs are required")

    query = (
        db.query(SysUser)
        .join(SysUserTag, SysUser.user_id == SysUserTag.user_id)
        .filter(SysUserTag.tag_id.in_(tag_ids))
    )

    # NOTE(review): the two counters below look swapped relative to their
    # names. ``distinct_user_count`` counts user-tag rows (a user with two
    # matching tags is counted twice), while ``non_distinct_user_count`` is
    # the number of de-duplicated users. Values are kept as-is because
    # clients may depend on them; confirm before renaming.
    distinct_user_count = (
        db.query(SysUserTag.user_id)
        .filter(SysUserTag.tag_id.in_(tag_ids))
        .count()
    )

    # De-duplicate users that matched through more than one tag.
    users_with_tags = query.distinct().all()
    non_distinct_user_count = len(users_with_tags)
    _logger.debug(
        "users_by_tags counts: %s %s", distinct_user_count, non_distinct_user_count
    )

    if not users_with_tags:
        raise HTTPException(status_code=404, detail="No users found for the given tag IDs")

    users_data = [
        User(user_id=user.user_id, user_name=user.user_name)
        for user in users_with_tags
    ]
    return {
        "code": 200,
        "msg": "成功",
        "distinct_user_count": distinct_user_count,
        "non_distinct_user_count": non_distinct_user_count,
        "data": users_data,
    }


class TagTree(BaseModel):
    """A tag together with its nested child tags."""
    tag_id: int
    tag_name: str
    parent_id: int
    children: List['TagTree'] = []


def build_tree(tags, parent_id=0):
    """Arrange a flat list of tags into a list of ``TagTree`` roots.

    Args:
        tags: Objects exposing ``tag_id``, ``tag_name`` and ``parent_id``.
        parent_id: ``parent_id`` value that identifies the top-level tags.

    Returns:
        ``TagTree`` nodes whose ``parent_id`` equals ``parent_id``, each with
        its descendants nested under ``children``. Input order is preserved
        among siblings. Tags not reachable from ``parent_id`` are omitted.
    """
    # Index once by parent so every node finds its children without
    # rescanning the whole list.
    tags_by_parent = {}
    for tag in tags:
        tags_by_parent.setdefault(tag.parent_id, []).append(tag)

    def _subtree(pid):
        return [
            TagTree(
                tag_id=tag.tag_id,
                tag_name=tag.tag_name,
                parent_id=tag.parent_id,
                children=_subtree(tag.tag_id),
            )
            for tag in tags_by_parent.get(pid, ())
        ]

    return _subtree(parent_id)


@router.post("/tags_tree")
def get_tag_tree(
    db: Session = Depends(get_db),
    tag_ids_data: TagIDs = Body(...)
):
    """Return the tree formed by the tags whose ids are in the request body.

    Raises:
        HTTPException: 400 when ``tag_ids`` is empty, 404 when none of the
            ids exist.
    """
    tag_ids = tag_ids_data.tag_ids
    if not tag_ids:
        raise HTTPException(status_code=400, detail="Tag IDs are required")

    tags = db.query(SysTag).filter(SysTag.tag_id.in_(tag_ids)).all()
    if not tags:
        raise HTTPException(status_code=404, detail="Tags not found for the given tag IDs")

    tag_tree = build_tree(tags)
    return {
        "code": 200,
        "msg": "成功",
        "data": tag_tree,
    }


class TagData(BaseModel):
    """Request body for attaching a named common-list tag to users."""
    user_ids: List[int]
    tag_name: str


@router.post("/add_common_list_tags")
def add_common_list_tags(
    db: Session = Depends(get_db),
    data: TagData = Body(...)
):
    """Attach the tag named ``tag_name`` to every user in ``user_ids``.

    The tag is created when it does not exist yet. Users already linked to
    the tag are skipped. Tag creation and link creation are committed
    together, so a failure leaves nothing half-written.

    Raises:
        HTTPException: 400 when ``user_ids`` or ``tag_name`` is empty.
        IntegrityError: re-raised after rollback when the database rejects
            the insert.
    """
    user_ids = data.user_ids
    tag_name = data.tag_name

    if not user_ids or not tag_name:
        # Previously the tag name itself was passed as the status code.
        raise HTTPException(status_code=400, detail="User IDs and Tag Name are required")

    try:
        tag = (
            db.query(Common_List_Tags)
            .filter(Common_List_Tags.tag_name == tag_name)
            .first()
        )
        if not tag:
            tag = Common_List_Tags(tag_name=tag_name)
            db.add(tag)
            # Flush (not commit) so the generated tag_id is available while
            # the whole operation stays inside one transaction.
            db.flush()
        tag_id = tag.tag_id

        # Drop duplicate ids from the request while keeping their order.
        unique_user_ids = list(dict.fromkeys(user_ids))

        # One lookup for all users instead of one query per user.
        already_linked = {
            row[0]
            for row in db.query(Sys_Common_List_Tags.user_id)
            .filter(
                Sys_Common_List_Tags.tag_id == tag_id,
                Sys_Common_List_Tags.user_id.in_(unique_user_ids),
            )
            .all()
        }
        for user_id in unique_user_ids:
            if user_id not in already_linked:
                db.add(Sys_Common_List_Tags(user_id=user_id, tag_id=tag_id))

        db.commit()
    except IntegrityError:
        db.rollback()
        raise

    return {"code": 200, "msg": "操作成功", "tag_id": tag_id}


class TagUserCount(BaseModel):
    """A common-list tag with the number of user links attached to it."""
    tag_id: int
    tag_name: str
    user_count: int


@router.get("/tags_user_count", response_model=Dict)
def get_tags_user_count(db: Session = Depends(get_db)):
    """Return every common-list tag with its number of linked users.

    Raises:
        HTTPException: 404 when no common-list tag exists.
    """
    tags = db.query(Common_List_Tags).all()
    if not tags:
        raise HTTPException(status_code=404, detail="No tags found")

    # A single grouped query replaces one COUNT query per tag.
    counts_by_tag = {
        tag_id: user_count
        for tag_id, user_count in db.query(
            Sys_Common_List_Tags.tag_id,
            sa_func.count(Sys_Common_List_Tags.tag_id),
        )
        .group_by(Sys_Common_List_Tags.tag_id)
        .all()
    }

    result = [
        TagUserCount(
            tag_id=tag.tag_id,
            tag_name=tag.tag_name,
            user_count=counts_by_tag.get(tag.tag_id, 0),
        )
        for tag in tags
    ]
    return {
        "code": 200,
        "msg": "成功",
        "data": result,
    }