#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status from common.security import valid_access_token from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from sqlalchemy.sql import func from common.auth_user import * from pydantic import BaseModel from database import get_db from typing import List from models import * from utils import * from utils.ry_system_util import * from utils.video_util import * import traceback import json router = APIRouter() def parent_id_get_contact_dept_list(db,parent_id): query = db.query(EmergencyContactDepartment) query = query.filter(EmergencyContactDepartment.del_flag != '2') query = query.filter(EmergencyContactDepartment.parent_department_id == parent_id) return query.all() def id_get_contact_dept_info(db,id): query = db.query(EmergencyContactDepartment) query = query.filter(EmergencyContactDepartment.del_flag != '2') query = query.filter(EmergencyContactDepartment.id == id) return query.first() @router.get('/department/data') async def get_dict_data_by_type( db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: data = parent_id_get_contact_dept_list(db,0) def build_dept_tree(depts, parent_dept): dept_tree = [] # 初始化一个列表来存储菜单树结构 for dept in depts: dept_data = { "id": dept.id, "isShowSelect": False, "label": dept.department_name, "regionPath": dept.regionPath, "yzy_unitid": dept.yzy_unitid, } children_depts = parent_id_get_contact_dept_list(db,dept.id) if len(children_depts) > 0: dept_data['children'] = build_dept_tree(children_depts, dept) dept_data['isShowSelect'] = True dept_tree.append(dept_data) # 将当前菜单数据添加到菜单树列表 return dept_tree routers = build_dept_tree(data, None) # 构建返回结果 result = { "data": routers, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.post('/department/create') async def create_dict_data( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 创建一个新的 SysDictData 实例 new_dict_data = EmergencyContactDepartment( parent_department_id=body['parent_department_id'], department_name=body['department_name'], regionPath=body['regionPath'], yzy_unitid=body['yzy_unitid'], yzy_regionPath=body['yzy_regionPath'], display_order=body['display_order'], create_by=user_id ) # 添加到会话并提交 db.add(new_dict_data) db.commit() db.refresh(new_dict_data) # 可选,如果需要刷新实例状态 # 构建返回结果 result = { "code": 200, "msg": "操作成功", "data": None } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.delete("/department/delete/{id}") # 使用 ID 来标识要删除的接口 async def delete_dict_data( id: str, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从数据库中获取要删除的 OneShareApiEntity 实例 query = db.query(EmergencyContactDepartment) query = query.filter(EmergencyContactDepartment.id == id) query = query.filter(EmergencyContactDepartment.del_flag != '2') dept_data = query.first() # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first() if not dept_data: return JSONResponse(status_code=404, content={ 'code': 404, 'msg': 'dept不存在' }) dept_data.del_flag = '2' # 删除实例 # db.delete(api) db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.get('/user/list') async def get_dict_data_by_type( dept_parent_id:int =Query(None), page: int = Query(1, gt=0), pageSize: int = Query(10, gt=0), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 根据 dict_type 查询字典数据 # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all() query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.del_flag != '2') # 添加查询条件 if dept_parent_id: query = query.filter(EmergencyContactUser.department_id==dept_parent_id) # 计算总记录数 total_count = query.count() # 计算分页 offset = (page - 1) * pageSize user_data = query.offset(offset).limit(pageSize).all() # 转换为字典 data_list = [] for d in user_data: department = id_get_contact_dept_info(db,d.department_id) if department is None: department='' continue data_list.append({ "id": d.id, "name": d.name, "position":d.position, "mobile_phone": d.mobile_phone, "office_phone":d.office_phone, "department_id":d.department_id, "department":department, "display_order":d.display_order, "yzy_unitid":d.yzy_unitid, "userid": d.userid, "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '' }) # 构建返回结果 result = { "total": total_count, "page": page, "pageSize": pageSize, "totalPages": (total_count + pageSize - 1) // pageSize, "data": data_list, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.get('/user/info/{id}') async def get_dict_data_by_type( id: str , db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 根据 dict_type 查询字典数据 # dict_data = dict_type_get_dict_data_info(db,'three_proofing') query = db.query(EmergencyContactUser) # 添加查询条件 # if dictType: query = query.filter(EmergencyContactUser.id==id) query = query.filter(EmergencyContactUser.del_flag != '2') d = query.first() department = id_get_contact_dept_info(db, d.department_id) if department is None: department = '' return JSONResponse(status_code=404, content={ 'code': 404, 'msg': '用户部门未查到' }) data_list = { "id": d.id, "name": d.name, "position": d.position, "mobile_phone": d.mobile_phone, "office_phone": d.office_phone, "department_id": d.department_id, "department": department, "display_order": d.display_order, "yzy_unitid": d.yzy_unitid, "userid": d.userid, "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '' } # 构建返回结果 result = { "data": data_list, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.post('/user/create') async def create_dict_data( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 创建一个新的 SysDictData 实例 new_user_data = EmergencyContactUser( name=body['name'], position=body['position'], mobile_phone=body['mobile_phone'], office_phone=body['office_phone'], department_id=body['department_id'], display_order=body['display_order'], yzy_unitid=body['yzy_unitid'], userid=body['userid'], create_by=user_id ) # 添加到会话并提交 db.add(new_user_data) db.commit() # 构建返回结果 result = { "code": 200, "msg": "操作成功", "data": None } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.put("/user/update") async def updata_dict_type( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从请求数据创建一个新的 SysDictType 实例 query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.id == body['id']) query = query.filter(EmergencyContactUser.del_flag != '2') # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode) # query = db.query(SysDictData).filter(SysDictData.del_flag != '2') user_data = query.first() if not user_data: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': 'user不存在' }) user_data.name=body['name'] user_data.position = body['position'] user_data.mobile_phone = body['mobile_phone'] user_data.position = body['position'] user_data.department_id = body['department_id'] user_data.display_order = body['display_order'] user_data.yzy_unitid = body['yzy_unitid'] user_data.userid = body['userid'] user_data.update_by = user_id # 添加到数据库会话并提交 db.commit() # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态 # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None # 根据你的响应示例,data 为 null } except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.delete("/user/delete/{id}") # 使用 ID 来标识要删除的接口 async def delete_dict_data( id: str, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从数据库中获取要删除的 OneShareApiEntity 实例 query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.id == id) query = query.filter(EmergencyContactUser.del_flag != '2') user_data = query.first() # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first() if not user_data: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': 'type不存在' }) user_data.del_flag = '2' # 删除实例 # db.delete(api) db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) })