123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from sqlalchemy.sql import func
- from common.auth_user import *
- from pydantic import BaseModel
- from database import get_db
- from typing import List
- from models import *
- from utils import *
- from utils.ry_system_util import *
- from utils.video_util import *
- import traceback
- import json
- router = APIRouter()
def parent_id_get_contact_dept_list(db, parent_id):
    """Return the live child departments of ``parent_id``.

    Args:
        db: SQLAlchemy session used to run the query.
        parent_id: Value matched against ``parent_department_id``.

    Returns:
        A list of ``EmergencyContactDepartment`` rows whose ``del_flag`` is
        not ``'2'`` (the value the delete endpoints in this module write).
        The list is empty when nothing matches.
    """
    return (
        db.query(EmergencyContactDepartment)
        .filter(EmergencyContactDepartment.del_flag != '2')
        .filter(EmergencyContactDepartment.parent_department_id == parent_id)
        .all()
    )
def id_get_contact_dept_info(db, id):
    """Fetch a single live department by primary key.

    Args:
        db: SQLAlchemy session used to run the query.
        id: Value matched against ``EmergencyContactDepartment.id``.

    Returns:
        The first ``EmergencyContactDepartment`` row with that id whose
        ``del_flag`` is not ``'2'``, or ``None`` when there is no such row.
    """
    return (
        db.query(EmergencyContactDepartment)
        .filter(EmergencyContactDepartment.del_flag != '2')
        .filter(EmergencyContactDepartment.id == id)
        .first()
    )
@router.get('/department/data')
async def get_dict_data_by_type(
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Return the contact-department hierarchy as a nested tree.

    The walk starts at the departments whose parent id is ``0`` and descends
    through ``parent_id_get_contact_dept_list`` one level at a time.

    Returns:
        On success, a dict with ``data`` (list of tree nodes), ``code`` 200
        and ``msg``. Each node has ``id``, ``isShowSelect``, ``label``,
        ``regionPath`` and ``yzy_unitid``; nodes with children also carry a
        ``children`` list and have ``isShowSelect`` set to ``True``.
        On any exception, a ``JSONResponse`` with HTTP status 404 whose
        ``msg`` is the exception text.
    """
    def to_tree_nodes(dept_rows):
        # NOTE(review): one child query is issued per department visited.
        nodes = []
        for row in dept_rows:
            node = {
                "id": row.id,
                "isShowSelect": False,
                "label": row.department_name,
                "regionPath": row.regionPath,
                "yzy_unitid": row.yzy_unitid,
            }
            sub_rows = parent_id_get_contact_dept_list(db, row.id)
            if sub_rows:
                node['children'] = to_tree_nodes(sub_rows)
                node['isShowSelect'] = True
            nodes.append(node)
        return nodes

    try:
        top_level = parent_id_get_contact_dept_list(db, 0)
        tree = to_tree_nodes(top_level)
        return {
            "data": tree,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        # Log the full traceback server-side; the client only gets the text.
        traceback.print_exc()
        error_payload = {
            'code': 404,
            'msg': str(e)
        }
        return JSONResponse(status_code=404, content=error_payload)
@router.post('/department/create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create a contact department from the request body.

    Args:
        db: SQLAlchemy session supplied by ``get_db``.
        body: Mapping produced by ``remove_xss_json`` (presumably the
            sanitised JSON payload; confirm against that helper). It must
            contain ``parent_department_id``, ``department_name``,
            ``regionPath``, ``yzy_unitid``, ``yzy_regionPath`` and
            ``display_order``.
        user_id: Value produced by ``valid_access_token``; stored as
            ``create_by``.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success. On any
        exception (including a missing body key), a ``JSONResponse`` with
        HTTP status 404 whose ``msg`` is the exception text.
    """
    try:
        new_department = EmergencyContactDepartment(
            parent_department_id=body['parent_department_id'],
            department_name=body['department_name'],
            regionPath=body['regionPath'],
            yzy_unitid=body['yzy_unitid'],
            yzy_regionPath=body['yzy_regionPath'],
            display_order=body['display_order'],
            create_by=user_id
        )
        db.add(new_department)
        # The created row is not returned to the caller, so no refresh
        # round trip is needed after the commit.
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        # Log first so the original error is recorded even if rollback fails.
        traceback.print_exc()
        # Discard the pending insert so the session is left in a clean state.
        db.rollback()
        # NOTE(review): 404 is kept for backward compatibility with existing
        # clients, although these are not "not found" conditions.
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/department/delete/{id}")  # the path id selects the department to delete
async def delete_dict_data(
    id: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete one contact department.

    The row is kept and its ``del_flag`` is set to ``'2'``. Only the matched
    row is modified; child departments are not touched.

    Args:
        id: Department id taken from the URL path.
        db: SQLAlchemy session supplied by ``get_db``.
        body: Result of the ``remove_xss_json`` dependency (unused here).
        user_id: Result of the ``valid_access_token`` dependency (unused
            beyond enforcing the dependency).

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success. A
        ``JSONResponse`` with HTTP status 404 when no live department has
        that id, or when any exception is raised.
    """
    try:
        dept_data = (
            db.query(EmergencyContactDepartment)
            .filter(EmergencyContactDepartment.id == id)
            .filter(EmergencyContactDepartment.del_flag != '2')
            .first()
        )
        if not dept_data:
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': 'dept不存在'
            })

        # Soft delete: flag the row instead of removing it.
        dept_data.del_flag = '2'
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        # Log first so the original error is recorded even if rollback fails.
        traceback.print_exc()
        # Drop the uncommitted flag change so the session stays usable.
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/user/list')
async def get_dict_data_by_type(
    dept_parent_id: int = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return one page of live emergency-contact users.

    Args:
        dept_parent_id: Optional filter; when truthy, only users whose
            ``department_id`` equals it are returned.
        page: 1-based page number.
        pageSize: Number of rows per page.
        db: SQLAlchemy session supplied by ``get_db``.
        body: Result of the ``remove_xss_json`` dependency (unused here).
        user_id: Result of the ``valid_access_token`` dependency.

    Returns:
        On success, a dict with ``total``, ``page``, ``pageSize``,
        ``totalPages``, ``data``, ``code`` 200 and ``msg``. On any
        exception, a ``JSONResponse`` with HTTP status 404 whose ``msg`` is
        the exception text.
    """
    try:
        query = db.query(EmergencyContactUser)
        query = query.filter(EmergencyContactUser.del_flag != '2')
        if dept_parent_id:
            query = query.filter(EmergencyContactUser.department_id == dept_parent_id)

        total_count = query.count()

        # OFFSET/LIMIT without ORDER BY lets the database return rows in any
        # order, so pages could overlap or skip rows. Order by primary key to
        # make paging deterministic.
        # NOTE(review): display_order may be the preferred sort key; confirm.
        offset = (page - 1) * pageSize
        user_data = (
            query.order_by(EmergencyContactUser.id)
            .offset(offset)
            .limit(pageSize)
            .all()
        )

        # Users on one page often share a department; look each one up once.
        dept_cache = {}
        data_list = []
        for d in user_data:
            if d.department_id not in dept_cache:
                dept_cache[d.department_id] = id_get_contact_dept_info(db, d.department_id)
            department = dept_cache[d.department_id]
            if department is None:
                # Users whose department is missing or deleted are omitted.
                # NOTE(review): they are still counted in ``total``, so a
                # page can hold fewer rows than ``pageSize``.
                continue
            data_list.append({
                "id": d.id,
                "name": d.name,
                "position": d.position,
                "mobile_phone": d.mobile_phone,
                "office_phone": d.office_phone,
                "department_id": d.department_id,
                "department": department,
                "display_order": d.display_order,
                "yzy_unitid": d.yzy_unitid,
                "userid": d.userid,
                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
            })

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            # Ceiling division without floats.
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/user/info/{id}')
async def get_dict_data_by_type(
    id: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Return the details of one live emergency-contact user.

    Args:
        id: User id taken from the URL path.
        db: SQLAlchemy session supplied by ``get_db``.
        body: Result of the ``remove_xss_json`` dependency (unused here).
        user_id: Result of the ``valid_access_token`` dependency.

    Returns:
        On success, a dict with ``data`` (the user's fields plus its
        department), ``code`` 200 and ``msg``. A ``JSONResponse`` with HTTP
        status 404 when the user does not exist, when its department cannot
        be found, or when any exception is raised.
    """
    try:
        d = (
            db.query(EmergencyContactUser)
            .filter(EmergencyContactUser.id == id)
            .filter(EmergencyContactUser.del_flag != '2')
            .first()
        )
        if d is None:
            # Without this guard the attribute access below raised
            # AttributeError and its text was sent to the client.
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': 'user不存在'
            })

        department = id_get_contact_dept_info(db, d.department_id)
        if department is None:
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '用户部门未查到'
            })

        user_info = {
            "id": d.id,
            "name": d.name,
            "position": d.position,
            "mobile_phone": d.mobile_phone,
            "office_phone": d.office_phone,
            "department_id": d.department_id,
            "department": department,
            "display_order": d.display_order,
            "yzy_unitid": d.yzy_unitid,
            "userid": d.userid,
            "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else ''
        }
        return {
            "data": user_info,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.post('/user/create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create an emergency-contact user from the request body.

    Args:
        db: SQLAlchemy session supplied by ``get_db``.
        body: Mapping produced by ``remove_xss_json`` (presumably the
            sanitised JSON payload; confirm against that helper). It must
            contain ``name``, ``position``, ``mobile_phone``,
            ``office_phone``, ``department_id``, ``display_order``,
            ``yzy_unitid`` and ``userid``.
        user_id: Value produced by ``valid_access_token``; stored as
            ``create_by``.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success. On any
        exception (including a missing body key), a ``JSONResponse`` with
        HTTP status 404 whose ``msg`` is the exception text.
    """
    try:
        new_user_data = EmergencyContactUser(
            name=body['name'],
            position=body['position'],
            mobile_phone=body['mobile_phone'],
            office_phone=body['office_phone'],
            department_id=body['department_id'],
            display_order=body['display_order'],
            yzy_unitid=body['yzy_unitid'],
            userid=body['userid'],
            create_by=user_id
        )
        db.add(new_user_data)
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        # Log first so the original error is recorded even if rollback fails.
        traceback.print_exc()
        # Discard the pending insert so the session is left in a clean state.
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.put("/user/update")
async def updata_dict_type(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Update an existing emergency-contact user.

    Args:
        db: SQLAlchemy session supplied by ``get_db``.
        body: Mapping produced by ``remove_xss_json``. It must contain
            ``id``, ``name``, ``position``, ``mobile_phone``,
            ``department_id``, ``display_order``, ``yzy_unitid`` and
            ``userid``; ``office_phone`` is applied when present.
        user_id: Value produced by ``valid_access_token``; stored as
            ``update_by``.

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success. A
        ``JSONResponse`` with HTTP status 404 when no live user has
        ``body['id']`` or when any exception is raised.
    """
    try:
        user_data = (
            db.query(EmergencyContactUser)
            .filter(EmergencyContactUser.id == body['id'])
            .filter(EmergencyContactUser.del_flag != '2')
            .first()
        )
        if not user_data:
            # NOTE(review): this response uses errcode/errmsg while the other
            # responses use code/msg; kept as-is for existing clients.
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': 'user不存在'
            })

        user_data.name = body['name']
        user_data.position = body['position']
        user_data.mobile_phone = body['mobile_phone']
        # The original assigned ``position`` a second time here and never
        # updated ``office_phone``. The key is optional so clients that never
        # sent it keep working.
        if 'office_phone' in body:
            user_data.office_phone = body['office_phone']
        user_data.department_id = body['department_id']
        user_data.display_order = body['display_order']
        user_data.yzy_unitid = body['yzy_unitid']
        user_data.userid = body['userid']
        user_data.update_by = user_id

        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        # Log first so the original error is recorded even if rollback fails.
        traceback.print_exc()
        # Drop the partially applied attribute changes.
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/user/delete/{id}")  # the path id selects the user to delete
async def delete_dict_data(
    id: str,
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Soft-delete one emergency-contact user.

    The row is kept and its ``del_flag`` is set to ``'2'``.

    Args:
        id: User id taken from the URL path.
        db: SQLAlchemy session supplied by ``get_db``.
        body: Result of the ``remove_xss_json`` dependency (unused here).
        user_id: Result of the ``valid_access_token`` dependency (unused
            beyond enforcing the dependency).

    Returns:
        ``{"code": 200, "msg": ..., "data": None}`` on success. A
        ``JSONResponse`` with HTTP status 404 when no live user has that id
        or when any exception is raised.
    """
    try:
        user_data = (
            db.query(EmergencyContactUser)
            .filter(EmergencyContactUser.id == id)
            .filter(EmergencyContactUser.del_flag != '2')
            .first()
        )
        if not user_data:
            # The message previously said 'type不存在', copied from another
            # endpoint; this lookup is for a user.
            # NOTE(review): errcode/errmsg keys kept for existing clients.
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': 'user不存在'
            })

        # Soft delete: flag the row instead of removing it.
        user_data.del_flag = '2'
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        # Log first so the original error is recorded even if rollback fails.
        traceback.print_exc()
        # Drop the uncommitted flag change so the session stays usable.
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
|