baoyubo 1 月之前
父节点
当前提交
8b09ba6526
共有 1 个文件被更改,包括 592 次插入3 次删除
  1. 592 3
      routers/api/dutyManagement/schedule.py

+ 592 - 3
routers/api/dutyManagement/schedule.py

@@ -1,14 +1,17 @@
 #!/usr/bin/env python3
 # -*- coding: utf-8 -*-
 
-from fastapi import APIRouter, Request, Depends, Query, HTTPException, status
+from fastapi import APIRouter, Request, Depends, Query, HTTPException, status, BackgroundTasks
 from common.security import valid_access_token
-from fastapi.responses import JSONResponse
+from fastapi.responses import JSONResponse, FileResponse,StreamingResponse
 from sqlalchemy.orm import Session
 from sqlalchemy import and_, or_
 from sqlalchemy.sql import func
 from common.auth_user import *
 from pydantic import BaseModel
+from common.db import db_dept
+from common.db import db_czrz
+from exceptions import AppException, HmacException
 from database import get_db
 from typing import List
 from models import *
@@ -16,6 +19,7 @@ from utils import *
 from utils.ry_system_util import *
 from utils.video_util import *
 import traceback
+import os
 import json
 
 router = APIRouter()
@@ -503,7 +507,7 @@ async def updata_dict_type(
         db.add_all(user_list)
 
 
-        if not dutyn_data:
+        if not duty_data:
             return JSONResponse(status_code=404, content={
                 'errcode': 404,
                 'errmsg': '值班不存在'
@@ -615,3 +619,588 @@ async def delete_dict_data(
             'msg': str(e)
         })
 
+
+# 值班排班 导入
+@router.post('/zbpb/createImport')
+async def create_contact(
+        request: Request,
+        background_tasks: BackgroundTasks,
+        db: Session = Depends(get_db),
+        body=Depends(remove_xss_json),
+        auth_user: AuthUser = Depends(find_auth_user),
+        user_id=Depends(valid_access_token)
+):
+    try:
+        # 提取请求数据
+        filename = body['filename']
+        file_name_desc = body['file_name_desc']
+        duty_unit = body['duty_unit']
+        if len(filename) == 0:
+            raise Exception()
+
+        file_path = f'/data/upload/mergefile/uploads/{filename}'
+
+        # 检查文件是否存在
+        if not os.path.isfile(file_path):
+            return JSONResponse(status_code=404, content={
+                'errcode': 404,
+                'errmsg': f'{filename}不存在'
+            })
+
+        # new_file = ThreeProofingResponsiblePersonImportFileStatus(
+        #     file_uuid=filename,
+        #     file_name=file_name_desc,
+        #     status='1',
+        #     remark='',
+        #     user_id=user_id
+        # )
+        # db.add(new_file)
+        # db.commit()
+        # background_tasks.add_task(import_data, db, file_path, user_id, new_file)
+        #
+        # db_czrz.log(db, auth_user, "系统管理", f"后台管理导入三防责任人管理人员信息成功", request.client.host)
+
+        # 返回创建成功的响应
+        return {
+            "code": 200,
+            "msg": "成功",
+            "data": None
+        }
+
+    except AppException as e:
+        return {
+            "code": 500,
+            "msg": e.msg
+        }
+
+    except Exception as e:
+        traceback.print_exc()
+        # 处理异常
+        db.rollback()
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+# 值班排班 导出
+@router.get("/zbpb/export")
+async def download_file(keywords:str =Query(None),
+    duty_type:str =Query(None),
+    start_time:str =Query(None),
+    end_time:str =Query(None),
+    duty_unit:int  =Query(None),
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)):
+    """
+    根据提供的文件名下载文件。
+    :param filename: 要下载的文件的名称。
+    """
+    try:
+        query = db.query(DutySchedule)
+
+        query = query.filter(DutySchedule.del_flag != '2')
+        # 添加查询条件
+        if keywords:
+            user_list = name_get_user_id(db, keywords)
+            duty_list = user_id_get_duty_id(db, user_list)
+            query = query.filter(DutySchedule.id.in_(duty_list))
+
+        if duty_type:
+            query = query.filter(DutySchedule.duty_type == duty_type)
+        if start_time:
+            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date >= start_time)
+        if end_time:
+            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date <= end_time)
+        if duty_unit:
+            query = query.filter(DutySchedule.duty_unit == duty_unit)
+
+        query = query.order_by(DutySchedule.update_time.desc())
+        duty_data = query.all()
+
+        # 转换为字典
+        data_list = []
+        for d in duty_data:
+            user_data = []
+            user_list = duty_id_get_user_id(db, d.id)
+            for user_info in user_list:
+                contact_info = user_id_get_info(db, user_info.personnel_id)
+                user_data.append({"position_id": user_info.position_id,
+                                  "personnel_id": user_info.personnel_id,
+                                  "name": contact_info.name,
+                                  "position": contact_info.position,
+                                  "mobile_phone": contact_info.mobile_phone,
+                                  "office_phone": contact_info.office_phone,
+                                  "department_id": contact_info.department_id,
+                                  "yzy_userid": contact_info.userid})
+            data_list.append({
+                "id": d.id,
+                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
+                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
+                "duty_date": d.duty_date,
+                "shift_type": d.shift_type,
+                "duty_unit": d.duty_unit,
+                "duty_type": d.duty_type,
+                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
+                "user_data": user_data
+            })
+        # 构造文件的完整路径
+        import pandas as pd
+        from io import BytesIO
+        # 将查询结果转换为 DataFrame
+        df = pd.DataFrame(data_list)
+
+        # 将 DataFrame 导出为 Excel 文件
+        output = BytesIO()
+        with pd.ExcelWriter(output, engine='openpyxl') as writer:
+            df.to_excel(writer, index=False)
+
+        # 设置响应头
+        output.seek(0)
+        from urllib.parse import quote
+        encoded_filename = f'值班排班{datetime.now().strftime("%Y%m%d%H%mi%s")}.xlsx'
+        encoded_filename = quote(encoded_filename, encoding='utf-8')
+        headers = {
+            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
+            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
+        }
+
+
+        # 返回文件流
+        return StreamingResponse(output, headers=headers)
+
+
+    except HTTPException as e:
+        raise e
+    except Exception as e:
+        # 处理其他异常情况
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+# 自定义排班 导出
+@router.post('/zdypb/createImport')
+async def create_contact(
+        request: Request,
+        background_tasks: BackgroundTasks,
+        db: Session = Depends(get_db),
+        body=Depends(remove_xss_json),
+        auth_user: AuthUser = Depends(find_auth_user),
+        user_id=Depends(valid_access_token)
+):
+    try:
+        # 提取请求数据
+        filename = body['filename']
+        file_name_desc = body['file_name_desc']
+        duty_unit = body['duty_unit']
+        if len(filename) == 0:
+            raise Exception()
+
+        file_path = f'/data/upload/mergefile/uploads/{filename}'
+
+        # 检查文件是否存在
+        if not os.path.isfile(file_path):
+            return JSONResponse(status_code=404, content={
+                'errcode': 404,
+                'errmsg': f'{filename}不存在'
+            })
+
+        # new_file = ThreeProofingResponsiblePersonImportFileStatus(
+        #     file_uuid=filename,
+        #     file_name=file_name_desc,
+        #     status='1',
+        #     remark='',
+        #     user_id=user_id
+        # )
+        # db.add(new_file)
+        # db.commit()
+        # background_tasks.add_task(import_data, db, file_path, user_id, new_file)
+        #
+        # db_czrz.log(db, auth_user, "系统管理", f"后台管理导入三防责任人管理人员信息成功", request.client.host)
+
+        # 返回创建成功的响应
+        return {
+            "code": 200,
+            "msg": "成功",
+            "data": None
+        }
+
+    except AppException as e:
+        return {
+            "code": 500,
+            "msg": e.msg
+        }
+
+    except Exception as e:
+        traceback.print_exc()
+        # 处理异常
+        db.rollback()
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+# 本单位值班 导出
+@router.get("/bdwzb/export")
+async def download_file(keywords:str =Query(None),
+    start_time:str =Query(None),
+    end_time:str =Query(None),
+    duty_unit:int  =Query(None),
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)):
+    """
+    根据提供的文件名下载文件。
+    :param filename: 要下载的文件的名称。
+    """
+    try:
+        query = db.query(DutySchedule)
+
+        query = query.filter(DutySchedule.del_flag != '2')
+        # 添加查询条件
+        if keywords:
+            user_list = name_get_user_id(db, keywords)
+            duty_list = user_id_get_duty_id(db, user_list)
+            query = query.filter(DutySchedule.id.in_(duty_list))
+        if start_time:
+            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date >= start_time)
+        if end_time:
+            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date <= end_time)
+        if duty_unit:
+            query = query.filter(DutySchedule.duty_unit == duty_unit)
+        else:
+            now_user_info = user_id_get_user_info(db, user_id)
+            print(user_id, now_user_info.phonenumber, get_duty_unit_id_list(db))
+            if now_user_info:
+                query = query.filter(
+                    DutySchedule.duty_unit == mobile_phone_get_dept_id(db, mpfun.dec_data(now_user_info.phonenumber),
+                                                                       get_duty_unit_id_list(db)))
+            else:
+                return {
+                    "data": [],
+                    "code": 200,
+                    "msg": "查询成功"
+                }
+        subquery = db.query(
+            DutySchedule.duty_date,
+            DutySchedule.shift_type,
+            func.max(DutySchedule.update_time).label("latest_create_time")
+        ).filter(DutySchedule.del_flag != '2').group_by(
+            DutySchedule.duty_date,
+            DutySchedule.shift_type
+        ).subquery()
+
+        query = query.join(
+            subquery,
+            and_(
+                DutySchedule.duty_date == subquery.c.duty_date,
+                DutySchedule.shift_type == subquery.c.shift_type,
+                DutySchedule.update_time == subquery.c.latest_create_time
+            )
+        )
+        query = query.order_by(DutySchedule.update_time.desc())
+        duty_data = query.all()
+
+        # 转换为字典
+        data_list = []
+        for d in duty_data:
+            user_data = []
+            user_list = duty_id_get_user_id(db, d.id)
+            for user_info in user_list:
+                contact_info = user_id_get_info(db, user_info.personnel_id)
+                user_data.append({"position_id": user_info.position_id,
+                                  "personnel_id": user_info.personnel_id,
+                                  "name": contact_info.name,
+                                  "position": contact_info.position,
+                                  "mobile_phone": contact_info.mobile_phone,
+                                  "office_phone": contact_info.office_phone,
+                                  "department_id": contact_info.department_id,
+                                  "yzy_userid": contact_info.userid})
+            data_list.append({
+                "id": d.id,
+                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
+                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
+                "duty_date": d.duty_date,
+                "shift_type": d.shift_type,
+                "duty_unit": d.duty_unit,
+                "duty_type": d.duty_type,
+                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
+                "user_data": user_data
+            })
+
+        # 构造文件的完整路径
+        import pandas as pd
+        from io import BytesIO
+        # 将查询结果转换为 DataFrame
+        df = pd.DataFrame(data_list)
+
+        # 将 DataFrame 导出为 Excel 文件
+        output = BytesIO()
+        with pd.ExcelWriter(output, engine='openpyxl') as writer:
+            df.to_excel(writer, index=False)
+
+        # 设置响应头
+        output.seek(0)
+        from urllib.parse import quote
+        encoded_filename = f'值班排班{datetime.now().strftime("%Y%m%d%H%mi%s")}.xlsx'
+        encoded_filename = quote(encoded_filename, encoding='utf-8')
+        headers = {
+            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
+            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
+        }
+
+
+        # 返回文件流
+        return StreamingResponse(output, headers=headers)
+
+
+    except HTTPException as e:
+        raise e
+    except Exception as e:
+        # 处理其他异常情况
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+# 各级单位值班表 导出
+@router.get("/gjdwzbb/export")
+async def download_file(keywords:str =Query(None),
+    start_time:str =Query(None),
+    end_time:str =Query(None),
+    duty_unit:int  =Query(None),
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)):
+    """
+    根据提供的文件名下载文件。
+    :param filename: 要下载的文件的名称。
+    """
+    try:
+        query = db.query(DutySchedule)
+
+        query = query.filter(DutySchedule.del_flag != '2')
+        # 添加查询条件
+        if keywords:
+            user_list = name_get_user_id(db, keywords)
+            duty_list = user_id_get_duty_id(db, user_list)
+            query = query.filter(DutySchedule.id.in_(duty_list))
+        if start_time:
+            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date >= start_time)
+        if end_time:
+            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date <= end_time)
+        if duty_unit:
+            query = query.filter(DutySchedule.duty_unit == duty_unit)
+        else:
+            now_user_info = user_id_get_user_info(db, user_id)
+            print(user_id, now_user_info.phonenumber, get_duty_unit_id_list(db))
+            if now_user_info:
+                query = query.filter(
+                    DutySchedule.duty_unit == mobile_phone_get_dept_id(db, mpfun.dec_data(now_user_info.phonenumber),
+                                                                       get_duty_unit_id_list(db)))
+            else:
+                return {
+                    "data": [],
+                    "code": 200,
+                    "msg": "查询成功"
+                }
+        subquery = db.query(
+            DutySchedule.duty_date,
+            DutySchedule.shift_type,
+            func.max(DutySchedule.update_time).label("latest_create_time")
+        ).filter(DutySchedule.del_flag != '2').group_by(
+            DutySchedule.duty_date,
+            DutySchedule.shift_type
+        ).subquery()
+
+        query = query.join(
+            subquery,
+            and_(
+                DutySchedule.duty_date == subquery.c.duty_date,
+                DutySchedule.shift_type == subquery.c.shift_type,
+                DutySchedule.update_time == subquery.c.latest_create_time
+            )
+        )
+        query = query.order_by(DutySchedule.update_time.desc())
+        duty_data = query.all()
+
+        # 转换为字典
+        data_list = []
+        for d in duty_data:
+            user_data = []
+            user_list = duty_id_get_user_id(db, d.id)
+            for user_info in user_list:
+                contact_info = user_id_get_info(db, user_info.personnel_id)
+                user_data.append({"position_id": user_info.position_id,
+                                  "personnel_id": user_info.personnel_id,
+                                  "name": contact_info.name,
+                                  "position": contact_info.position,
+                                  "mobile_phone": contact_info.mobile_phone,
+                                  "office_phone": contact_info.office_phone,
+                                  "department_id": contact_info.department_id,
+                                  "yzy_userid": contact_info.userid})
+            data_list.append({
+                "id": d.id,
+                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
+                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
+                "duty_date": d.duty_date,
+                "shift_type": d.shift_type,
+                "duty_unit": d.duty_unit,
+                "duty_type": d.duty_type,
+                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
+                "user_data": user_data
+            })
+
+        # 构造文件的完整路径
+        import pandas as pd
+        from io import BytesIO
+        # 将查询结果转换为 DataFrame
+        df = pd.DataFrame(data_list)
+
+        # 将 DataFrame 导出为 Excel 文件
+        output = BytesIO()
+        with pd.ExcelWriter(output, engine='openpyxl') as writer:
+            df.to_excel(writer, index=False)
+
+        # 设置响应头
+        output.seek(0)
+        from urllib.parse import quote
+        encoded_filename = f'值班排班{datetime.now().strftime("%Y%m%d%H%mi%s")}.xlsx'
+        encoded_filename = quote(encoded_filename, encoding='utf-8')
+        headers = {
+            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
+            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
+        }
+
+
+        # 返回文件流
+        return StreamingResponse(output, headers=headers)
+
+
+    except HTTPException as e:
+        raise e
+    except Exception as e:
+        # 处理其他异常情况
+        raise HTTPException(status_code=500, detail=str(e))
+
+
+# 各级单位值班人 导出
+@router.get("/gjdwzbr/export")
+async def download_file(keywords:str =Query(None),
+    start_time:str =Query(None),
+    end_time:str =Query(None),
+    duty_unit:int  =Query(None),
+    db: Session = Depends(get_db),
+    body = Depends(remove_xss_json),
+    user_id = Depends(valid_access_token)):
+    """
+    根据提供的文件名下载文件。
+    :param filename: 要下载的文件的名称。
+    """
+    try:
+        query = db.query(DutySchedule)
+
+        query = query.filter(DutySchedule.del_flag != '2')
+        # 添加查询条件
+        if keywords:
+            user_list = name_get_user_id(db, keywords)
+            duty_list = user_id_get_duty_id(db, user_list)
+            query = query.filter(DutySchedule.id.in_(duty_list))
+        if start_time:
+            start_time = datetime.strptime(start_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date >= start_time)
+        if end_time:
+            end_time = datetime.strptime(end_time, "%Y-%m-%d").date()
+            query = query.filter(DutySchedule.duty_date <= end_time)
+        if duty_unit:
+            query = query.filter(DutySchedule.duty_unit == duty_unit)
+        else:
+            now_user_info = user_id_get_user_info(db, user_id)
+            print(user_id, now_user_info.phonenumber, get_duty_unit_id_list(db))
+            if now_user_info:
+                query = query.filter(
+                    DutySchedule.duty_unit == mobile_phone_get_dept_id(db, mpfun.dec_data(now_user_info.phonenumber),
+                                                                       get_duty_unit_id_list(db)))
+            else:
+                return {
+                    "data": [],
+                    "code": 200,
+                    "msg": "查询成功"
+                }
+        subquery = db.query(
+            DutySchedule.duty_date,
+            DutySchedule.shift_type,
+            func.max(DutySchedule.update_time).label("latest_create_time")
+        ).filter(DutySchedule.del_flag != '2').group_by(
+            DutySchedule.duty_date,
+            DutySchedule.shift_type
+        ).subquery()
+
+        query = query.join(
+            subquery,
+            and_(
+                DutySchedule.duty_date == subquery.c.duty_date,
+                DutySchedule.shift_type == subquery.c.shift_type,
+                DutySchedule.update_time == subquery.c.latest_create_time
+            )
+        )
+        query = query.order_by(DutySchedule.update_time.desc())
+        duty_data = query.all()
+
+        # 转换为字典
+        data_list = []
+        for d in duty_data:
+            user_data = []
+            user_list = duty_id_get_user_id(db, d.id)
+            for user_info in user_list:
+                contact_info = user_id_get_info(db, user_info.personnel_id)
+                user_data.append({"position_id": user_info.position_id,
+                                  "personnel_id": user_info.personnel_id,
+                                  "name": contact_info.name,
+                                  "position": contact_info.position,
+                                  "mobile_phone": contact_info.mobile_phone,
+                                  "office_phone": contact_info.office_phone,
+                                  "department_id": contact_info.department_id,
+                                  "yzy_userid": contact_info.userid})
+            data_list.append({
+                "id": d.id,
+                "start_time": d.start_time.strftime('%Y-%m-%d %H:%M:%S') if d.start_time else '',
+                "end_time": d.end_time.strftime('%Y-%m-%d %H:%M:%S') if d.end_time else '',
+                "duty_date": d.duty_date,
+                "shift_type": d.shift_type,
+                "duty_unit": d.duty_unit,
+                "duty_type": d.duty_type,
+                "createTime": d.create_time.strftime('%Y-%m-%d %H:%M:%S') if d.create_time else '',
+                "user_data": user_data
+            })
+
+        # 构造文件的完整路径
+        import pandas as pd
+        from io import BytesIO
+        # 将查询结果转换为 DataFrame
+        df = pd.DataFrame(data_list)
+
+        # 将 DataFrame 导出为 Excel 文件
+        output = BytesIO()
+        with pd.ExcelWriter(output, engine='openpyxl') as writer:
+            df.to_excel(writer, index=False)
+
+        # 设置响应头
+        output.seek(0)
+        from urllib.parse import quote
+        encoded_filename = f'值班排班{datetime.now().strftime("%Y%m%d%H%mi%s")}.xlsx'
+        encoded_filename = quote(encoded_filename, encoding='utf-8')
+        headers = {
+            'Content-Disposition': f'attachment; filename*=UTF-8\'\'{encoded_filename}',
+            'Content-Type': 'application/vnd.openxmlformats-officedocument.spreadsheetml.sheet'
+        }
+
+
+        # 返回文件流
+        return StreamingResponse(output, headers=headers)
+
+
+    except HTTPException as e:
+        raise e
+    except Exception as e:
+        # 处理其他异常情况
+        raise HTTPException(status_code=500, detail=str(e))