#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status from common.security import valid_access_token from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from sqlalchemy.sql import func from common.auth_user import * from pydantic import BaseModel from database import get_db from typing import List from models import * from utils import * from utils.ry_system_util import * from utils.video_util import * import traceback import json router = APIRouter() def name_get_user_id(db,keywords): query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.del_flag != '2') query = query.filter(EmergencyContactUser.name.like(f"%{keywords}%")) data = query.all() return [info.id for info in data] def user_id_get_duty_id(db,user_id_list): query = db.query(DutyPersonnelArrangement) query = query.filter(DutyPersonnelArrangement.del_flag != '2') query = query.filter(DutyPersonnelArrangement.personnel_id._in(user_id_list)) data = query.all() return [info.duty_id for info in data] def duty_id_get_user_id(db,duty_id): query = db.query(DutyPersonnelArrangement) query = query.filter(DutyPersonnelArrangement.del_flag != '2') query = query.filter(DutyPersonnelArrangement.id==duty_id) data = query.all() return data def user_id_get_info(db,user_id): query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.del_flag != '2') query = query.filter(EmergencyContactUser.id==user_id) data = query.all() return data @router.get('/list') async def get_dict_data_by_type( keywords:str =Query(None), duty_type:str =Query(None), start_time:str =Query(None), end_time:str =Query(None), page: int = Query(1, gt=0), pageSize: int = Query(10, gt=0), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 根据 dict_type 查询字典数据 # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all() query = db.query(DutySchedule) query = query.filter(DutySchedule.del_flag != '2') # 添加查询条件 if keywords: user_list= name_get_user_id(db,keywords) duty_list = user_id_get_duty_id(db,user_list) query = query.filter(DutySchedule.id._in(duty_list)) if duty_type: query = query.filter(DutySchedule.duty_type==duty_type) if start_time: start_time = datetime.strptime(start_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date>=start_time) if end_time: end_time = datetime.strptime(end_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date<=end_time) # 计算总记录数 total_count = query.count() # 计算分页 offset = (page - 1) * pageSize duty_data = query.offset(offset).limit(pageSize).all() # 转换为字典 data_list = [] for d in duty_data: user_data = [] user_list = duty_id_get_user_id(db,d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) user_data.append({"position_id":user_info.position_id, "personnel_id":user_info.personnel_id, "name":contact_info.name, "position":contact_info.position, "mobile_phone":contact_info.mobile_phone, "office_phone":contact_info.office_phone, "department_id":contact_info.department_id, "yzy_userid":contact_info.yzy_userid}) data_list.append({ "id": d.id, "start_time": d.start_time, "end_time":d.end_time, "duty_date": d.duty_date, "shift_type":d.shift_type, "duty_unit": d.duty_unit, "duty_type": d.duty_type, "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '', "user_data":user_data }) # 构建返回结果 result = { "total": total_count, "page": page, "pageSize": pageSize, "totalPages": (total_count + pageSize - 1) // pageSize, "data": data_list, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.get('/info/{id}') async def get_dict_data_by_type( id: str , db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 根据 dict_type 查询字典数据 # dict_data = dict_type_get_dict_data_info(db,'three_proofing') query = db.query(DutySchedule) # 添加查询条件 # if dictType: query = query.filter(DutySchedule.id==id) query = query.filter(DutySchedule.del_flag != '2') d = query.first() user_data = [] user_list = duty_id_get_user_id(db, d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) user_data.append({"position_id": user_info.position_id, "personnel_id": user_info.personnel_id, "name": contact_info.name, "position": contact_info.position, "mobile_phone": contact_info.mobile_phone, "office_phone": contact_info.office_phone, "department_id": contact_info.department_id, "yzy_userid": contact_info.yzy_userid}) data_list={ "id": d.id, "start_time": d.start_time, "end_time": d.end_time, "duty_date": d.duty_date, "shift_type": d.shift_type, "duty_unit": d.duty_unit, "duty_type": d.duty_type, "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '', "user_data": user_data } # 构建返回结果 result = { "data": data_list, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.post('/create') async def create_dict_data( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 创建一个新的 SysDictData 实例 user_data = body['user_data'] if len(user_data)==0: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '值班人员不能为空' }) new_duty_data = DutySchedule( start_time=body['start_time'], end_time=body['end_time'], duty_date=body['duty_date'], shift_type=body['shift_type'], duty_unit=body['duty_unit'], duty_type=body['duty_type'], create_by=user_id ) # 添加到会话并提交 db.add(new_duty_data) db.commit() db.refresh(new_duty_data) user_list = [] for user_info in user_data: user_list.appent( DutyPersonnelArrangement( duty_id=new_duty_data.id, position_id=user_info['position_id'], personnel_id=user_info['personnel_id'], create_by=user_id )) db.add_all(user_list) db.commit() # 构建返回结果 result = { "code": 200, "msg": "操作成功", "data": None } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.put("/update") async def updata_dict_type( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从请求数据创建一个新的 SysDictType 实例 query = db.query(DutySchedule) query = query.filter(DutySchedule.id == body['id']) query = query.filter(DutySchedule.del_flag != '2') # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode) # query = db.query(SysDictData).filter(SysDictData.del_flag != '2') duty_data = query.first() old_user_list = duty_id_get_user_id(db,duty_data.id) for info in old_user_list: info.del_flag = '2' db.commit() user_data = body['user_data'] if len(user_data)==0: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '值班人员不能为空' }) user_list = [] for user_info in user_data: user_list.appent(DutyPersonnelArrangement( duty_id=duty_data.id, position_id=user_info['position_id'], personnel_id=user_info['personnel_id'], create_by=user_id )) db.add_all(user_list) if not dutyn_data: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '值班不存在' }) duty_data.start_time=body['start_time'] duty_data.end_time = body['end_time'] duty_data.duty_date = body['duty_date'] duty_data.shift_type=body['shift_type'] duty_data.duty_unit = body['duty_unit'] duty_data.duty_type = body['duty_type'] duty_data.update_by = user_id # 添加到数据库会话并提交 db.commit() # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态 # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None # 根据你的响应示例,data 为 null } except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.delete("/delete/{id}") # 使用 ID 来标识要删除的接口 async def delete_dict_data( id: str, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从数据库中获取要删除的 OneShareApiEntity 实例 query = db.query(DutySchedule) query = query.filter(DutySchedule.id == id) query = query.filter(DutySchedule.del_flag != '2') position_data = query.first() # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first() if not position_data: return JSONResponse(status_code=404, content={ 'code': 404, 'msg': '值班不存在' }) position_data.del_flag = '2' # 删除实例 # db.delete(api) db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.delete("/delete_list/{id_list}") # 使用 ID 来标识要删除的接口 async def delete_dict_data( id_list: str, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从数据库中获取要删除的 OneShareApiEntity 实例 id_list = [int(i) for i in id_list.split(',')] query = db.query(DutySchedule) query = query.filter(DutySchedule.id._in(id_list)) query = query.filter(DutySchedule.del_flag != '2') position_data = query.all() # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first() if not position_data: return JSONResponse(status_code=404, content={ 'code': 404, 'msg': '值班不存在' }) for info in position_data: info.del_flag = '2' # 删除实例 # db.delete(api) db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) })