12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Query, HTTPException, status, BackgroundTasks
- from common.security import valid_access_token
- from fastapi.responses import JSONResponse, FileResponse,StreamingResponse
- from sqlalchemy.orm import Session
- from sqlalchemy import and_, or_
- from sqlalchemy.sql import func
- from common.auth_user import *
- from pydantic import BaseModel
- from common.db import db_dept
- from common.db import db_czrz
- from exceptions import AppException, HmacException
- from database import get_db
- from typing import List
- from models import *
- import xlrd
- from utils import *
- from utils.ry_system_util import *
- from utils.video_util import *
- import traceback
- import os
- import json
- router = APIRouter()
def name_get_user_id(db, keywords, islike=True):
    """Return the ids of emergency contacts whose name matches ``keywords``.

    Rows whose ``del_flag`` equals ``'2'`` are excluded.

    :param db: SQLAlchemy session used to run the query.
    :param keywords: Name, or name fragment, to look for.
    :param islike: When true, ``keywords`` is matched as a substring via SQL
        ``LIKE``; otherwise the name must equal ``keywords`` exactly.
    :return: List of ``EmergencyContactUser.id`` values (possibly empty).
    """
    if islike:
        name_clause = EmergencyContactUser.name.like(f"%{keywords}%")
    else:
        name_clause = EmergencyContactUser.name == keywords
    contacts = (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(name_clause)
        .all()
    )
    return [contact.id for contact in contacts]
def user_id_get_duty_id(db, user_id_list):
    """Return the duty ids that any of the given personnel are assigned to.

    Arrangement rows whose ``del_flag`` equals ``'2'`` are excluded. The
    result keeps one entry per arrangement row, so a duty id can repeat.

    :param db: SQLAlchemy session used to run the query.
    :param user_id_list: Personnel ids to look up.
    :return: List of ``DutyPersonnelArrangement.duty_id`` values.
    """
    arrangements = (
        db.query(DutyPersonnelArrangement)
        .filter(
            DutyPersonnelArrangement.del_flag != '2',
            DutyPersonnelArrangement.personnel_id.in_(user_id_list),
        )
        .all()
    )
    return [arrangement.duty_id for arrangement in arrangements]
def duty_id_get_user_id(db, duty_id):
    """Return the personnel arrangement rows attached to one duty schedule.

    Despite the name, whole ``DutyPersonnelArrangement`` rows are returned,
    not bare user ids. Rows whose ``del_flag`` equals ``'2'`` are excluded.

    :param db: SQLAlchemy session used to run the query.
    :param duty_id: Id of the duty schedule.
    :return: List of ``DutyPersonnelArrangement`` rows (possibly empty).
    """
    return (
        db.query(DutyPersonnelArrangement)
        .filter(
            DutyPersonnelArrangement.del_flag != '2',
            DutyPersonnelArrangement.duty_id == duty_id,
        )
        .all()
    )
def user_id_get_info(db, user_id):
    """Fetch a single emergency contact by id.

    :param db: SQLAlchemy session used to run the query.
    :param user_id: ``EmergencyContactUser.id`` to look up.
    :return: The matching row, or ``None`` when no row with that id exists
        or its ``del_flag`` equals ``'2'``.
    """
    contact_query = db.query(EmergencyContactUser).filter(
        EmergencyContactUser.del_flag != '2',
        EmergencyContactUser.id == user_id,
    )
    return contact_query.first()
def position_id_get_info(db, position_id):
    """Fetch a single duty position by id.

    :param db: SQLAlchemy session used to run the query.
    :param position_id: ``DutyPosition.id`` to look up.
    :return: The matching row, or ``None`` when no row with that id exists
        or its ``del_flag`` equals ``'2'``.
    """
    position_query = db.query(DutyPosition).filter(
        DutyPosition.del_flag != '2',
        DutyPosition.id == position_id,
    )
    return position_query.first()
def dept_id_get_info(db, dept_id):
    """Fetch a single emergency-contact department by id.

    :param db: SQLAlchemy session used to run the query.
    :param dept_id: ``EmergencyContactDepartment.id`` to look up.
    :return: The matching row, or ``None`` when no row with that id exists
        or its ``del_flag`` equals ``'2'``.
    """
    department_query = db.query(EmergencyContactDepartment).filter(
        EmergencyContactDepartment.del_flag != '2',
        EmergencyContactDepartment.id == dept_id,
    )
    return department_query.first()
def get_duty_unit_id_list(db):
    """Return the duty-unit ids configured in the ``duty_unit`` dictionary.

    Reads every ``SysDictData`` row of type ``'duty_unit'`` whose
    ``del_flag`` is not ``'2'`` and converts its ``dict_value`` to ``int``.

    :param db: SQLAlchemy session used to run the query.
    :return: List of integer duty-unit ids.
    :raises ValueError: If a ``dict_value`` is not an integer literal.
    """
    entries = (
        db.query(SysDictData)
        .filter(SysDictData.del_flag != '2')
        .filter(SysDictData.dict_type == 'duty_unit')
        .all()
    )
    unit_ids = []
    for entry in entries:
        unit_ids.append(int(entry.dict_value))
    return unit_ids
def dept_id_get_duty_unit_id(db, dept_id, duty_unit_id_list):
    """Walk up the department tree until a duty unit is reached.

    Starting at ``dept_id``, follow ``parent_department_id`` links until a
    department whose id is in ``duty_unit_id_list`` is found.

    The previous recursive version dropped the result of the recursive call,
    so any lookup that had to climb at least one level returned ``None``.
    This version iterates and always returns an id or ``0``.

    :param db: SQLAlchemy session used to load departments.
    :param dept_id: Id of the department to start from.
    :param duty_unit_id_list: Ids that count as duty units.
    :return: The id of the nearest enclosing duty unit, or ``0`` when the
        chain reaches id ``0``, hits a department that does not exist (or
        has ``del_flag == '2'``), or loops back on itself.
    """
    visited = set()
    current_id = dept_id
    while True:
        if current_id == 0:
            return 0
        if current_id in duty_unit_id_list:
            return current_id
        if current_id in visited:
            # A parent chain that cycles can never reach a duty unit.
            return 0
        visited.add(current_id)
        department = (
            db.query(EmergencyContactDepartment)
            .filter(EmergencyContactDepartment.del_flag != '2')
            .filter(EmergencyContactDepartment.id == current_id)
            .first()
        )
        if not department:
            return 0
        current_id = department.parent_department_id
def mobile_phone_get_dept_id(db, mobile_phone, duty_unit_id_list):
    """Resolve a mobile phone number to the duty unit of its owner.

    The first emergency contact whose ``mobile_phone`` contains the given
    text (SQL ``LIKE``) and whose ``del_flag`` is not ``'2'`` is used; its
    department is then mapped to a duty unit with
    ``dept_id_get_duty_unit_id``.

    :param db: SQLAlchemy session used to run the queries.
    :param mobile_phone: Phone number text to search for.
    :param duty_unit_id_list: Ids that count as duty units.
    :return: Whatever ``dept_id_get_duty_unit_id`` returns for the contact's
        department, or ``0`` when no contact matches.
    """
    contact = (
        db.query(EmergencyContactUser)
        .filter(EmergencyContactUser.del_flag != '2')
        .filter(EmergencyContactUser.mobile_phone.like(f'%{mobile_phone}%'))
        .first()
    )
    if not contact:
        return 0
    return dept_id_get_duty_unit_id(db, contact.department_id, duty_unit_id_list)
@router.get('/list')
async def get_dict_data_by_type(
    keywords: str = Query(None),
    duty_type: str = Query(None),
    start_time: str = Query(None),
    end_time: str = Query(None),
    duty_unit: int = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return one page of duty schedules matching the optional filters.

    Schedules whose ``del_flag`` is ``'2'`` are excluded. Results are ordered
    by ``start_time``, ``end_time`` and ``update_time``, all descending.

    :param keywords: Name fragment; only schedules that have a matching
        emergency contact assigned are returned.
    :param duty_type: Exact ``duty_type`` to match.
    :param start_time: ``YYYY-MM-DD`` inclusive lower bound on ``duty_date``.
    :param end_time: ``YYYY-MM-DD`` inclusive upper bound on ``duty_date``.
    :param duty_unit: Exact ``duty_unit`` to match.
    :param page: 1-based page number.
    :param pageSize: Number of schedules per page.
    :param db: SQLAlchemy session.
    :param body: Result of the ``remove_xss_json`` dependency; not read here.
    :param user_id: Result of the ``valid_access_token`` dependency; not
        read here.
    :return: A dict with ``total``, ``page``, ``pageSize``, ``totalPages``,
        ``data``, ``code`` and ``msg``; or a 404 ``JSONResponse`` carrying
        the exception text when anything raises.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')

        # Optional filters.
        if keywords:
            matching_users = name_get_user_id(db, keywords)
            matching_duties = user_id_get_duty_id(db, matching_users)
            query = query.filter(DutySchedule.id.in_(matching_duties))
        if duty_type:
            query = query.filter(DutySchedule.duty_type == duty_type)
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)

        # Count before pagination so the total covers every matching row.
        total_count = query.count()
        offset = (page - 1) * pageSize
        query = query.order_by(
            DutySchedule.start_time.desc(),
            DutySchedule.end_time.desc(),
            DutySchedule.update_time.desc(),
        )
        duty_data = query.offset(offset).limit(pageSize).all()

        time_format = '%Y-%m-%d %H:%M:%S'
        data_list = []
        for d in duty_data:
            user_data = []
            for arrangement in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, arrangement.personnel_id)
                if contact_info is None:
                    # The contact no longer resolves (missing or flagged
                    # '2'); skip it instead of failing the whole page.
                    continue
                user_data.append({
                    "position_id": arrangement.position_id,
                    "personnel_id": arrangement.personnel_id,
                    "name": contact_info.name,
                    "position": contact_info.position,
                    "mobile_phone": contact_info.mobile_phone,
                    "office_phone": contact_info.office_phone,
                    "department_id": contact_info.department_id,
                    "yzy_userid": contact_info.userid,
                })
            data_list.append({
                "id": d.id,
                "start_time": d.start_time.strftime(time_format) if d.start_time else '',
                "end_time": d.end_time.strftime(time_format) if d.end_time else '',
                "duty_date": d.duty_date,
                "shift_type": d.shift_type,
                "duty_unit": d.duty_unit,
                "duty_type": d.duty_type,
                "createTime": d.create_time.strftime(time_format) if d.create_time else '',
                "user_data": user_data,
            })

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        # Any failure is reported to the client as a 404 with the error text.
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/latest_by_group')
async def get_dict_data_by_type(
    keywords: str = Query(None),
    start_time: str = Query(None),
    end_time: str = Query(None),
    duty_unit: int = Query(None),
    page: int = Query(1, gt=0),
    pageSize: int = Query(10, gt=0),
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return one page of the latest schedule per (duty_date, shift_type).

    For every ``duty_date`` / ``shift_type`` pair only the schedule whose
    ``update_time`` is the maximum for that pair is kept. Schedules whose
    ``del_flag`` is ``'2'`` are excluded. Results are ordered by
    ``update_time`` descending.

    When ``duty_unit`` is not supplied, the unit is derived from the current
    user: the user record is loaded with ``user_id_get_user_info``, its
    ``phonenumber`` is passed through ``mpfun.dec_data`` and resolved with
    ``mobile_phone_get_dept_id``. If the user record is missing, an empty
    page is returned.

    :param keywords: Name fragment; only schedules that have a matching
        emergency contact assigned are returned.
    :param start_time: ``YYYY-MM-DD`` inclusive lower bound on ``duty_date``.
    :param end_time: ``YYYY-MM-DD`` inclusive upper bound on ``duty_date``.
    :param duty_unit: Exact ``duty_unit`` to match.
    :param page: 1-based page number.
    :param pageSize: Number of schedules per page.
    :param db: SQLAlchemy session.
    :param body: Result of the ``remove_xss_json`` dependency; not read here.
    :param user_id: Id returned by ``valid_access_token``.
    :return: A dict with ``total``, ``page``, ``pageSize``, ``totalPages``,
        ``data``, ``code`` and ``msg``; or a 404 ``JSONResponse`` carrying
        the exception text when anything raises.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')

        # Optional filters.
        if keywords:
            matching_users = name_get_user_id(db, keywords)
            matching_duties = user_id_get_duty_id(db, matching_users)
            query = query.filter(DutySchedule.id.in_(matching_duties))
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)
        else:
            now_user_info = user_id_get_user_info(db, user_id)
            # Check for a missing user BEFORE touching its attributes; the
            # old debug print dereferenced it first and made this branch
            # unreachable.
            if not now_user_info:
                return {
                    "total": 0,
                    "page": page,
                    "pageSize": pageSize,
                    "totalPages": 0,
                    "data": [],
                    "code": 200,
                    "msg": "查询成功"
                }
            own_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db),
            )
            query = query.filter(DutySchedule.duty_unit == own_unit)

        # Latest update_time per (duty_date, shift_type) among live rows.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )

        # Count before pagination so the total covers every matching row.
        total_count = query.count()
        offset = (page - 1) * pageSize
        query = query.order_by(DutySchedule.update_time.desc())
        duty_data = query.offset(offset).limit(pageSize).all()

        time_format = '%Y-%m-%d %H:%M:%S'
        data_list = []
        for d in duty_data:
            user_data = []
            for arrangement in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, arrangement.personnel_id)
                if contact_info is None:
                    # The contact no longer resolves (missing or flagged
                    # '2'); skip it instead of failing the whole page.
                    continue
                user_data.append({
                    "position_id": arrangement.position_id,
                    "personnel_id": arrangement.personnel_id,
                    "name": contact_info.name,
                    "position": contact_info.position,
                    "mobile_phone": contact_info.mobile_phone,
                    "office_phone": contact_info.office_phone,
                    "department_id": contact_info.department_id,
                    "yzy_userid": contact_info.userid,
                })
            data_list.append({
                "id": d.id,
                "start_time": d.start_time.strftime(time_format) if d.start_time else '',
                "end_time": d.end_time.strftime(time_format) if d.end_time else '',
                "duty_date": d.duty_date,
                "shift_type": d.shift_type,
                "duty_unit": d.duty_unit,
                "duty_type": d.duty_type,
                "createTime": d.create_time.strftime(time_format) if d.create_time else '',
                "user_data": user_data,
            })

        return {
            "total": total_count,
            "page": page,
            "pageSize": pageSize,
            "totalPages": (total_count + pageSize - 1) // pageSize,
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        # Any failure is reported to the client as a 404 with the error text.
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.get('/info/{id}')
async def get_dict_data_by_type(
    id: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Return one duty schedule together with its assigned personnel.

    :param id: Id of the duty schedule (path parameter).
    :param db: SQLAlchemy session.
    :param body: Result of the ``remove_xss_json`` dependency; not read here.
    :param user_id: Result of the ``valid_access_token`` dependency; not
        read here.
    :return: ``{"data": ..., "code": 200, "msg": ...}`` on success. A 404
        ``JSONResponse`` when the schedule does not exist or has
        ``del_flag == '2'``, and also when any exception is raised.
    """
    try:
        d = (
            db.query(DutySchedule)
            .filter(DutySchedule.id == id)
            .filter(DutySchedule.del_flag != '2')
            .first()
        )
        if d is None:
            # Previously this fell through to ``d.id`` and surfaced as an
            # opaque "'NoneType' object has no attribute" message.
            return JSONResponse(status_code=404, content={
                'code': 404,
                'msg': '值班不存在'
            })

        user_data = []
        for arrangement in duty_id_get_user_id(db, d.id):
            contact_info = user_id_get_info(db, arrangement.personnel_id)
            if contact_info is None:
                # The contact no longer resolves (missing or flagged '2');
                # skip it instead of failing the request.
                continue
            user_data.append({
                "position_id": arrangement.position_id,
                "personnel_id": arrangement.personnel_id,
                "name": contact_info.name,
                "position": contact_info.position,
                "mobile_phone": contact_info.mobile_phone,
                "office_phone": contact_info.office_phone,
                "department_id": contact_info.department_id,
                "yzy_userid": contact_info.userid,
            })

        # start_time / end_time are returned as stored (not pre-formatted),
        # unlike the list endpoints.
        data_list = {
            "id": d.id,
            "start_time": d.start_time,
            "end_time": d.end_time,
            "duty_date": d.duty_date,
            "shift_type": d.shift_type,
            "duty_unit": d.duty_unit,
            "duty_type": d.duty_type,
            "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
            "user_data": user_data
        }
        return {
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        # Any failure is reported to the client as a 404 with the error text.
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.post('/create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Create one duty schedule and its personnel arrangements.

    The schedule and its arrangements are written in a single transaction:
    the schedule is flushed to obtain its id, the arrangements are added,
    and everything is committed together. On any error the session is
    rolled back so no half-created schedule is left behind.

    :param db: SQLAlchemy session.
    :param body: Request payload; reads ``user_data`` (a list of dicts with
        ``position_id`` and ``personnel_id``), ``start_time``, ``end_time``,
        ``duty_date``, ``shift_type``, ``duty_unit`` and ``duty_type``.
    :param user_id: Id returned by ``valid_access_token``; stored as
        ``create_by``.
    :return: ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when
        ``user_data`` is empty or when any exception is raised.
    """
    try:
        user_data = body['user_data']
        if not user_data:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班人员不能为空'
            })

        new_duty_data = DutySchedule(
            start_time=body['start_time'],
            end_time=body['end_time'],
            duty_date=body['duty_date'],
            shift_type=body['shift_type'],
            duty_unit=body['duty_unit'],
            duty_type=body['duty_type'],
            create_by=user_id
        )
        db.add(new_duty_data)
        # Flush (not commit) so the new id is available while the whole
        # operation stays inside one transaction.
        db.flush()

        db.add_all([
            DutyPersonnelArrangement(
                duty_id=new_duty_data.id,
                position_id=user_info['position_id'],
                personnel_id=user_info['personnel_id'],
                create_by=user_id
            )
            for user_info in user_data
        ])
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        # Undo the flushed schedule if the arrangements could not be saved.
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.post('/list_create')
async def create_dict_data(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Create several duty schedules, each with its personnel, atomically.

    Every item is validated before anything is written, and all schedules
    and arrangements are committed together. Previously each item was
    committed on its own, so a bad item in the middle of the list left the
    earlier items stored.

    :param db: SQLAlchemy session.
    :param body: Request payload; must be a list of dicts, each with
        ``user_data`` (a list of dicts with ``position_id`` and
        ``personnel_id``), ``start_time``, ``end_time``, ``duty_date``,
        ``shift_type``, ``duty_unit`` and ``duty_type``.
    :param user_id: Id returned by ``valid_access_token``; stored as
        ``create_by``.
    :return: ``{"code": 200, ...}`` on success; a 500 ``JSONResponse`` when
        the payload is not a list; a 404 ``JSONResponse`` when any item has
        empty ``user_data`` or when any exception is raised.
    """
    try:
        if not isinstance(body, list):
            return JSONResponse(status_code=500, content={'msg': "请求体非列表", "code": 500})

        # Validate the whole batch first so nothing is written on failure.
        for data in body:
            if not data['user_data']:
                return JSONResponse(status_code=404, content={
                    'errcode': 404,
                    'errmsg': '值班人员不能为空'
                })

        for data in body:
            new_duty_data = DutySchedule(
                start_time=data['start_time'],
                end_time=data['end_time'],
                duty_date=data['duty_date'],
                shift_type=data['shift_type'],
                duty_unit=data['duty_unit'],
                duty_type=data['duty_type'],
                create_by=user_id
            )
            db.add(new_duty_data)
            # Flush (not commit) so the new id is available while the batch
            # stays inside one transaction.
            db.flush()
            db.add_all([
                DutyPersonnelArrangement(
                    duty_id=new_duty_data.id,
                    position_id=user_info['position_id'],
                    personnel_id=user_info['personnel_id'],
                    create_by=user_id
                )
                for user_info in data['user_data']
            ])
        db.commit()

        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        # Discard everything flushed so far for this batch.
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.put("/update")
async def updata_dict_type(
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Update a duty schedule and replace its personnel arrangements.

    The existing arrangements are marked with ``del_flag = '2'`` and a fresh
    set is inserted from ``user_data``. All checks now run before any write,
    and the whole update is committed once. Previously the schedule was
    dereferenced before the existence check, and the old arrangements were
    removed and committed before ``user_data`` was validated.

    :param db: SQLAlchemy session.
    :param body: Request payload; reads ``id``, ``user_data`` (a list of
        dicts with ``position_id`` and ``personnel_id``), ``start_time``,
        ``end_time``, ``duty_date``, ``shift_type``, ``duty_unit`` and
        ``duty_type``.
    :param user_id: Id returned by ``valid_access_token``; stored as
        ``create_by`` on new arrangements and ``update_by`` on the schedule.
    :return: ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when
        the schedule does not exist, ``user_data`` is empty, or any
        exception is raised.
    """
    try:
        duty_data = (
            db.query(DutySchedule)
            .filter(DutySchedule.id == body['id'])
            .filter(DutySchedule.del_flag != '2')
            .first()
        )
        if not duty_data:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班不存在'
            })

        user_data = body['user_data']
        if not user_data:
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': '值班人员不能为空'
            })

        # Retire the current arrangements, then attach the new ones.
        for info in duty_id_get_user_id(db, duty_data.id):
            info.del_flag = '2'
        db.add_all([
            DutyPersonnelArrangement(
                duty_id=duty_data.id,
                position_id=user_info['position_id'],
                personnel_id=user_info['personnel_id'],
                create_by=user_id
            )
            for user_info in user_data
        ])

        duty_data.start_time = body['start_time']
        duty_data.end_time = body['end_time']
        duty_data.duty_date = body['duty_date']
        duty_data.shift_type = body['shift_type']
        duty_data.duty_unit = body['duty_unit']
        duty_data.duty_type = body['duty_type']
        duty_data.update_by = user_id

        db.commit()
        return {
            "code": 200,
            "msg": "操作成功",
            "data": None
        }
    except Exception as e:
        traceback.print_exc()
        # Keep the old arrangements if anything above failed.
        db.rollback()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/delete/{id}")  # the schedule to delete is identified by its id
async def delete_dict_data(
    id: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Soft-delete one duty schedule by setting its ``del_flag`` to ``'2'``.

    :param id: Id of the duty schedule (path parameter).
    :param db: SQLAlchemy session.
    :param body: Result of the ``remove_xss_json`` dependency; not read here.
    :param user_id: Result of the ``valid_access_token`` dependency; not
        read here.
    :return: ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when
        no live schedule has that id or when any exception is raised.
    """
    try:
        schedule_query = db.query(DutySchedule).filter(DutySchedule.id == id)
        position_data = schedule_query.filter(DutySchedule.del_flag != '2').first()
        if position_data:
            # The row is kept; only the flag changes.
            position_data.del_flag = '2'
            db.commit()
            return {
                "code": 200,
                "msg": "操作成功",
                "data": None
            }
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': '值班不存在'
        })
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
@router.delete("/delete_list/{id_list}")  # the schedules to delete are identified by their ids
async def delete_dict_data(
    id_list: str,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    user_id=Depends(valid_access_token)
):
    """Soft-delete several duty schedules by setting ``del_flag`` to ``'2'``.

    :param id_list: Comma-separated schedule ids (path parameter); each
        piece is converted with ``int``.
    :param db: SQLAlchemy session.
    :param body: Result of the ``remove_xss_json`` dependency; not read here.
    :param user_id: Result of the ``valid_access_token`` dependency; not
        read here.
    :return: ``{"code": 200, ...}`` on success; a 404 ``JSONResponse`` when
        none of the ids matches a live schedule or when any exception is
        raised (including a non-integer id).
    """
    try:
        id_list = list(map(int, id_list.split(',')))
        position_data = (
            db.query(DutySchedule)
            .filter(
                DutySchedule.id.in_(id_list),
                DutySchedule.del_flag != '2',
            )
            .all()
        )
        if position_data:
            # The rows are kept; only the flag changes.
            for schedule in position_data:
                schedule.del_flag = '2'
            db.commit()
            return {
                "code": 200,
                "msg": "操作成功",
                "data": None
            }
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': '值班不存在'
        })
    except Exception as e:
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
def _strptime_first_match(text, formats):
    """Parse ``text`` with the first of ``formats`` that matches.

    :raises ValueError: The error of the first format when none match.
    """
    first_error = None
    for fmt in formats:
        try:
            return datetime.strptime(text, fmt)
        except ValueError as exc:
            if first_error is None:
                first_error = exc
    raise first_error


def foastr_to_datetime(datedata, timedata, book):
    """Combine a spreadsheet date cell and time cell into one ``datetime``.

    Each cell may be text or a float (Excel serial value):

    * text date + text time: parsed as ``YYYY-MM-DD`` plus ``HH:MM`` or
      ``HH:MM:SS``;
    * text date + float time: the float is treated as a fraction of a day;
    * float date + text time: the date goes through
      ``xlrd.xldate.xldate_as_datetime``, the time is parsed as
      ``HH:MM:SS`` or ``HH:MM``;
    * anything else: the two values are added and converted with
      ``xlrd.xldate.xldate_as_datetime``.

    Both text branches now accept times with or without seconds; before,
    each branch accepted only one of the two spellings.

    :param datedata: Date cell value (``str`` or ``float``).
    :param timedata: Time cell value (``str`` or ``float``).
    :param book: Workbook whose ``datemode`` is used for serial conversion;
        not read when the date is text.
    :return: The combined ``datetime``.
    :raises ValueError: When a text value matches none of the formats.
    """
    date_is_text = isinstance(datedata, str)
    time_is_text = isinstance(timedata, str)

    if date_is_text and time_is_text:
        return _strptime_first_match(
            datedata + " " + timedata,
            ('%Y-%m-%d %H:%M', '%Y-%m-%d %H:%M:%S'),
        )
    if date_is_text and isinstance(timedata, float):
        date_part = datetime.strptime(datedata, "%Y-%m-%d").date()
        # A float time is a fraction of one day counted from midnight.
        time_part = (datetime.min + timedelta(days=timedata)).time()
        return datetime.combine(date_part, time_part)
    if isinstance(datedata, float) and time_is_text:
        date_part = xlrd.xldate.xldate_as_datetime(datedata, book.datemode).date()
        time_part = _strptime_first_match(timedata, ("%H:%M:%S", "%H:%M")).time()
        return datetime.combine(date_part, time_part)
    return xlrd.xldate.xldate_as_datetime(datedata + timedata, book.datemode)
# (sheet column index, position_id) for the four personnel columns of the
# import sheet: shift leader, section-level leader, main duty, full-time.
_IMPORT_POSITION_COLUMNS = ((4, 1), (5, 2), (6, 8), (7, 9))


def _import_row_personnel(db, sheet, row, errors):
    """Resolve the comma-separated names in one sheet row to personnel.

    Each name is looked up by exact match with ``name_get_user_id``. A name
    that matches nobody adds a message to ``errors``; a name that matches
    several contacts adds all of them.

    :param db: SQLAlchemy session.
    :param sheet: xlrd sheet being imported.
    :param row: Zero-based row index.
    :param errors: List that receives one message per unknown name.
    :return: List of ``{"position_id": ..., "personnel_id": ...}`` dicts.
    """
    user_data = []
    for column, position_id in _IMPORT_POSITION_COLUMNS:
        for name in sheet.cell(row, column).value.split(','):
            if name == '':
                continue
            personnel_id_list = name_get_user_id(db, name, False)
            if len(personnel_id_list) == 0:
                errors.append(f'\n行<{row + 1}>非应急通讯录人员<{name}>')
            for personnel_id in personnel_id_list:
                user_data.append({"position_id": position_id, "personnel_id": personnel_id})
    return user_data


# Custom schedule: import
@router.post('/zdypb/createImport')
# Duty schedule: import
@router.post('/zbpb/createImport')
async def create_contact(
    request: Request,
    background_tasks: BackgroundTasks,
    db: Session = Depends(get_db),
    body=Depends(remove_xss_json),
    auth_user: AuthUser = Depends(find_auth_user),
    user_id=Depends(valid_access_token)
):
    """Import duty schedules from an uploaded Excel workbook.

    Rows are read from the first sheet starting at row index 5. Columns 0-3
    hold start date, start time, end date and end time; columns 4-7 hold
    comma-separated names for four positions. Every row is validated first;
    if any row has a problem nothing is written and all messages are
    returned together. Otherwise all schedules and arrangements are saved
    in one transaction.

    Rows that share the same start date and shift type overwrite each
    other; only the last one is imported.

    :param request: Incoming request; not read here.
    :param background_tasks: Not used here.
    :param db: SQLAlchemy session.
    :param body: Request payload; reads ``filename``, ``file_name_desc``,
        ``duty_unit`` and, optionally, ``duty_type``.
    :param auth_user: Result of the ``find_auth_user`` dependency; not read
        here.
    :param user_id: Id returned by ``valid_access_token``; stored as
        ``create_by``.
    :return: A ``JSONResponse`` with ``code`` / ``msg`` / ``data``: status
        200 on success, 500 on validation or processing errors. A missing
        file returns 404 with ``errcode`` / ``errmsg``.
    """
    try:
        filename = body['filename']
        # Not used below, but kept as a required key of the payload.
        file_name_desc = body['file_name_desc']
        duty_unit = body['duty_unit']
        duty_type = body['duty_type'] if 'duty_type' in body else ''
        if len(filename) == 0:
            raise Exception()

        # NOTE(review): ``filename`` comes from the request and is joined to
        # the upload directory unchecked; confirm upstream validation rules
        # out path traversal (e.g. ``../``).
        file_path = f'/data/upload/mergefile/uploads/{filename}'
        if not os.path.isfile(file_path):
            return JSONResponse(status_code=404, content={
                'errcode': 404,
                'errmsg': f'{filename}不存在'
            })

        msg = '成功'
        code = 200
        try:
            book = xlrd.open_workbook(file_path)
            sheet = book.sheet_by_index(0)
        except Exception as e:
            traceback.print_exc()
            msg = f'\n文件打开失败,请核实文件格式为xlsx/xlx>{str(e)}'
            code = 500
            return JSONResponse(status_code=code, content={
                "code": code,
                "msg": msg,
                "data": None
            })

        errors = []
        duty_data = {}
        duty_persion_data = {}
        for row in range(5, sheet.nrows):
            start_date = sheet.cell(row, 0).value
            start_clock = sheet.cell(row, 1).value
            end_date = sheet.cell(row, 2).value
            end_clock = sheet.cell(row, 3).value

            row_errors = []
            if start_date == '' or start_date == 0:
                row_errors.append(f'\n行<{row + 1}>开始日期不能为空<{start_date}>')
            if start_clock == '':
                row_errors.append(f'\n行<{row + 1}>开始时间不能为空<{start_clock}>')
            if end_date == '':
                row_errors.append(f'\n行<{row + 1}>结束日期不能为空<{end_date}>')
            if end_clock == '':
                row_errors.append(f'\n行<{row + 1}>结束时间不能为空<{end_clock}>')
            user_data = _import_row_personnel(db, sheet, row, row_errors)
            if row_errors:
                # Report the row and move on; parsing empty cells would
                # only raise a less helpful error.
                errors.extend(row_errors)
                continue

            start_time = foastr_to_datetime(start_date, start_clock, book)
            end_time = foastr_to_datetime(end_date, end_clock, book)
            start_day = start_time.strftime('%Y-%m-%d')
            end_day = end_time.strftime('%Y-%m-%d')
            if start_day == end_day:
                shift_type = '1'
            elif start_time == end_time:
                # NOTE(review): unreachable as written; two datetimes on
                # different days are never equal. Possibly meant to compare
                # the time of day only. Left unchanged; confirm intent.
                shift_type = '3'
            else:
                shift_type = '2'

            key = start_day + str(shift_type)
            duty_data[key] = DutySchedule(
                start_time=start_time,
                end_time=end_time,
                duty_date=start_day,
                shift_type=shift_type,
                duty_unit=duty_unit,
                duty_type=duty_type,
                create_by=user_id
            )
            duty_persion_data[key] = user_data

        if errors:
            # Nothing is written when any row is invalid.
            code = 500
            msg = ''.join(errors)
        else:
            for key, new_duty_data in duty_data.items():
                db.add(new_duty_data)
                # Flush (not commit) so the new id is available while the
                # whole import stays inside one transaction.
                db.flush()
                db.add_all([
                    DutyPersonnelArrangement(
                        duty_id=new_duty_data.id,
                        position_id=user_info['position_id'],
                        personnel_id=user_info['personnel_id'],
                        create_by=user_id
                    )
                    for user_info in duty_persion_data[key]
                ])
            db.commit()
    except Exception as e:
        traceback.print_exc()
        db.rollback()
        code = 500
        msg = str(e)
    return JSONResponse(status_code=code, content={
        "code": code,
        "msg": msg,
        "data": None
    })
# Duty schedule: export
@router.get("/zbpb/export")
async def download_file(keywords: str = Query(None),
                        duty_type: str = Query(None),
                        start_time: str = Query(None),
                        end_time: str = Query(None),
                        duty_unit: int = Query(None),
                        db: Session = Depends(get_db),
                        body=Depends(remove_xss_json),
                        user_id=Depends(valid_access_token)):
    """Export the filtered duty schedules as an Excel download.

    For every ``duty_date`` / ``shift_type`` pair only the schedule with the
    latest ``update_time`` is exported. Schedules whose ``del_flag`` is
    ``'2'`` are excluded. Rows are ordered by ``duty_date``. Each row has
    the start and end date and time, plus one column per position name
    holding the comma-joined names of the people in that position.

    :param keywords: Name fragment; only schedules that have a matching
        emergency contact assigned are exported.
    :param duty_type: Exact ``duty_type`` to match.
    :param start_time: ``YYYY-MM-DD`` inclusive lower bound on ``duty_date``.
    :param end_time: ``YYYY-MM-DD`` inclusive upper bound on ``duty_date``.
    :param duty_unit: Exact ``duty_unit`` to match.
    :param db: SQLAlchemy session.
    :param body: Result of the ``remove_xss_json`` dependency; not read here.
    :param user_id: Result of the ``valid_access_token`` dependency; not
        read here.
    :return: A ``StreamingResponse`` carrying the ``.xlsx`` file.
    :raises HTTPException: Status 500 with the error text when anything
        other than an ``HTTPException`` is raised.
    """
    try:
        query = db.query(DutySchedule).filter(DutySchedule.del_flag != '2')

        # Optional filters.
        if keywords:
            matching_users = name_get_user_id(db, keywords)
            matching_duties = user_id_get_duty_id(db, matching_users)
            query = query.filter(DutySchedule.id.in_(matching_duties))
        if duty_type:
            query = query.filter(DutySchedule.duty_type == duty_type)
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)
        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)

        # Latest update_time per (duty_date, shift_type) among live rows.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )
        duty_data = query.order_by(DutySchedule.duty_date).all()

        data_list = []
        for d in duty_data:
            data = {
                "值班开始日期": d.start_time.strftime('%Y-%m-%d') if d.start_time else '',
                "值班开始时间": d.start_time.strftime('%H:%M') if d.start_time else '',
                "值班结束日期": d.end_time.strftime('%Y-%m-%d') if d.end_time else '',
                "值班结束时间": d.end_time.strftime('%H:%M') if d.end_time else '',
                # Always present so the column exists even with no leader.
                "带班领导": ""
            }
            names_by_position = {}
            for arrangement in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, arrangement.personnel_id)
                position = position_id_get_info(db, arrangement.position_id)
                if contact_info is None or position is None:
                    # The contact or position no longer resolves; skip it
                    # instead of failing the whole export.
                    continue
                names_by_position.setdefault(position.position_name, []).append(contact_info.name)
            for position_name, names in names_by_position.items():
                data[position_name] = ",".join(names)
            data_list.append(data)

        import pandas as pd
        from io import BytesIO
        from urllib.parse import quote

        # Render the rows into an in-memory workbook.
        df = pd.DataFrame(data_list)
        output = BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df.to_excel(writer, index=False)
        output.seek(0)

        # Timestamp is year-month-day-hour-minute-second; the old format
        # string used %m (month) and the non-portable %s here.
        encoded_filename = f'值班排班{datetime.now().strftime("%Y%m%d%H%M%S")}.xlsx'
        encoded_filename = quote(encoded_filename, encoding='utf-8')
        headers = {
            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        }
        return StreamingResponse(output, headers=headers)
    except HTTPException as e:
        raise e
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
# Export: duty roster of the caller's own unit
@router.get("/bdwzb/export")
# Export: duty roster of units at every level
@router.get("/gjdwzbb/export")
async def download_file(keywords: str = Query(None),
                        start_time: str = Query(None),
                        end_time: str = Query(None),
                        duty_unit: int = Query(None),
                        db: Session = Depends(get_db),
                        body=Depends(remove_xss_json),
                        user_id=Depends(valid_access_token)):
    """Export the duty roster as an Excel (.xlsx) download.

    One row is written per duty schedule: unit name, start/end time, the
    shift leaders and everybody on duty.

    :param keywords: optional person name; only schedules that person is
        assigned to are exported.
    :param start_time: optional lower bound on the duty date, ``YYYY-MM-DD``.
    :param end_time: optional upper bound on the duty date, ``YYYY-MM-DD``.
    :param duty_unit: optional duty unit id. When omitted, the unit is
        resolved from the current user's phone number.
    :param db: database session.
    :param body: sanitized request body (unused here, kept for the route
        signature).
    :param user_id: id of the authenticated user.
    :return: a ``StreamingResponse`` carrying the workbook, or an empty JSON
        result when no ``duty_unit`` is given and the current user cannot be
        found.
    :raises HTTPException: status 500 for any unexpected failure (this
        includes a malformed date string).
    """
    try:
        query = db.query(DutySchedule)
        query = query.filter(DutySchedule.del_flag != '2')

        # Optional filters
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)

        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)
        else:
            now_user_info = user_id_get_user_info(db, user_id)
            # The user record is only dereferenced after this check; the
            # former debug print read it first (and printed the phone value).
            if not now_user_info:
                return {
                    "data": [],
                    "code": 200,
                    "msg": "查询成功"
                }
            own_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db))
            query = query.filter(DutySchedule.duty_unit == own_unit)

        # Keep only the most recently updated schedule per (date, shift).
        # The label says "create_time" but the value is max(update_time).
        # NOTE(review): this subquery is not restricted to the selected unit,
        # so the newest row of another unit can hide this unit's row - confirm.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )
        query = query.order_by(DutySchedule.update_time.desc())
        duty_data = query.all()

        # Build one spreadsheet row per schedule
        columns = ["单位名称", "开始时间", "结束时间", "带班领导", "值班人员"]
        data_list = []
        for d in duty_data:
            leaders = []
            user_data = []
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                if not contact_info:
                    # A dangling personnel id must not abort the whole export.
                    continue
                position = position_id_get_info(db, user_info.position_id)
                if position and position.position_name == '带班领导':
                    leaders.append(contact_info.name)
                user_data.append(contact_info.name)
            unit = dept_id_get_info(db, d.duty_unit)
            data_list.append({
                "单位名称": unit.department_name if unit else '',
                "开始时间": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
                "结束时间": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
                "带班领导": "、".join(leaders),
                "值班人员": "、".join(user_data)
            })

        import pandas as pd
        from io import BytesIO
        from urllib.parse import quote

        # Explicit columns keep the header row even when nothing matched.
        df = pd.DataFrame(data_list, columns=columns)
        output = BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df.to_excel(writer, index=False)
        output.seek(0)

        # %M/%S are minutes/seconds; the previous "%mi%s" produced the month,
        # a literal "i" and a platform-dependent value.
        encoded_filename = f'值班表{datetime.now().strftime("%Y%m%d%H%M%S")}.xlsx'
        encoded_filename = quote(encoded_filename, encoding='utf-8')
        headers = {
            'Content-Disposition': f"attachment; filename*=UTF-8''{encoded_filename}",
            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        }
        return StreamingResponse(output, headers=headers)
    except HTTPException:
        raise
    except Exception as e:
        # Anything unexpected is reported as an internal error
        raise HTTPException(status_code=500, detail=str(e))
# Export: duty personnel of units at every level
@router.get("/gjdwzbr/export")
async def download_file(keywords: str = Query(None),
                        start_time: str = Query(None),
                        end_time: str = Query(None),
                        duty_unit: int = Query(None),
                        db: Session = Depends(get_db),
                        body=Depends(remove_xss_json),
                        user_id=Depends(valid_access_token)):
    """Export the people on duty as an Excel (.xlsx) download.

    One row is written per (duty date, person); a person assigned to several
    schedules on the same date appears only once.

    :param keywords: optional person name; only schedules that person is
        assigned to are exported.
    :param start_time: optional lower bound on the duty date, ``YYYY-MM-DD``.
    :param end_time: optional upper bound on the duty date, ``YYYY-MM-DD``.
    :param duty_unit: optional duty unit id. When omitted, the unit is
        resolved from the current user's phone number.
    :param db: database session.
    :param body: sanitized request body (unused here, kept for the route
        signature).
    :param user_id: id of the authenticated user.
    :return: a ``StreamingResponse`` carrying the workbook, or an empty JSON
        result when no ``duty_unit`` is given and the current user cannot be
        found.
    :raises HTTPException: status 500 for any unexpected failure (this
        includes a malformed date string).
    """
    try:
        query = db.query(DutySchedule)
        query = query.filter(DutySchedule.del_flag != '2')

        # Optional filters
        if keywords:
            user_list = name_get_user_id(db, keywords)
            duty_list = user_id_get_duty_id(db, user_list)
            query = query.filter(DutySchedule.id.in_(duty_list))
        if start_time:
            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date >= start_time)
        if end_time:
            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
            query = query.filter(DutySchedule.duty_date <= end_time)

        if duty_unit:
            query = query.filter(DutySchedule.duty_unit == duty_unit)
        else:
            now_user_info = user_id_get_user_info(db, user_id)
            # The user record is only dereferenced after this check; the
            # former debug print read it first (and printed the phone value).
            if not now_user_info:
                return {
                    "data": [],
                    "code": 200,
                    "msg": "查询成功"
                }
            own_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db))
            query = query.filter(DutySchedule.duty_unit == own_unit)

        # Keep only the most recently updated schedule per (date, shift).
        # The label says "create_time" but the value is max(update_time).
        # NOTE(review): this subquery is not restricted to the selected unit,
        # so the newest row of another unit can hide this unit's row - confirm.
        subquery = db.query(
            DutySchedule.duty_date,
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(DutySchedule.del_flag != '2').group_by(
            DutySchedule.duty_date,
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.duty_date == subquery.c.duty_date,
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )
        query = query.order_by(DutySchedule.update_time.desc())
        duty_data = query.all()

        # Build one spreadsheet row per (duty date, person)
        columns = ["值班日期", "值班机构", "组织机构", "值班岗位",
                   "值班人员", "职务", "联系电话", "办公电话"]
        data_list = []
        seen = set()  # "<date><contact id>" keys already exported
        for d in duty_data:
            # Same for every person of this schedule, so look it up once.
            duty_dept = dept_id_get_info(db, d.duty_unit)
            duty_dept_name = duty_dept.department_name if duty_dept else ''
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                if not contact_info:
                    # A dangling personnel id must not abort the whole export.
                    continue
                yz = d.duty_date.strftime('%Y-%m-%d') + str(contact_info.id)
                if yz in seen:
                    continue
                seen.add(yz)
                position = position_id_get_info(db, user_info.position_id)
                department = dept_id_get_info(db, contact_info.department_id)
                data_list.append({
                    "值班日期": d.duty_date,
                    "值班机构": duty_dept_name,
                    "组织机构": department.department_name if department else '',
                    "值班岗位": position.position_name if position else '',
                    "值班人员": contact_info.name,
                    "职务": contact_info.position,
                    "联系电话": contact_info.mobile_phone,
                    "办公电话": contact_info.office_phone
                })

        import pandas as pd
        from io import BytesIO
        from urllib.parse import quote

        # Explicit columns keep the header row even when nothing matched.
        df = pd.DataFrame(data_list, columns=columns)
        output = BytesIO()
        with pd.ExcelWriter(output, engine='openpyxl') as writer:
            df.to_excel(writer, index=False)
        output.seek(0)

        # %M/%S are minutes/seconds; the previous "%mi%s" produced the month,
        # a literal "i" and a platform-dependent value.
        encoded_filename = f'值班人员{datetime.now().strftime("%Y%m%d%H%M%S")}.xlsx'
        encoded_filename = quote(encoded_filename, encoding='utf-8')
        headers = {
            'Content-Disposition': f"attachment; filename*=UTF-8''{encoded_filename}",
            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
        }
        return StreamingResponse(output, headers=headers)
    except HTTPException:
        raise
    except Exception as e:
        # Anything unexpected is reported as an internal error
        raise HTTPException(status_code=500, detail=str(e))
-
# People currently on duty (large and medium display screens)
@router.post("/today")
def duty_today(
        db: Session = Depends(get_db),
        body=Depends(remove_xss_json),
        user_id=Depends(valid_access_token)):
    """Return the duty schedules in effect right now for the caller's unit.

    The duty date is the calendar date of "now minus 8h30m", so before 08:30
    the previous day's roster is still returned.

    :param db: database session.
    :param body: sanitized request body (unused here, kept for the route
        signature).
    :param user_id: id of the authenticated user.
    :return: ``{"data": [...], "code": 200, "msg": ...}`` with one entry per
        schedule, or a 404 ``JSONResponse`` carrying the error text when
        anything fails.
    """
    try:
        duty_day = (datetime.now() - timedelta(hours=8, minutes=30)).date()

        # Conditions are passed to filter() as separate arguments (ANDed by
        # SQLAlchemy). Python's `and` between two column expressions
        # evaluates to the second one only, which silently dropped the
        # del_flag condition.
        live_on_duty_day = (
            DutySchedule.del_flag != '2',
            DutySchedule.duty_date == duty_day,
        )
        query = db.query(DutySchedule).filter(*live_on_duty_day)

        # The roster follows the current user's unit; unit id 1 is the
        # fallback (the original note says the default is the municipal
        # emergency bureau).
        user_unit = 1
        now_user_info = user_id_get_user_info(db, user_id)
        if now_user_info:
            user_unit = mobile_phone_get_dept_id(
                db,
                mpfun.dec_data(now_user_info.phonenumber),
                get_duty_unit_id_list(db))
            if user_unit == 0:
                # presumably 0 means "no duty unit matched" - TODO confirm
                user_unit = 1
        query = query.filter(DutySchedule.duty_unit == user_unit)

        # Keep only the most recently updated schedule per shift type.
        # The label says "create_time" but the value is max(update_time).
        # NOTE(review): this subquery is not restricted to user_unit, so the
        # newest row of another unit can hide this unit's row - confirm.
        subquery = db.query(
            DutySchedule.shift_type,
            func.max(DutySchedule.update_time).label("latest_create_time")
        ).filter(*live_on_duty_day).group_by(
            DutySchedule.shift_type
        ).subquery()
        query = query.join(
            subquery,
            and_(
                DutySchedule.shift_type == subquery.c.shift_type,
                DutySchedule.update_time == subquery.c.latest_create_time
            )
        )
        query = query.order_by(DutySchedule.update_time.desc())
        duty_data = query.all()

        # position_id -> response field that collects the names for that post
        position_fields = {
            1: "class_leader",
            2: "main_class",
            8: "deputy_class",
            9: "prepare_class",
        }

        data_list = []
        for d in duty_data:
            user_data = []
            names_by_field = {field: [] for field in position_fields.values()}
            for user_info in duty_id_get_user_id(db, d.id):
                contact_info = user_id_get_info(db, user_info.personnel_id)
                if not contact_info:
                    # Contact record missing: report the bare assignment only.
                    user_data.append({"position_id": user_info.position_id,
                                      "personnel_id": user_info.personnel_id})
                    continue
                user_data.append({"position_id": user_info.position_id,
                                  "personnel_id": user_info.personnel_id,
                                  "name": contact_info.name,
                                  "position": contact_info.position,
                                  "mobile_phone": contact_info.mobile_phone,
                                  "office_phone": contact_info.office_phone,
                                  "department_id": contact_info.department_id,
                                  "yzy_userid": contact_info.userid})
                field = position_fields.get(user_info.position_id)
                if field is not None:
                    names_by_field[field].append(contact_info.name)
            data_list.append({
                "id": d.id,
                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
                "duty_date": d.duty_date,
                "shift_type": d.shift_type,
                "duty_unit": d.duty_unit,
                "duty_type": d.duty_type,
                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
                "user_data": user_data,
                "class_leader": "、".join(names_by_field["class_leader"]),
                "main_class": "、".join(names_by_field["main_class"]),
                "deputy_class": "、".join(names_by_field["deputy_class"]),
                "prepare_class": "、".join(names_by_field["prepare_class"])
            })

        return {
            "data": data_list,
            "code": 200,
            "msg": "查询成功"
        }
    except Exception as e:
        # Log the stack trace and report the failure to the client
        traceback.print_exc()
        return JSONResponse(status_code=404, content={
            'code': 404,
            'msg': str(e)
        })
|