|
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status, BackgroundTasks
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse, FileResponse,StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from sqlalchemy.sql import func
- from common.auth_user import *
- from pydantic import BaseModel
- from common.db import db_dept
- from common.db import db_czrz
- from exceptions import AppException, HmacException
- from database import get_db
- from typing import List
- from models import *
- import xlrd
- from utils import *
- from utils.ry_system_util import *
- from utils.video_util import *
- import traceback
- import os
- import json
- router = APIRouter()
def name_get_user_id(db, keywords):
    """Return the ids of emergency contacts whose name contains ``keywords``.

    Only rows whose ``del_flag`` is not ``'2'`` are considered. The match is a
    SQL ``LIKE '%keywords%'``, so an empty ``keywords`` matches every name.
    """
    matches = (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(EmergencyContactUser.name.like(f"%{keywords}%"))
        .all()
    )
    return [contact.id for contact in matches]
def user_id_get_duty_id(db, user_id_list):
    """Return the ``duty_id`` of every arrangement assigned to the given people.

    Only arrangement rows whose ``del_flag`` is not ``'2'`` are considered.
    The result has one entry per arrangement row, so a duty id may repeat.
    """
    arrangements = (
        db.query(DutyPersonnelArrangement)
        .filter(DutyPersonnelArrangement.del_flag != '2')
        .filter(DutyPersonnelArrangement.personnel_id.in_(user_id_list))
        .all()
    )
    return [arrangement.duty_id for arrangement in arrangements]
def duty_id_get_user_id(db, duty_id):
    """Return the arrangement rows (not bare ids) attached to one duty schedule.

    Only rows whose ``del_flag`` is not ``'2'`` are returned.
    """
    return (
        db.query(DutyPersonnelArrangement)
        .filter(DutyPersonnelArrangement.del_flag != '2')
        .filter(DutyPersonnelArrangement.duty_id == duty_id)
        .all()
    )
def user_id_get_info(db, user_id):
    """Return the emergency-contact row with id ``user_id``, or ``None``.

    Rows whose ``del_flag`` is ``'2'`` are ignored.
    """
    return (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(EmergencyContactUser.id == user_id)
        .first()
    )
def position_id_get_info(db, position_id):
    """Return the duty-position row with id ``position_id``, or ``None``.

    Rows whose ``del_flag`` is ``'2'`` are ignored.
    """
    return (
        db.query(DutyPosition)
        .filter(DutyPosition.del_flag != '2')
        .filter(DutyPosition.id == position_id)
        .first()
    )
def dept_id_get_info(db, dept_id):
    """Return the department row with id ``dept_id``, or ``None``.

    Rows whose ``del_flag`` is ``'2'`` are ignored.
    """
    return (
        db.query(EmergencyContactDepartment)
        .filter(EmergencyContactDepartment.del_flag != '2')
        .filter(EmergencyContactDepartment.id == dept_id)
        .first()
    )
def get_duty_unit_id_list(db):
    """Return the ``dict_value`` of every ``duty_unit`` dictionary entry as ints.

    Only dictionary rows whose ``del_flag`` is not ``'2'`` are read. A
    non-numeric ``dict_value`` makes ``int()`` raise ``ValueError``.
    """
    entries = (
        db.query(SysDictData)
        .filter(SysDictData.del_flag != '2')
        .filter(SysDictData.dict_type == 'duty_unit')
        .all()
    )
    return [int(entry.dict_value) for entry in entries]
def dept_id_get_duty_unit_id(db, dept_id, duty_unit_id_list):
    """Resolve a department to the duty unit it belongs to.

    Starting at ``dept_id``, follow ``parent_department_id`` upwards until an
    id contained in ``duty_unit_id_list`` is found.

    Args:
        db: SQLAlchemy session used to load department rows.
        dept_id: Id of the department to start from.
        duty_unit_id_list: Ids that count as duty units.

    Returns:
        The matching duty-unit id, or ``0`` when the chain reaches id ``0``,
        a missing/deleted department, or a department already visited.
    """
    # The previous recursive version dropped the result of the recursive call
    # and therefore returned None whenever it had to climb to a parent.
    seen = set()
    current_id = dept_id
    while current_id and current_id not in seen:
        if current_id in duty_unit_id_list:
            return current_id
        # Guard against a cyclic parent chain, which would otherwise loop forever.
        seen.add(current_id)
        department = (
            db.query(EmergencyContactDepartment)
            .filter(EmergencyContactDepartment.del_flag != '2')
            .filter(EmergencyContactDepartment.id == current_id)
            .first()
        )
        if department is None:
            return 0
        current_id = department.parent_department_id
    return 0
def mobile_phone_get_dept_id(db, mobile_phone, duty_unit_id_list):
    """Map a mobile number to a duty-unit id via the contact's department.

    The first contact (``del_flag`` not ``'2'``) whose ``mobile_phone``
    contains the given number is used; its ``department_id`` is then resolved
    with ``dept_id_get_duty_unit_id``. Returns ``0`` when no contact matches.
    """
    contact = (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(EmergencyContactUser.mobile_phone.like(f'%{mobile_phone}%'))
        .first()
    )
    if not contact:
        return 0
    return dept_id_get_duty_unit_id(db, contact.department_id, duty_unit_id_list)
@router.get('/list')
async def get_dict_data_by_type(
    keywords: str = Query(None),
    duty_type: str = Query(None),
    start_time: str = Query(None),
    end_time: str = Query(None),
    duty_unit: int = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return one page of duty schedules together with their personnel.

    Optional filters: ``keywords`` (substring of a contact name),
    ``duty_type``, ``duty_unit`` and a ``duty_date`` range given as
    ``YYYY-MM-DD`` strings in ``start_time`` / ``end_time``.

    Returns a dict with ``total``, ``page``, ``pageSize``, ``totalPages``,
    ``data``, ``code`` and ``msg``. Any exception is reported as a 404 JSON
    response carrying ``str(e)``.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')
        if keywords:
            # Name -> contact ids -> duty ids that those contacts are assigned to.
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if duty_type:
            query = query.filter(DutySchedule.duty_type == duty_type)
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)

        # Count before applying ordering and pagination.
        total_count = query.count()
        offset = (page - 1) * pageSize
        query = query.order_by(
            DutySchedule.start_time.desc(),
            DutySchedule.end_time.desc(),
            DutySchedule.update_time.desc(),
        )
        duty_data = query.offset(offset).limit(pageSize).all()

        data_list = []
        for d in duty_data:
            user_data = []
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                if contact_info is None:
                    # The contact row is gone or flagged deleted; skip it
                    # instead of failing the whole page with AttributeError.
                    continue
                user_data.append({
                    "position_id": user_info.position_id,
                    "personnel_id": user_info.personnel_id,
                    "name": contact_info.name,
                    "position": contact_info.position,
                    "mobile_phone": contact_info.mobile_phone,
                    "office_phone": contact_info.office_phone,
                    "department_id": contact_info.department_id,
                    "yzy_userid": contact_info.userid,
                })
            data_list.append({
                "id": d.id,
                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
                "duty_date": d.duty_date,
                "shift_type": d.shift_type,
                "duty_unit": d.duty_unit,
                "duty_type": d.duty_type,
                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
                "user_data": user_data,
            })

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/latest_by_group')
async def get_dict_data_by_type(
    keywords: str = Query(None),
    start_time: str = Query(None),
    end_time: str = Query(None),
    duty_unit: int = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return one page of the most recently updated schedule per date and shift.

    Filters work as in ``/list`` (without ``duty_type``). When ``duty_unit``
    is omitted, the unit is derived from the calling user's phone number; if
    the user cannot be loaded an empty page is returned.

    Any exception is reported as a 404 JSON response carrying ``str(e)``.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)
        else:
            now_user_info = user_id_get_user_info(db, user_id)
            # The None check must come before any attribute access; the old
            # debug print read .phonenumber first and made this branch
            # unreachable (and wrote the phone number to stdout).
            if not now_user_info:
                return {
                    "total": 0,
                    "page": page,
                    "pageSize": pageSize,
                    "totalPages": (0 + pageSize - 1) // pageSize,
                    "data": [],
                    "code": 200,
                    "msg": "查询成功"
                }
            own_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db),
            )
            query = query.filter(DutySchedule.duty_unit == own_unit)

        # Latest update_time per (duty_date, shift_type) among non-deleted rows.
        # NOTE(review): the grouping does not include duty_unit, so the latest
        # row is chosen across all units — confirm this is intended.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )

        total_count = query.count()
        offset = (page - 1) * pageSize
        query = query.order_by(DutySchedule.update_time.desc())
        duty_data = query.offset(offset).limit(pageSize).all()

        data_list = []
        for d in duty_data:
            user_data = []
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                if contact_info is None:
                    # Skip personnel whose contact row is missing or flagged
                    # deleted instead of failing the whole page.
                    continue
                user_data.append({
                    "position_id": user_info.position_id,
                    "personnel_id": user_info.personnel_id,
                    "name": contact_info.name,
                    "position": contact_info.position,
                    "mobile_phone": contact_info.mobile_phone,
                    "office_phone": contact_info.office_phone,
                    "department_id": contact_info.department_id,
                    "yzy_userid": contact_info.userid,
                })
            data_list.append({
                "id": d.id,
                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
                "duty_date": d.duty_date,
                "shift_type": d.shift_type,
                "duty_unit": d.duty_unit,
                "duty_type": d.duty_type,
                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
                "user_data": user_data,
            })

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/info/{id}')
async def get_dict_data_by_type(
    id: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return a single duty schedule and its personnel.

    Responds with a 404 JSON body when no schedule with this id exists (or it
    is flagged deleted). Any other exception is also reported as a 404 JSON
    response carrying ``str(e)``.
    """
    try:
        d = (
            db.query(DutySchedule)
            .filter(DutySchedule.id == id)
            .filter(DutySchedule.del_flag != '2')
            .first()
        )
        if d is None:
            # Previously this fell through to d.id and surfaced as
            # "'NoneType' object has no attribute 'id'".
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '值班不存在'
            })

        user_data = []
        for user_info in duty_id_get_user_id(db, d.id):
            contact_info = user_id_get_info(db, user_info.personnel_id)
            if contact_info is None:
                # Skip personnel whose contact row is missing or flagged deleted.
                continue
            user_data.append({
                "position_id": user_info.position_id,
                "personnel_id": user_info.personnel_id,
                "name": contact_info.name,
                "position": contact_info.position,
                "mobile_phone": contact_info.mobile_phone,
                "office_phone": contact_info.office_phone,
                "department_id": contact_info.department_id,
                "yzy_userid": contact_info.userid,
            })

        # start_time / end_time are returned unformatted here, unlike the
        # list endpoints; kept as-is for existing clients.
        data_list = {
            "id": d.id,
            "start_time": d.start_time,
            "end_time": d.end_time,
            "duty_date": d.duty_date,
            "shift_type": d.shift_type,
            "duty_unit": d.duty_unit,
            "duty_type": d.duty_type,
            "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
            "user_data": user_data
        }
        return {
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.post('/create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Create one duty schedule and its personnel arrangements.

    ``body`` must contain ``start_time``, ``end_time``, ``duty_date``,
    ``shift_type``, ``duty_unit``, ``duty_type`` and a non-empty
    ``user_data`` list of ``{"position_id", "personnel_id"}`` items.

    The schedule and its arrangements are written in a single transaction.
    Any exception rolls the session back and is reported as a 404 JSON
    response carrying ``str(e)``.
    """
    try:
        user_data = body['user_data']
        if len(user_data) == 0:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班人员不能为空'
            })

        new_duty_data = DutySchedule(
            start_time=body['start_time'],
            end_time=body['end_time'],
            duty_date=body['duty_date'],
            shift_type=body['shift_type'],
            duty_unit=body['duty_unit'],
            duty_type=body['duty_type'],
            create_by=user_id
        )
        db.add(new_duty_data)
        # flush() sends the INSERT so new_duty_data.id is populated without
        # committing; a failure below then cannot leave a schedule with no
        # personnel behind.
        db.flush()
        db.add_all([
            DutyPersonnelArrangement(
                duty_id=new_duty_data.id,
                position_id=user_info['position_id'],
                personnel_id=user_info['personnel_id'],
                create_by=user_id
            )
            for user_info in user_data
        ])
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.post('/list_create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Create several duty schedules from a JSON list in one transaction.

    Each list item has the same shape as the ``/create`` body. All items are
    validated before anything is written, so an item with empty ``user_data``
    no longer leaves the preceding items committed.

    Any exception rolls the session back and is reported as a 404 JSON
    response carrying ``str(e)``.
    """
    try:
        if not isinstance(body, list):
            return JSONResponse(status_code=500, content={'msg': "请求体非列表", "code": 500})

        # Validate every item up front so the batch is all-or-nothing.
        for data in body:
            if len(data['user_data']) == 0:
                return JSONResponse(status_code=404, content={
                    'errcode': 404,
                    'errmsg': '值班人员不能为空'
                })

        for data in body:
            new_duty_data = DutySchedule(
                start_time=data['start_time'],
                end_time=data['end_time'],
                duty_date=data['duty_date'],
                shift_type=data['shift_type'],
                duty_unit=data['duty_unit'],
                duty_type=data['duty_type'],
                create_by=user_id
            )
            db.add(new_duty_data)
            # flush() populates new_duty_data.id without committing.
            db.flush()
            db.add_all([
                DutyPersonnelArrangement(
                    duty_id=new_duty_data.id,
                    position_id=user_info['position_id'],
                    personnel_id=user_info['personnel_id'],
                    create_by=user_id
                )
                for user_info in data['user_data']
            ])
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.put("/update")
async def updata_dict_type(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Update a duty schedule and replace its personnel arrangements.

    ``body`` must contain ``id`` plus the same fields as ``/create``. Existing
    arrangements are flagged deleted (``del_flag = '2'``) and new ones are
    inserted from ``user_data``.

    Validation now happens before any change is made: previously the old
    arrangements were deleted and committed before ``user_data`` was checked,
    and the not-found check ran only after ``duty_data.id`` had been used.

    Any exception rolls the session back and is reported as a 404 JSON
    response carrying ``str(e)``.
    """
    try:
        duty_data = (
            db.query(DutySchedule)
            .filter(DutySchedule.id == body['id'])
            .filter(DutySchedule.del_flag != '2')
            .first()
        )
        if not duty_data:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班不存在'
            })

        user_data = body['user_data']
        if len(user_data) == 0:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班人员不能为空'
            })

        # Retire the current arrangements, then attach the new ones.
        for info in duty_id_get_user_id(db, duty_data.id):
            info.del_flag = '2'
        db.add_all([
            DutyPersonnelArrangement(
                duty_id=duty_data.id,
                position_id=user_info['position_id'],
                personnel_id=user_info['personnel_id'],
                create_by=user_id
            )
            for user_info in user_data
        ])

        duty_data.start_time = body['start_time']
        duty_data.end_time = body['end_time']
        duty_data.duty_date = body['duty_date']
        duty_data.shift_type = body['shift_type']
        duty_data.duty_unit = body['duty_unit']
        duty_data.duty_type = body['duty_type']
        duty_data.update_by = user_id

        # Single commit: the personnel swap and field update succeed or fail together.
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/delete/{id}")  # the schedule to delete is identified by its id
async def delete_dict_data(
    id: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Flag one duty schedule as deleted by setting ``del_flag`` to ``'2'``.

    Responds with a 404 JSON body when the schedule does not exist or is
    already flagged. Any exception rolls the session back (matching the
    import handler in this file) and is reported as a 404 JSON response.
    """
    try:
        position_data = (
            db.query(DutySchedule)
            .filter(DutySchedule.id == id)
            .filter(DutySchedule.del_flag != '2')
            .first()
        )
        if not position_data:
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '值班不存在'
            })

        # The row is kept; only the flag changes.
        position_data.del_flag = '2'
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/delete_list/{id_list}")  # comma-separated ids of the schedules to delete
async def delete_dict_data(
    id_list: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Flag several duty schedules as deleted (``del_flag = '2'``).

    ``id_list`` is a comma-separated list of integer ids. Blank segments
    (for example a trailing comma) are ignored; a non-numeric segment still
    raises and is reported through the generic error response.

    Responds with a 404 JSON body when none of the ids match a non-deleted
    schedule. Any exception rolls the session back and is reported as a 404
    JSON response carrying ``str(e)``.
    """
    try:
        id_list = [int(part) for part in id_list.split(',') if part.strip()]
        position_data = (
            db.query(DutySchedule)
            .filter(DutySchedule.id.in_(id_list))
            .filter(DutySchedule.del_flag != '2')
            .all()
        )
        if not position_data:
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '值班不存在'
            })

        for info in position_data:
            info.del_flag = '2'
        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
def foastr_to_datetime(datedata, timedata, book):
    """Combine a date cell and a time cell from an xlrd sheet into a ``datetime``.

    Each cell may arrive either as text or as a float:

    * text date: ``YYYY-MM-DD``
    * text time: ``HH:MM`` or ``HH:MM:SS``
    * float date: converted with ``xlrd.xldate.xldate_as_datetime``
    * float time: treated as a fraction of a day

    Args:
        datedata: Date cell value (``str`` or ``float``).
        timedata: Time cell value (``str`` or ``float``).
        book: The xlrd workbook; only its ``datemode`` is read, and only when
            ``datedata`` is a float.

    Raises:
        ValueError: If a text value does not match the accepted formats.
    """
    def _parse_time(text):
        # The text/text branch used to accept only HH:MM and the float/text
        # branch only HH:MM:SS; both spellings are now accepted everywhere.
        cleaned = text.strip()
        for fmt in ("%H:%M", "%H:%M:%S"):
            try:
                return datetime.strptime(cleaned, fmt).time()
            except ValueError:
                continue
        raise ValueError(f"unsupported time format: {text!r}")

    date_is_text = isinstance(datedata, str)
    time_is_text = isinstance(timedata, str)

    if not date_is_text and not time_is_text:
        # Both numeric: date serial plus day fraction form one Excel datetime serial.
        return xlrd.xldate.xldate_as_datetime(datedata + timedata, book.datemode)

    if date_is_text:
        date_part = datetime.strptime(datedata.strip(), "%Y-%m-%d").date()
    else:
        date_part = xlrd.xldate.xldate_as_datetime(datedata, book.datemode).date()

    if time_is_text:
        time_part = _parse_time(timedata)
    else:
        time_part = (datetime.min + timedelta(days=timedata)).time()

    return datetime.combine(date_part, time_part)
# Custom schedule import
@router.post('/zdypb/createImport')
# Duty schedule import
@router.post('/zbpb/createImport')
async def create_contact(
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Import duty schedules from an uploaded Excel workbook.

    ``body`` must contain ``filename`` (a file under the upload directory) and
    ``duty_unit``; ``duty_type`` is optional. Rows are read from the first
    sheet starting at the third row. Columns 0-3 hold start date, start time,
    end date and end time; columns 4-7 hold comma-separated names that are
    stored with position ids 1, 2, 8 and 9 respectively.

    Nothing is written unless every row validates; all rows are then inserted
    in one transaction.

    Returns a JSON response with ``code`` 200 on success or 500 with the
    collected validation messages (or the exception text) on failure.
    """
    upload_dir = '/data/upload/mergefile/uploads'
    # (position_id, sheet column) for each column of names.
    role_columns = ((1, 4), (2, 5), (8, 6), (9, 7))
    msg = '成功'
    code = 200
    try:
        filename = body['filename']
        duty_unit = body['duty_unit']
        duty_type = body['duty_type'] if 'duty_type' in body else ''
        if len(filename) == 0:
            raise Exception()

        # filename comes from the request: resolve it and make sure it still
        # lies inside the upload directory (blocks "../" traversal).
        file_path = os.path.realpath(os.path.join(upload_dir, filename))
        inside_upload_dir = file_path.startswith(os.path.realpath(upload_dir) + os.sep)
        if not inside_upload_dir or not os.path.isfile(file_path):
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': f'{filename}不存在'
            })

        try:
            book = xlrd.open_workbook(file_path)
            sheet = book.sheet_by_index(0)
        except Exception as e:
            traceback.print_exc()
            msg = f'\n文件打开失败,请核实文件格式为xlsx/xlx>{str(e)}'
            code = 500
            return JSONResponse(status_code=code, content={
                "code": code,
                "msg": msg,
                "data": None
            })

        # Every validation message is collected; previously each one
        # overwrote the last, so only the final problem was reported.
        errors = []
        duty_data = {}
        duty_persion_data = {}
        for row in range(2, sheet.nrows):
            row_errors = []
            start_date = sheet.cell(row, 0).value
            start_clock = sheet.cell(row, 1).value
            end_date = sheet.cell(row, 2).value
            end_clock = sheet.cell(row, 3).value
            if start_date == '' or start_date == 0:
                row_errors.append(f'\n行<{row + 1}>开始日期不能为空<{start_date}>')
            if start_clock == '':
                row_errors.append(f'\n行<{row + 1}>开始时间不能为空<{start_clock}>')
            if end_date == '':
                row_errors.append(f'\n行<{row + 1}>结束日期不能为空<{end_date}>')
            if end_clock == '':
                row_errors.append(f'\n行<{row + 1}>结束时间不能为空<{end_clock}>')

            user_data = []
            for position_id, column in role_columns:
                cell_value = sheet.cell(row, column).value
                for name in str(cell_value).split(','):
                    name = name.strip()
                    if not name:
                        # An empty name would become LIKE '%%' and match
                        # every contact, assigning all of them to this duty.
                        continue
                    personnel_id_list = name_get_user_id(db, name)
                    if not personnel_id_list:
                        row_errors.append(f'\n行<{row + 1}>非应急通讯录人员<{name}>')
                    for personnel_id in personnel_id_list:
                        user_data.append({"position_id": position_id, "personnel_id": personnel_id})

            if row_errors:
                # Do not try to parse dates of a row already known to be bad.
                errors.extend(row_errors)
                continue

            start_time = foastr_to_datetime(start_date, start_clock, book)
            end_time = foastr_to_datetime(end_date, end_clock, book)
            start_date = start_time.strftime('%Y-%m-%d')
            end_date = end_time.strftime('%Y-%m-%d')
            if start_date == end_date:
                shift_type = '1'
            elif start_time.time() == end_time.time():
                # Compare clock times only: the full datetimes can never be
                # equal here because their dates differ, which made '3'
                # unreachable before.
                shift_type = '3'
            else:
                shift_type = '2'

            # Later rows with the same start date and shift type replace earlier ones.
            key = start_date + str(shift_type)
            duty_data[key] = DutySchedule(
                start_time=start_time,
                end_time=end_time,
                duty_date=start_date,
                shift_type=shift_type,
                duty_unit=duty_unit,
                duty_type=duty_type,
                create_by=user_id
            )
            duty_persion_data[key] = user_data

        if errors:
            code = 500
            msg = ''.join(errors)
        else:
            for key, new_duty_data in duty_data.items():
                db.add(new_duty_data)
                # flush() populates new_duty_data.id without committing.
                db.flush()
                db.add_all([
                    DutyPersonnelArrangement(
                        duty_id=new_duty_data.id,
                        position_id=user_info['position_id'],
                        personnel_id=user_info['personnel_id'],
                        create_by=user_id
                    )
                    for user_info in duty_persion_data[key]
                ])
            # One commit for the whole workbook keeps the import atomic.
            db.commit()
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        code = 500
        msg = str(e)
    return JSONResponse(status_code=code, content={
        "code": code,
        "msg": msg,
        "data": None
    })
# Duty schedule export
@router.get("/zbpb/export")
async def download_file(keywords: str = Query(None),
                        duty_type: str = Query(None),
                        start_time: str = Query(None),
                        end_time: str = Query(None),
                        duty_unit: int = Query(None),
                        db: Session = Depends(get_db),
                        body=Depends(remove_xss_json),
                        user_id=Depends(valid_access_token)):
    """Export the latest schedule per date and shift as an .xlsx download.

    Filters work as in ``/list``. Each exported row holds the start/end date
    and time plus one column per position name with the assigned people
    joined by commas.

    Raises:
        HTTPException: status 500 with ``str(e)`` for any unexpected error.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if duty_type:
            query = query.filter(DutySchedule.duty_type == duty_type)
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)

        # Latest update_time per (duty_date, shift_type) among non-deleted rows.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )
        duty_data = query.order_by(DutySchedule.duty_date).all()

        data_list = []
        for d in duty_data:
            data = {
                "值班开始日期": d.start_time.strftime('%Y-%m-%d') if d.start_time else '',
                "值班开始时间": d.start_time.strftime('%H:%M') if d.start_time else '',
                "值班结束日期": d.end_time.strftime('%Y-%m-%d') if d.end_time else '',
                "值班结束时间": d.end_time.strftime('%H:%M') if d.end_time else '',
                "带班领导": ""
            }
            # position name -> names of the people holding it on this duty
            names_by_position = {}
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                position = position_id_get_info(db, user_info.position_id)
                if contact_info is None or position is None:
                    # Skip entries whose contact or position row is missing
                    # or flagged deleted instead of failing the export.
                    continue
                names_by_position.setdefault(position.position_name, []).append(contact_info.name)
            for position_name, names in names_by_position.items():
                data[position_name] = ",".join(names)
            data_list.append(data)

        import pandas as pd
        from io import BytesIO
        from urllib.parse import quote

        # Write the rows into an in-memory workbook.
        output = BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            pd.DataFrame(data_list).to_excel(writer, index=False)
        output.seek(0)

        # "%M%S" are Python's minute/second codes; the earlier "%mi%s" was
        # MySQL syntax and produced month + "i" + a platform-dependent value.
        encoded_filename = f'值班排班{datetime.now().strftime("%Y%m%d%H%M%S")}.xlsx'
        encoded_filename = quote(encoded_filename, encoding='utf-8')
        headers = {
            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        }
        return StreamingResponse(output, headers=headers)
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Own-unit duty export
@router.get("/bdwzb/export")
# All-levels unit duty table export
@router.get("/gjdwzbb/export")
async def download_file(keywords: str = Query(None),
                        start_time: str = Query(None),
                        end_time: str = Query(None),
                        duty_unit: int = Query(None),
                        db: Session = Depends(get_db),
                        body=Depends(remove_xss_json),
                        user_id=Depends(valid_access_token)):
    """Export one row per schedule (unit, times, leaders, staff) as .xlsx.

    Filters work as in ``/latest_by_group``: when ``duty_unit`` is omitted the
    unit is derived from the calling user's phone number, and if the user
    cannot be loaded an empty JSON result is returned instead of a file.

    Raises:
        HTTPException: status 500 with ``str(e)`` for any unexpected error.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)
        else:
            now_user_info = user_id_get_user_info(db, user_id)
            # Check for None before touching attributes; the old debug print
            # read .phonenumber first and made this branch unreachable.
            if not now_user_info:
                return {
                    "data": [],
                    "code": 200,
                    "msg": "查询成功"
                }
            own_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db),
            )
            query = query.filter(DutySchedule.duty_unit == own_unit)

        # Latest update_time per (duty_date, shift_type) among non-deleted rows.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )
        duty_data = query.order_by(DutySchedule.update_time.desc()).all()

        data_list = []
        for d in duty_data:
            staff_names = []
            leaders = []
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                if contact_info is None:
                    # Skip personnel whose contact row is missing or flagged deleted.
                    continue
                position = position_id_get_info(db, user_info.position_id)
                if position is not None and position.position_name == '带班领导':
                    leaders.append(contact_info.name)
                staff_names.append(contact_info.name)
            unit = dept_id_get_info(db, d.duty_unit)
            data_list.append({
                # Empty name when the unit row cannot be found.
                "单位名称": unit.department_name if unit else '',
                "开始时间": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
                "结束时间": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
                "带班领导": "、".join(leaders),
                "值班人员": "、".join(staff_names)
            })

        import pandas as pd
        from io import BytesIO
        from urllib.parse import quote

        # Write the rows into an in-memory workbook.
        output = BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            pd.DataFrame(data_list).to_excel(writer, index=False)
        output.seek(0)

        # "%M%S" are Python's minute/second codes; the earlier "%mi%s" was
        # MySQL syntax and produced month + "i" + a platform-dependent value.
        encoded_filename = f'值班表{datetime.now().strftime("%Y%m%d%H%M%S")}.xlsx'
        encoded_filename = quote(encoded_filename, encoding='utf-8')
        headers = {
            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        }
        return StreamingResponse(output, headers=headers)
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# All-levels unit duty personnel export
@router.get("/gjdwzbr/export")
async def download_file(keywords: str = Query(None),
                        start_time: str = Query(None),
                        end_time: str = Query(None),
                        duty_unit: int = Query(None),
                        db: Session = Depends(get_db),
                        body=Depends(remove_xss_json),
                        user_id=Depends(valid_access_token)):
    """Export one row per person per duty date as an .xlsx download.

    Filters work as in ``/latest_by_group``: when ``duty_unit`` is omitted the
    unit is derived from the calling user's phone number, and if the user
    cannot be loaded an empty JSON result is returned instead of a file.
    A person appearing more than once on the same duty date is exported once.

    Raises:
        HTTPException: status 500 with ``str(e)`` for any unexpected error.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)
        else:
            now_user_info = user_id_get_user_info(db, user_id)
            # Check for None before touching attributes; the old debug print
            # read .phonenumber first and made this branch unreachable.
            if not now_user_info:
                return {
                    "data": [],
                    "code": 200,
                    "msg": "查询成功"
                }
            own_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db),
            )
            query = query.filter(DutySchedule.duty_unit == own_unit)

        # Latest update_time per (duty_date, shift_type) among non-deleted rows.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )
        duty_data = query.order_by(DutySchedule.update_time.desc()).all()

        data_list = []
        # (duty_date, contact id) pairs already exported; a set keeps the
        # membership test cheap where the original scanned a list.
        exported = set()
        for d in duty_data:
            unit = dept_id_get_info(db, d.duty_unit)
            unit_name = unit.department_name if unit else ''
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                if contact_info is None:
                    # Skip personnel whose contact row is missing or flagged deleted.
                    continue
                dedup_key = (d.duty_date, contact_info.id)
                if dedup_key in exported:
                    continue
                exported.add(dedup_key)
                position = position_id_get_info(db, user_info.position_id)
                department = dept_id_get_info(db, contact_info.department_id)
                data_list.append({
                    "值班日期": d.duty_date,
                    "值班机构": unit_name,
                    # Empty text when the department/position row cannot be found.
                    "组织机构": department.department_name if department else '',
                    "值班岗位": position.position_name if position else '',
                    "值班人员": contact_info.name,
                    "职务": contact_info.position,
                    "联系电话": contact_info.mobile_phone,
                    "办公电话": contact_info.office_phone
                })

        import pandas as pd
        from io import BytesIO
        from urllib.parse import quote

        # Write the rows into an in-memory workbook.
        output = BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            pd.DataFrame(data_list).to_excel(writer, index=False)
        output.seek(0)

        # "%M%S" are Python's minute/second codes; the earlier "%mi%s" was
        # MySQL syntax and produced month + "i" + a platform-dependent value.
        encoded_filename = f'值班人员{datetime.now().strftime("%Y%m%d%H%M%S")}.xlsx'
        encoded_filename = quote(encoded_filename, encoding='utf-8')
        headers = {
            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        }
        return StreamingResponse(output, headers=headers)
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
|