#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status, BackgroundTasks from common.security import valid_access_token from fastapi.responses import JSONResponse, FileResponse,StreamingResponse from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from sqlalchemy.sql import func from common.auth_user import * from pydantic import BaseModel from common.db import db_dept from common.db import db_czrz from exceptions import AppException, HmacException from database import get_db from typing import List from models import * from utils import * from utils.ry_system_util import * from utils.video_util import * import traceback import os import json router = APIRouter() def name_get_user_id(db,keywords): query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.del_flag != '2') query = query.filter(EmergencyContactUser.name.like(f"%{keywords}%")) data = query.all() return [info.id for info in data] def user_id_get_duty_id(db,user_id_list): query = db.query(DutyPersonnelArrangement) query = query.filter(DutyPersonnelArrangement.del_flag != '2') query = query.filter(DutyPersonnelArrangement.personnel_id.in_(user_id_list)) data = query.all() return [info.duty_id for info in data] def duty_id_get_user_id(db,duty_id): query = db.query(DutyPersonnelArrangement) query = query.filter(DutyPersonnelArrangement.del_flag != '2') query = query.filter(DutyPersonnelArrangement.duty_id==duty_id) data = query.all() return data def user_id_get_info(db,user_id): query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.del_flag != '2') query = query.filter(EmergencyContactUser.id==user_id) data = query.first() return data def position_id_get_info(db,position_id): query = db.query(DutyPosition) query = query.filter(DutyPosition.del_flag != '2') query = query.filter(DutyPosition.id==position_id) data = query.first() return data def dept_id_get_info(db,dept_id): query = db.query(EmergencyContactDepartment) query = query.filter(EmergencyContactDepartment.del_flag != '2') query = query.filter(EmergencyContactDepartment.id==dept_id) data = query.first() return data def get_duty_unit_id_list(db): query = db.query(SysDictData) query = query.filter(SysDictData.del_flag != '2') query = query.filter(SysDictData.dict_type=='duty_unit') data = query.all() return [int(i.dict_value) for i in data] def dept_id_get_duty_unit_id(db,dept_id,duty_unit_id_list): if dept_id == 0: return 0 if dept_id in duty_unit_id_list: return dept_id query = db.query(EmergencyContactDepartment) query = query.filter(EmergencyContactDepartment.del_flag != '2') query = query.filter(EmergencyContactDepartment.id==dept_id) data = query.first() if data: dept_id_get_duty_unit_id(db, data.parent_department_id, duty_unit_id_list) else: return 0 def mobile_phone_get_dept_id(db,mobile_phone,duty_unit_id_list): query = db.query(EmergencyContactUser) query = query.filter(EmergencyContactUser.del_flag != '2') query = query.filter(EmergencyContactUser.mobile_phone==mobile_phone) data = query.first() if data: dept_id = data.department_id dept_id = dept_id_get_duty_unit_id(db, dept_id, duty_unit_id_list) return dept_id else: return 0 @router.get('/list') async def get_dict_data_by_type( keywords:str =Query(None), duty_type:str =Query(None), start_time:str =Query(None), end_time:str =Query(None), duty_unit:int =Query(None), page: int = Query(1, gt=0), pageSize: int = Query(10, gt=0), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 根据 dict_type 查询字典数据 # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all() query = db.query(DutySchedule) query = query.filter(DutySchedule.del_flag != '2') # 添加查询条件 if keywords: user_list= name_get_user_id(db,keywords) duty_list = user_id_get_duty_id(db,user_list) query = query.filter(DutySchedule.id.in_(duty_list)) if duty_type: query = query.filter(DutySchedule.duty_type==duty_type) if start_time: start_time = datetime.strptime(start_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date>=start_time) if end_time: end_time = datetime.strptime(end_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date<=end_time) if duty_unit: query = query.filter(DutySchedule.duty_unit==duty_unit) # 计算总记录数 total_count = query.count() # 计算分页 offset = (page - 1) * pageSize query = query.order_by(DutySchedule.start_time.desc(),DutySchedule.end_time.desc(),DutySchedule.update_time.desc()) duty_data = query.offset(offset).limit(pageSize).all() # 转换为字典 data_list = [] for d in duty_data: user_data = [] user_list = duty_id_get_user_id(db,d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) user_data.append({"position_id":user_info.position_id, "personnel_id":user_info.personnel_id, "name":contact_info.name, "position":contact_info.position, "mobile_phone":contact_info.mobile_phone, "office_phone":contact_info.office_phone, "department_id":contact_info.department_id, "yzy_userid":contact_info.userid}) data_list.append({ "id": d.id, "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '', "end_time":d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '', "duty_date": d.duty_date, "shift_type":d.shift_type, "duty_unit": d.duty_unit, "duty_type": d.duty_type, "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '', "user_data":user_data }) # 构建返回结果 result = { "total": total_count, "page": page, "pageSize": pageSize, "totalPages": (total_count + pageSize - 1) // pageSize, "data": data_list, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.get('/latest_by_group') async def get_dict_data_by_type( keywords:str =Query(None), start_time:str =Query(None), end_time:str =Query(None), duty_unit:int =Query(None), page: int = Query(1, gt=0), pageSize: int = Query(10, gt=0), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 根据 dict_type 查询字典数据 # dict_data = db.query(SysDictData).filter_by(dict_type=dictType).all() query = db.query(DutySchedule) query = query.filter(DutySchedule.del_flag != '2') # 添加查询条件 if keywords: user_list= name_get_user_id(db,keywords) duty_list = user_id_get_duty_id(db,user_list) query = query.filter(DutySchedule.id.in_(duty_list)) if start_time: start_time = datetime.strptime(start_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date>=start_time) if end_time: end_time = datetime.strptime(end_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date<=end_time) if duty_unit: query = query.filter(DutySchedule.duty_unit==duty_unit) else: now_user_info=user_id_get_user_info(db,user_id) print(user_id,now_user_info.phonenumber,get_duty_unit_id_list(db)) if now_user_info: query = query.filter(DutySchedule.duty_unit==mobile_phone_get_dept_id(db,mpfun.dec_data(now_user_info.phonenumber),get_duty_unit_id_list(db))) else: return { "total": 0, "page": page, "pageSize": pageSize, "totalPages": (0 + pageSize - 1) // pageSize, "data": [], "code": 200, "msg": "查询成功" } subquery = db.query( DutySchedule.duty_date, DutySchedule.shift_type, func.max(DutySchedule.update_time).label("latest_create_time") ).filter(DutySchedule.del_flag != '2').group_by( DutySchedule.duty_date, DutySchedule.shift_type ).subquery() query = query.join( subquery, and_( DutySchedule.duty_date == subquery.c.duty_date, DutySchedule.shift_type == subquery.c.shift_type, DutySchedule.update_time == subquery.c.latest_create_time ) ) # 计算总记录数 total_count = query.count() # 计算分页 offset = (page - 1) * pageSize query = query.order_by(DutySchedule.update_time.desc()) print(query) duty_data = query.offset(offset).limit(pageSize).all() # 转换为字典 data_list = [] for d in duty_data: user_data = [] user_list = duty_id_get_user_id(db,d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) user_data.append({"position_id":user_info.position_id, "personnel_id":user_info.personnel_id, "name":contact_info.name, "position":contact_info.position, "mobile_phone":contact_info.mobile_phone, "office_phone":contact_info.office_phone, "department_id":contact_info.department_id, "yzy_userid":contact_info.userid}) data_list.append({ "id": d.id, "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '', "end_time":d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '', "duty_date": d.duty_date, "shift_type":d.shift_type, "duty_unit": d.duty_unit, "duty_type": d.duty_type, "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '', "user_data":user_data }) # 构建返回结果 result = { "total": total_count, "page": page, "pageSize": pageSize, "totalPages": (total_count + pageSize - 1) // pageSize, "data": data_list, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.get('/info/{id}') async def get_dict_data_by_type( id: str , db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 根据 dict_type 查询字典数据 # dict_data = dict_type_get_dict_data_info(db,'three_proofing') query = db.query(DutySchedule) # 添加查询条件 # if dictType: query = query.filter(DutySchedule.id==id) query = query.filter(DutySchedule.del_flag != '2') d = query.first() user_data = [] user_list = duty_id_get_user_id(db, d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) user_data.append({"position_id": user_info.position_id, "personnel_id": user_info.personnel_id, "name": contact_info.name, "position": contact_info.position, "mobile_phone": contact_info.mobile_phone, "office_phone": contact_info.office_phone, "department_id": contact_info.department_id, "yzy_userid": contact_info.userid}) data_list={ "id": d.id, "start_time": d.start_time, "end_time": d.end_time, "duty_date": d.duty_date, "shift_type": d.shift_type, "duty_unit": d.duty_unit, "duty_type": d.duty_type, "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '', "user_data": user_data } # 构建返回结果 result = { "data": data_list, "code": 200, "msg": "查询成功" } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.post('/create') async def create_dict_data( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 创建一个新的 SysDictData 实例 user_data = body['user_data'] if len(user_data)==0: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '值班人员不能为空' }) new_duty_data = DutySchedule( start_time=body['start_time'], end_time=body['end_time'], duty_date=body['duty_date'], shift_type=body['shift_type'], duty_unit=body['duty_unit'], duty_type=body['duty_type'], create_by=user_id ) # 添加到会话并提交 db.add(new_duty_data) db.commit() db.refresh(new_duty_data) user_list = [] for user_info in user_data: user_list.append( DutyPersonnelArrangement( duty_id=new_duty_data.id, position_id=user_info['position_id'], personnel_id=user_info['personnel_id'], create_by=user_id )) db.add_all(user_list) db.commit() # 构建返回结果 result = { "code": 200, "msg": "操作成功", "data": None } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.post('/list_create') async def create_dict_data( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 创建一个新的 SysDictData 实例 if isinstance(body,list) == False: return JSONResponse(status_code=500,content={'msg':"请求体非列表","code":500}) for data in body: user_data = data['user_data'] if len(user_data)==0: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '值班人员不能为空' }) new_duty_data = DutySchedule( start_time=data['start_time'], end_time=data['end_time'], duty_date=data['duty_date'], shift_type=data['shift_type'], duty_unit=data['duty_unit'], duty_type=data['duty_type'], create_by=user_id ) # 添加到会话并提交 db.add(new_duty_data) db.commit() db.refresh(new_duty_data) user_list = [] for user_info in user_data: user_list.append( DutyPersonnelArrangement( duty_id=new_duty_data.id, position_id=user_info['position_id'], personnel_id=user_info['personnel_id'], create_by=user_id )) db.add_all(user_list) db.commit() # 构建返回结果 result = { "code": 200, "msg": "操作成功", "data": None } return result except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.put("/update") async def updata_dict_type( db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从请求数据创建一个新的 SysDictType 实例 query = db.query(DutySchedule) query = query.filter(DutySchedule.id == body['id']) query = query.filter(DutySchedule.del_flag != '2') # query = db.query(SysDictData).filter(SysDictData.dict_code == form_data.dictCode) # query = db.query(SysDictData).filter(SysDictData.del_flag != '2') duty_data = query.first() old_user_list = duty_id_get_user_id(db,duty_data.id) for info in old_user_list: info.del_flag = '2' db.commit() user_data = body['user_data'] if len(user_data)==0: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '值班人员不能为空' }) user_list = [] for user_info in user_data: user_list.append(DutyPersonnelArrangement( duty_id=duty_data.id, position_id=user_info['position_id'], personnel_id=user_info['personnel_id'], create_by=user_id )) db.add_all(user_list) if not duty_data: return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': '值班不存在' }) duty_data.start_time=body['start_time'] duty_data.end_time = body['end_time'] duty_data.duty_date = body['duty_date'] duty_data.shift_type=body['shift_type'] duty_data.duty_unit = body['duty_unit'] duty_data.duty_type = body['duty_type'] duty_data.update_by = user_id # 添加到数据库会话并提交 db.commit() # db.refresh(new_dict_type) # 可选,如果需要刷新实例状态 # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None # 根据你的响应示例,data 为 null } except Exception as e: # 处理异常 traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.delete("/delete/{id}") # 使用 ID 来标识要删除的接口 async def delete_dict_data( id: str, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从数据库中获取要删除的 OneShareApiEntity 实例 query = db.query(DutySchedule) query = query.filter(DutySchedule.id == id) query = query.filter(DutySchedule.del_flag != '2') position_data = query.first() # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first() if not position_data: return JSONResponse(status_code=404, content={ 'code': 404, 'msg': '值班不存在' }) position_data.del_flag = '2' # 删除实例 # db.delete(api) db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) @router.delete("/delete_list/{id_list}") # 使用 ID 来标识要删除的接口 async def delete_dict_data( id_list: str, db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token) ): try: # 从数据库中获取要删除的 OneShareApiEntity 实例 id_list = [int(i) for i in id_list.split(',')] query = db.query(DutySchedule) query = query.filter(DutySchedule.id.in_(id_list)) query = query.filter(DutySchedule.del_flag != '2') position_data = query.all() # dict_data = db.query(SysDictData).filter(SysDictData.dict_code == dictCode and SysDictData.del_flag != '2').first() if not position_data: return JSONResponse(status_code=404, content={ 'code': 404, 'msg': '值班不存在' }) for info in position_data: info.del_flag = '2' # 删除实例 # db.delete(api) db.commit() # 构建并返回响应 return { "code": 200, "msg": "操作成功", "data": None } except Exception as e: traceback.print_exc() return JSONResponse(status_code=404, content={ 'code': 404, 'msg': str(e) }) # 自定义排班 导出 @router.post('/zdypb/createImport') # 值班排班 导入 @router.post('/zbpb/createImport') async def create_contact( request: Request, background_tasks: BackgroundTasks, db: Session = Depends(get_db), body=Depends(remove_xss_json), auth_user: AuthUser = Depends(find_auth_user), user_id=Depends(valid_access_token) ): try: # 提取请求数据 filename = body['filename'] file_name_desc = body['file_name_desc'] duty_unit = body['duty_unit'] duty_type = '' if 'duty_type' in body: duty_type = body['duty_type'] if len(filename) == 0: raise Exception() file_path = f'/data/upload/mergefile/uploads/{filename}' # 检查文件是否存在 if not os.path.isfile(file_path): return JSONResponse(status_code=404, content={ 'errcode': 404, 'errmsg': f'{filename}不存在' }) msg = '成功' code =200 try: book = xlrd.open_workbook(file_path) sheet = book.sheet_by_index(0) except: msg = f'\n文件打开失败,请核实文件格式为xlsx/xlx>' code = 500 return JSONResponse(status_code=code, content={ "code": code, "msg": msg, "data": None }) duty_data = {} duty_persion_data = {} import_status = True for row in range(2, sheet.nrows): user_data = [] start_date = sheet.cell(row, 0).value if start_date == '': msg = f'\n行<{row + 1}>日期不能为空<{start_date}>' code = 500 start_time = sheet.cell(row, 1).value if start_time == '': msg = f'\n行<{row + 1}>日期不能为空<{start_time}>' code = 500 end_date = sheet.cell(row, 2).value if end_date == '': msg = f'\n行<{row + 1}>日期不能为空<{end_date}>' code = 500 end_time = sheet.cell(row, 3).value if end_time == '': msg = f'\n行<{row + 1}>日期不能为空<{end_time}>' code = 500 dbld = sheet.cell(row, 4).value if dbld == '': msg = f'\n行<{row + 1}>日期不能为空<{dbld}>' code = 500 for name in dbld.split(','): personnel_id_list = name_get_user_id(db, name) if len(personnel_id_list)==0: import_status = False msg = f'\n行<{row + 1}>非应急通讯录人员<{name}>' code = 500 for personnel_id in personnel_id_list: user_data.append({"position_id":1,"personnel_id":personnel_id}) kjdb = sheet.cell(row, 5).value if kjdb == '': msg = f'\n行<{row + 1}>日期不能为空<{kjdb}>' code = 500 for name in kjdb.split(','): personnel_id_list = name_get_user_id(db, name) if len(personnel_id_list)==0: import_status = False msg = f'\n行<{row + 1}>非应急通讯录人员<{name}>' code = 500 for personnel_id in personnel_id_list: user_data.append({"position_id":1,"personnel_id":personnel_id}) zb = sheet.cell(row, 6).value if zb == '': msg = f'\n行<{row + 1}>日期不能为空<{zb}>' code = 500 for name in zb.split(','): personnel_id_list = name_get_user_id(db, name) if len(personnel_id_list)==0: import_status = False msg = f'\n行<{row + 1}>非应急通讯录人员<{name}>' code = 500 for personnel_id in personnel_id_list: user_data.append({"position_id":1,"personnel_id":personnel_id}) zz = sheet.cell(row, 7).value if zz == '': msg = f'\n行<{row + 1}>日期不能为空<{zz}>' code = 500 for name in zz.split(','): personnel_id_list = name_get_user_id(db, name) if len(personnel_id_list)==0: import_status = False msg = f'\n行<{row + 1}>非应急通讯录人员<{name}>' code = 500 for personnel_id in personnel_id_list: user_data.append({"position_id":1,"personnel_id":personnel_id}) if start_date == end_date: shift_type = '1' else: if start_time==end_time: shift_type = '2' else: shift_type = '3' new_duty_data = DutySchedule( start_time=f'{start_date} {start_time}', end_time=f'{end_date} {end_time}', duty_date=start_date, shift_type=shift_type, duty_unit=duty_unit, duty_type=duty_type, create_by=user_id ) duty_data[start_date+str(shift_type)] = new_duty_data # 添加到会话并提交 duty_persion_data[start_date + str(shift_type)] = user_data # db.add(new_duty_data) # db.commit() # db.refresh(new_duty_data) # user_list = [] # for user_info in user_data: # user_list.append(DutyPersonnelArrangement( # duty_id=new_duty_data.id, # position_id=user_info['position_id'], # personnel_id=user_info['personnel_id'], # create_by=user_id # )) # db.add_all(user_list) # db.commit() if import_status: for duty in duty_data: new_duty_data = duty_data[duty] db.add(new_duty_data) db.commit() db.refresh(new_duty_data) user_list = [] for user_info in duty_persion_data[duty]: user_list.append(DutyPersonnelArrangement( duty_id=new_duty_data.id, position_id=user_info['position_id'], personnel_id=user_info['personnel_id'], create_by=user_id )) db.add_all(user_list) db.commit() except Exception as e: traceback.print_exc() # 处理异常 db.rollback() code = 500 msg = str(e) return JSONResponse(status_code=code, content={ "code": code, "msg": msg, "data": None }) # 值班排班 导出 @router.get("/zbpb/export") async def download_file(keywords:str =Query(None), duty_type:str =Query(None), start_time:str =Query(None), end_time:str =Query(None), duty_unit:int =Query(None), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token)): """ 根据提供的文件名下载文件。 :param filename: 要下载的文件的名称。 """ try: query = db.query(DutySchedule) query = query.filter(DutySchedule.del_flag != '2') # 添加查询条件 if keywords: user_list = name_get_user_id(db, keywords) duty_list = user_id_get_duty_id(db, user_list) query = query.filter(DutySchedule.id.in_(duty_list)) if duty_type: query = query.filter(DutySchedule.duty_type == duty_type) if start_time: start_time = datetime.strptime(start_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date >= start_time) if end_time: end_time = datetime.strptime(end_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date <= end_time) if duty_unit: query = query.filter(DutySchedule.duty_unit == duty_unit) subquery = db.query( DutySchedule.duty_date, DutySchedule.shift_type, func.max(DutySchedule.update_time).label("latest_create_time") ).filter(DutySchedule.del_flag != '2').group_by( DutySchedule.duty_date, DutySchedule.shift_type ).subquery() query = query.join( subquery, and_( DutySchedule.duty_date == subquery.c.duty_date, DutySchedule.shift_type == subquery.c.shift_type, DutySchedule.update_time == subquery.c.latest_create_time ) ) query = query.order_by(DutySchedule.duty_date) duty_data = query.all() # 转换为字典 data_list = [] for d in duty_data: data = { "值班开始日期": d.start_time.strftime('%Y-%m-%d') if d.start_time else '', "值班开始时间": d.start_time.strftime('%H:%M') if d.start_time else '', "值班结束日期": d.end_time.strftime('%Y-%m-%d') if d.end_time else '', "值班结束时间": d.end_time.strftime('%H:%M') if d.end_time else '', "带班领导": "" } user_data = {} user_list = duty_id_get_user_id(db, d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) position= position_id_get_info(db,user_info.position_id) if position.position_name in user_data: user_data[position.position_name].append(contact_info.name) else: user_data[position.position_name]=[contact_info.name] for position_name in user_data: data[position_name] = ",".join(user_data[position_name]) data_list.append(data) # 构造文件的完整路径 import pandas as pd from io import BytesIO # 将查询结果转换为 DataFrame df = pd.DataFrame(data_list) # 将 DataFrame 导出为 Excel 文件 output = BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: df.to_excel(writer, index=False) # 设置响应头 output.seek(0) from urllib.parse import quote encoded_filename = f'值班排班{datetime.now().strftime("%Y%m%d%H%mi%s")}.xlsx' encoded_filename = quote(encoded_filename, encoding='utf-8') headers = { 'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}', 'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' } # 返回文件流 return StreamingResponse(output, headers=headers) except HTTPException as e: raise e except Exception as e: # 处理其他异常情况 raise HTTPException(status_code=500, detail=str(e)) # 本单位值班 导出 @router.get("/bdwzb/export") # 各级单位值班表 导出 @router.get("/gjdwzbb/export") async def download_file(keywords:str =Query(None), start_time:str =Query(None), end_time:str =Query(None), duty_unit:int =Query(None), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token)): """ 根据提供的文件名下载文件。 :param filename: 要下载的文件的名称。 """ try: query = db.query(DutySchedule) query = query.filter(DutySchedule.del_flag != '2') # 添加查询条件 if keywords: user_list = name_get_user_id(db, keywords) duty_list = user_id_get_duty_id(db, user_list) query = query.filter(DutySchedule.id.in_(duty_list)) if start_time: start_time = datetime.strptime(start_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date >= start_time) if end_time: end_time = datetime.strptime(end_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date <= end_time) if duty_unit: query = query.filter(DutySchedule.duty_unit == duty_unit) else: now_user_info = user_id_get_user_info(db, user_id) print(user_id, now_user_info.phonenumber, get_duty_unit_id_list(db)) if now_user_info: query = query.filter( DutySchedule.duty_unit == mobile_phone_get_dept_id(db, mpfun.dec_data(now_user_info.phonenumber), get_duty_unit_id_list(db))) else: return { "data": [], "code": 200, "msg": "查询成功" } subquery = db.query( DutySchedule.duty_date, DutySchedule.shift_type, func.max(DutySchedule.update_time).label("latest_create_time") ).filter(DutySchedule.del_flag != '2').group_by( DutySchedule.duty_date, DutySchedule.shift_type ).subquery() query = query.join( subquery, and_( DutySchedule.duty_date == subquery.c.duty_date, DutySchedule.shift_type == subquery.c.shift_type, DutySchedule.update_time == subquery.c.latest_create_time ) ) query = query.order_by(DutySchedule.update_time.desc()) duty_data = query.all() # 转换为字典 data_list = [] for d in duty_data: user_data = [] leaders = [] user_list = duty_id_get_user_id(db, d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) position= position_id_get_info(db,user_info.position_id) if position.position_name=='带班领导': leaders.append(contact_info.name) user_data.append(contact_info.name) data_list.append({ "单位名称": dept_id_get_info(db,d.duty_unit).department_name, "开始时间": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '', "结束时间": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '', "带班领导": "、".join(leaders), "值班人员": "、".join(user_data) }) # 构造文件的完整路径 import pandas as pd from io import BytesIO # 将查询结果转换为 DataFrame df = pd.DataFrame(data_list) # 将 DataFrame 导出为 Excel 文件 output = BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: df.to_excel(writer, index=False) # 设置响应头 output.seek(0) from urllib.parse import quote encoded_filename = f'值班表{datetime.now().strftime("%Y%m%d%H%mi%s")}.xlsx' encoded_filename = quote(encoded_filename, encoding='utf-8') headers = { 'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}', 'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' } # 返回文件流 return StreamingResponse(output, headers=headers) except HTTPException as e: raise e except Exception as e: # 处理其他异常情况 raise HTTPException(status_code=500, detail=str(e)) # 各级单位值班人 导出 @router.get("/gjdwzbr/export") async def download_file(keywords:str =Query(None), start_time:str =Query(None), end_time:str =Query(None), duty_unit:int =Query(None), db: Session = Depends(get_db), body = Depends(remove_xss_json), user_id = Depends(valid_access_token)): """ 根据提供的文件名下载文件。 :param filename: 要下载的文件的名称。 """ try: query = db.query(DutySchedule) query = query.filter(DutySchedule.del_flag != '2') # 添加查询条件 if keywords: user_list = name_get_user_id(db, keywords) duty_list = user_id_get_duty_id(db, user_list) query = query.filter(DutySchedule.id.in_(duty_list)) if start_time: start_time = datetime.strptime(start_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date >= start_time) if end_time: end_time = datetime.strptime(end_time, "%Y-%m-%d").date() query = query.filter(DutySchedule.duty_date <= end_time) if duty_unit: query = query.filter(DutySchedule.duty_unit == duty_unit) else: now_user_info = user_id_get_user_info(db, user_id) print(user_id, now_user_info.phonenumber, get_duty_unit_id_list(db)) if now_user_info: query = query.filter( DutySchedule.duty_unit == mobile_phone_get_dept_id(db, mpfun.dec_data(now_user_info.phonenumber), get_duty_unit_id_list(db))) else: return { "data": [], "code": 200, "msg": "查询成功" } subquery = db.query( DutySchedule.duty_date, DutySchedule.shift_type, func.max(DutySchedule.update_time).label("latest_create_time") ).filter(DutySchedule.del_flag != '2').group_by( DutySchedule.duty_date, DutySchedule.shift_type ).subquery() query = query.join( subquery, and_( DutySchedule.duty_date == subquery.c.duty_date, DutySchedule.shift_type == subquery.c.shift_type, DutySchedule.update_time == subquery.c.latest_create_time ) ) query = query.order_by(DutySchedule.update_time.desc()) duty_data = query.all() # 转换为字典 data_list = [] yz_list = [] for d in duty_data: user_list = duty_id_get_user_id(db, d.id) for user_info in user_list: contact_info = user_id_get_info(db, user_info.personnel_id) position = position_id_get_info(db, user_info.position_id) department = dept_id_get_info(db,contact_info.department_id) yz = d.duty_date.strftime('%Y-%m-%d')+str(contact_info.id) if yz not in yz_list: yz_list.append(yz) data_list.append({"值班日期":d.duty_date, "值班机构": dept_id_get_info(db,d.duty_unit).department_name, "组织机构": department.department_name, "值班岗位": position.position_name, "值班人员": contact_info.name, "职务": contact_info.position, "联系电话": contact_info.mobile_phone, "办公电话": contact_info.office_phone}) # 构造文件的完整路径 import pandas as pd from io import BytesIO # 将查询结果转换为 DataFrame df = pd.DataFrame(data_list) # 将 DataFrame 导出为 Excel 文件 output = BytesIO() with pd.ExcelWriter(output, engine='openpyxl') as writer: df.to_excel(writer, index=False) # 设置响应头 output.seek(0) from urllib.parse import quote encoded_filename = f'值班人员{datetime.now().strftime("%Y%m%d%H%mi%s")}.xlsx' encoded_filename = quote(encoded_filename, encoding='utf-8') headers = { 'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}', 'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet' } # 返回文件流 return StreamingResponse(output, headers=headers) except HTTPException as e: raise e except Exception as e: # 处理其他异常情况 raise HTTPException(status_code=500, detail=str(e))