#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Online roll-call API routes.

Endpoints for creating roll calls against the duty roster, acknowledging
them, ending them, listing/inspecting them and exporting the result to Excel.
"""

# NOTE: the import order below is kept exactly as in the original file.  Several
# of these are wildcard imports, so reordering could change which binding wins.
from fastapi import APIRouter, Request, Depends, Query, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from database import get_db
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from models import *
import uuid
import os
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_user, db_yzy, db_dept, db_msg_center
from common.security import valid_access_token
import traceback
from utils import *
from datetime import datetime, timedelta
from common import YzyApi
from common.db import db_dict
from urllib.parse import quote
import base64
from config import settings
from exceptions import AppException
from typing import List, Dict,Set
import openpyxl
from io import BytesIO

router = APIRouter()


# Display text for OnlineRollCallDetail.ack_status values.
_ACK_STATUS_TEXT = {
    0: '未应答',
    1: '已接通',
    2: '呼叫中',
}


def _build_dept_tree(db: Session, dept) -> dict:
    """Return the tree node for ``dept`` with all of its descendants.

    ``dept`` is an already-loaded SysDept row; children are found through
    ``SysDept.parent_id``.  The "children" key is only present when the
    department has at least one child.
    """
    children = [
        _build_dept_tree(db, child)
        for child in db.query(SysDept).filter(SysDept.parent_id == dept.dept_id).all()
    ]
    node = {
        "uuid": uuid.uuid4().hex,
        "id": dept.dept_id,
        "label": dept.dept_name,
        "deptType": True,
    }
    if children:
        node["children"] = children
    return node


@router.get("/dept_by_key", response_model=Dict)
def dept_by_key(db: Session = Depends(get_db), area_name: str = Query(None), keyword: str = Query(None)):
    """Return the department tree rooted at the department named ``area_name``.

    ``data`` is an empty list when the name cannot be resolved (the lookup
    returns 0) or the resolved department row does not exist.  ``keyword`` is
    accepted but not used by this handler.
    """
    dept_id = db_dept.get_dept_id_by_name(db, area_name)
    if dept_id == 0:
        return {"code": 200, "msg": "成功", "data": []}

    root = db.query(SysDept).filter(SysDept.dept_id == dept_id).first()
    if root is None:
        # Previously this produced "data": [None]; an empty list is consistent
        # with the "not found" answer above.
        return {"code": 200, "msg": "成功", "data": []}

    return {"code": 200, "msg": "成功", "data": [_build_dept_tree(db, root)]}


def _on_duty_shift_query(db: Session, now: datetime):
    """Return a DutyShift query limited to shifts that are active at ``now``.

    A shift is active when its date is today's, its status is 0 and ``now``
    lies strictly between its start and end time.  Date and time come from
    the same instant so the two cannot disagree around midnight.
    """
    current_time = now.time()
    return (
        db.query(DutyShift)
        .filter(DutyShift.shift_date == now.date())
        .filter(DutyShift.shift_status == 0)
        .filter(DutyShift.start_time < current_time)
        .filter(DutyShift.end_time > current_time)
    )


def _first_shift_per_dept(shifts) -> list:
    """Keep only the first shift of every department, preserving order."""
    seen_dept_ids = set()
    unique = []
    for shift in shifts:
        if shift.dept_id in seen_dept_ids:
            continue
        seen_dept_ids.add(shift.dept_id)
        unique.append(shift)
    return unique


def _create_call_base(db: Session, user_id, call_type: int, user_count: int, remark: str = ''):
    """Insert, commit and return a new in-progress OnlineRollCallBase row."""
    now = datetime.now()
    new_call = OnlineRollCallBase(
        call_type=call_type,
        recorded_by=user_id,
        create_time=now,
        del_flag='0',
        call_status=1,
        user_count=user_count,
        ack_count=0,
        unack_count=user_count,
        remark=remark,
        update_time=now,
    )
    db.add(new_call)
    db.commit()
    db.refresh(new_call)
    return new_call


def _build_detail_from_shift(db: Session, call_id, shift) -> OnlineRollCallDetail:
    """Build (without persisting) the pending detail row for one duty shift."""
    return OnlineRollCallDetail(
        pid=call_id,
        shift_id=shift.shift_id,
        dept_id=shift.dept_id,
        dept_name=db_dept.get_dept_name_by_id(db, shift.dept_id),
        onduty_user=db_user.get_nick_name_by_id(db, shift.primary_staff_id),
        onduty_leader=db_user.get_nick_name_by_id(db, shift.leader_id),
        video_url="#",
        call_url="#",
        ack_status=0,
        ack_time=None,
        create_time=datetime.now(),
        del_flag='0',
        ack_type=0,
        leader_id=shift.leader_id,
        primary_staff_id=shift.primary_staff_id,
        secondary_staff_id=shift.secondary_staff_id,
        standby_staff_id=shift.standby_staff_id,
    )


def _create_city_roll_call(db: Session, user_id, max_area_code_len: int, notify: bool) -> dict:
    """Create a city-wide roll call for every department currently on duty.

    Only shifts whose ``area_code`` has at most ``max_area_code_len``
    characters are included, and each department is called once.  When
    ``notify`` is true a YZY message and a message-center entry are created
    for every detail row.

    Returns the response dict; AppException is converted into an error
    response and any other exception into an HTTP 500.
    """
    try:
        rows = (
            _on_duty_shift_query(db, datetime.now())
            .filter(func.char_length(DutyShift.area_code) <= max_area_code_len)
            .all()
        )
        # De-duplicate before counting so user_count/unack_count match the
        # number of detail rows that are actually created.
        shifts = _first_shift_per_dept(rows)
        if not shifts:
            raise AppException(500, "暂无相关值班人员信息")

        new_call = _create_call_base(db, user_id, call_type=1, user_count=len(shifts))
        new_call_id = new_call.id

        for shift in shifts:
            new_detail = _build_detail_from_shift(db, new_call_id, shift)
            db.add(new_detail)
            db.commit()
            if notify:
                db.refresh(new_detail)
                send_yzy_msg(db, new_detail, user_id)
                db_msg_center.add_msg(db, "在线点名", new_detail.id, user_id)

        return {"code": 200, "msg": "点名创建成功", "data": new_call_id}

    except AppException as e:
        return {"code": e.code, "msg": e.msg}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# One-click roll call: whole city down to district/county level
@router.post("/create_by_city_to_area")
async def create_by_city_to_area(
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Call every on-duty department whose area code is at most 6 characters.

    Each called leader is notified through YZY and the message center.
    """
    return _create_city_roll_call(db, user_id, max_area_code_len=6, notify=True)


# One-click roll call: whole city down to town/sub-district level
@router.post("/create_by_city_to_district")
async def create_by_city_to_district(
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Call every on-duty department whose area code is at most 9 characters.

    NOTE(review): unlike /create_by_city_to_area this endpoint has never sent
    notifications; that behaviour is preserved - confirm it is intentional.
    """
    return _create_city_roll_call(db, user_id, max_area_code_len=9, notify=False)


# Create a roll call for selected departments
@router.post("/create_by_dept_ids")
async def create_by_dept_ids(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Create a roll call for the departments listed in ``body['depts']``.

    Departments without a shift that is active right now are skipped.  If no
    department has one, the freshly created roll call is deleted again and an
    error response is returned.  ``body['remark']`` is stored on the roll call.
    """
    now = datetime.now()

    try:
        if not body['depts']:
            raise AppException(500, "请勾选部门")

        new_call = _create_call_base(db, user_id, call_type=3, user_count=0, remark=body['remark'])
        new_call_id = new_call.id

        called_dept_ids = set()
        for dept_id in body['depts']:
            shift = _on_duty_shift_query(db, now).filter(DutyShift.dept_id == dept_id).first()
            if shift is None:
                continue
            # A department listed twice must not get two detail rows: /ack only
            # ever marks the first one, so the second could never be answered.
            if shift.dept_id in called_dept_ids:
                continue
            called_dept_ids.add(shift.dept_id)

            db.add(_build_detail_from_shift(db, new_call_id, shift))
            db.commit()

        # Count what was really created so the totals cannot be wrong or empty.
        user_count = db.query(OnlineRollCallDetail).filter(OnlineRollCallDetail.pid == new_call_id).count()
        if user_count == 0:
            db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == new_call_id).delete()
            db.commit()
            raise AppException(500, "暂无相关值班人员信息")

        db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == new_call_id).update(
            {"user_count": user_count, "unack_count": user_count}
        )
        db.commit()

        return {"code": 200, "msg": "点名创建成功", "data": new_call_id}

    except AppException as e:
        return {"code": e.code, "msg": e.msg}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# End a roll call
@router.post("/end")
async def end_call(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Mark the roll call ``body['call_id']`` as ended (call_status 2).

    Returns an error response when the roll call does not exist, is deleted,
    or is not in progress any more.
    """
    call_id = body['call_id']

    row = db.query(OnlineRollCallBase).filter(
        and_(OnlineRollCallBase.id == call_id, OnlineRollCallBase.del_flag == '0')
    ).first()
    if row is None:
        return {"code": 500, "msg": "点名记录不存在"}

    if row.call_status != 1:
        return {"code": 500, "msg": "点名记录状态已结束"}

    finished_at = datetime.now()
    row.end_time = finished_at
    row.call_status = 2  # ended
    row.update_time = finished_at
    db.commit()

    return {"code": 200, "msg": "点名结束成功"}


# Mark the current user as having acknowledged
@router.post("/ack")
async def ack_all(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Acknowledge roll call ``body['call_id']`` on behalf of the current user.

    The first detail row of that roll call whose leader is the current user is
    marked as acknowledged, then the counters on the roll call are refreshed.
    """
    call_id = body['call_id']

    base_row = db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == call_id).first()
    if base_row is None:
        return {"code": 500, "msg": "点名记录不存在"}

    detail_row = db.query(OnlineRollCallDetail).filter(
        and_(OnlineRollCallDetail.pid == call_id, OnlineRollCallDetail.leader_id == user_id)
    ).first()
    if detail_row is None:
        return {"code": 500, "msg": "点名记录不存在!"}

    detail_row.ack_status = 1  # acknowledged
    detail_row.ack_time = datetime.now()
    db.commit()

    # Number of acknowledged rows.
    ack_count = db.query(OnlineRollCallDetail).filter(
        and_(OnlineRollCallDetail.pid == call_id, OnlineRollCallDetail.ack_status == 1)
    ).count()
    # Number of still-pending rows.  Pending rows are created with
    # ack_status 0 (the same value /mycall and /summary test for); the old
    # code counted status 2 and therefore reset unack_count to 0.
    unack_count = db.query(OnlineRollCallDetail).filter(
        and_(OnlineRollCallDetail.pid == call_id, OnlineRollCallDetail.ack_status == 0)
    ).count()

    db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == call_id).update({
        "ack_count": ack_count,
        "unack_count": unack_count,
        "update_time": datetime.now()
    })
    db.commit()

    return {"code": 200, "msg": "应答成功"}


# Check whether the current user has roll calls waiting for an answer (reminder)
@router.post("/mycall")
async def query_mycall(
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """List in-progress roll calls the current user has not acknowledged yet."""
    pending_details = db.query(OnlineRollCallDetail).filter(
        and_(OnlineRollCallDetail.leader_id == user_id, OnlineRollCallDetail.ack_status == 0)
    ).all()

    data = []
    for detail in pending_details:
        call_id = detail.pid
        # Only roll calls that are still running and not deleted count.
        base_row = db.query(OnlineRollCallBase).filter(
            and_(
                OnlineRollCallBase.id == call_id,
                OnlineRollCallBase.call_status == 1,
                OnlineRollCallBase.del_flag == '0',
            )
        ).first()
        if base_row is None:
            continue
        data.append({
            "id": call_id,
            "time1": get_datetime_str(base_row.create_time)
        })

    return {"code": 200, "msg": "查询成功", "data": data, "total": len(data)}


# Roll call detail
@router.get('/detail')
async def get_event_detail(
    request: Request,
    call_id: str = Query(None, description='点名ID'),
    db: Session = Depends(get_db)):
    """Return one roll call together with all of its detail rows.

    For an ended roll call ``duration_time`` is ``end_time - create_time`` as
    text; a missing ``end_time`` on an ended roll call is back-filled with the
    current time first.
    """
    try:
        base_row = db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == call_id).first()
        if base_row is None:
            return {"code": 500, "msg": "点名记录不存在"}

        data = get_model_dict(base_row)
        data['begin_time'] = get_datetime_str(data['create_time'])
        data['update_time'] = get_datetime_str(data['update_time'])
        data['duration_time'] = ''

        # Ended
        if data['call_status'] == 2:
            # Repair dirty data: an ended call must have an end time.
            if base_row.end_time is None:
                base_row.end_time = datetime.now()
                db.commit()
            data['duration_time'] = str(base_row.end_time - base_row.create_time)

        detail_rows = db.query(OnlineRollCallDetail).filter(OnlineRollCallDetail.pid == call_id).all()
        items = []
        for row in detail_rows:
            detail_info = get_model_dict(row)
            detail_info['begin_time'] = get_datetime_str(detail_info['create_time'])
            detail_info['ack_time'] = get_datetime_str(detail_info['ack_time'])
            detail_info['ack_status_text'] = get_ack_status_text(detail_info['ack_status'])
            items.append(detail_info)
        data['items'] = items

        return {"code": 200, "msg": "查询成功", "data": data}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


def send_yzy_msg(db: Session, detail_entity: OnlineRollCallDetail, user_id: int) -> None:
    """Queue a YZY reminder for the leader named on ``detail_entity``.

    ``user_id`` is recorded as the sender.  Nothing is queued when the
    leader's user record cannot be found.
    """
    to_user_id = detail_entity.leader_id
    user_info = db_user.get_user_info(db, to_user_id)
    if user_info is None:
        # NOTE(review): assumes get_user_info returns None for an unknown or
        # empty leader id - confirm.  Without a user there is no account to
        # message, and failing here would abort the roll call half-created.
        return

    yzy_account = user_info.yzy_account
    yzy_userid = db_yzy.get_userid_by_account(db, yzy_account)

    create_time = get_datetime_str(detail_entity.create_time)
    detail_url = YzyApi.format_redirect_url("{}/".format(settings.YJXP_WEB_ROOT_PATH))  # home page
    description = f"你有一条在线点名通知,请尽快确认\n点名时间:{create_time}"

    data = {
        "yzy_userid": yzy_userid,
        "mobile": yzy_account,
        "content": description,
        "recorded_by": user_id,
        "detail_url": detail_url,
        "foreign_key": detail_entity.id,
        "from_scenario": "online_roll_call_detail",
        "title": "在线点名提醒"
    }
    YzyApi.add_to_msg_queue(db, data)


def get_ack_status_text(ack_status: int) -> str:
    """Return the display text for ``ack_status``; unknown values are stringified."""
    return _ACK_STATUS_TEXT.get(ack_status, str(ack_status))


def _date_range_conditions(column, begin_date: str, end_date: str) -> list:
    """Build filter conditions limiting ``column`` to the given date range.

    Both bounds are optional "YYYY-MM-DD" strings; an empty value or None adds
    no condition.  ``end_date`` is inclusive, i.e. the upper bound is the start
    of the following day.  Raises ValueError for a malformed date.
    """
    conditions = []
    if begin_date:
        conditions.append(column > datetime.strptime(begin_date, "%Y-%m-%d"))
    if end_date:
        conditions.append(column < datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1))
    return conditions


# Roll call list
@router.get('/list')
async def get_event_list(
    request: Request,
    begin_date: str = Query('', description='开始时间'),
    end_date: str = Query('', description='结束时间'),
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db)):
    """Return one page of roll calls from the last two weeks, newest first.

    ``begin_date`` / ``end_date`` narrow the range further.  ``total`` is the
    number of matching roll calls across all pages.
    """
    try:
        # Only the last two weeks are ever listed.
        two_weeks_ago = datetime.now() - timedelta(days=14)
        where = and_(
            OnlineRollCallBase.del_flag == '0',
            OnlineRollCallBase.create_time > two_weeks_ago,
            *_date_range_conditions(OnlineRollCallBase.create_time, begin_date, end_date)
        )

        # Total number of matching rows.
        total = db.query(func.count(OnlineRollCallBase.id)).filter(where).scalar()

        # The requested page.
        rows = (
            db.query(OnlineRollCallBase)
            .filter(where)
            .order_by(OnlineRollCallBase.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        data = []
        for row in rows:
            duration_time = ""
            # Ended.  An ended call without end_time (dirty data, also
            # tolerated by /detail) has no computable duration; subtracting
            # None would fail the whole page.
            if row.call_status == 2 and row.end_time is not None:
                duration_time = str(row.end_time - row.create_time)

            data.append({
                "id": row.id,
                "begin_time": get_datetime_str(row.create_time),
                "end_time": get_datetime_str(row.end_time),
                "duration_time": duration_time,
                "call_status": row.call_status,
                "ack_count": row.ack_count,
                "unack_count": row.unack_count
            })

        return {"code": 200, "msg": "查询成功", "data": data, "total": total}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# Acknowledgement list (mine)
# NOTE(review): this handler reuses the name get_event_list and therefore
# rebinds the module attribute; both routes stay registered.  The name is left
# unchanged to keep the module interface stable.
@router.get('/ack_list')
async def get_event_list(
    request: Request,
    begin_date: str = Query('', description='开始时间'),
    end_date: str = Query('', description='结束时间'),
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Return one page of the current user's roll-call details, newest first.

    ``duration_time`` is the time between the call and the acknowledgement
    for acknowledged rows and empty otherwise.
    """
    try:
        where = and_(
            OnlineRollCallDetail.leader_id == user_id,
            OnlineRollCallDetail.del_flag == '0',
            *_date_range_conditions(OnlineRollCallDetail.create_time, begin_date, end_date)
        )

        # Total number of matching rows.
        total = db.query(func.count(OnlineRollCallDetail.id)).filter(where).scalar()

        # The requested page.
        rows = (
            db.query(OnlineRollCallDetail)
            .filter(where)
            .order_by(OnlineRollCallDetail.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        data = []
        for row in rows:
            duration_time = ""
            # Acknowledged.  Rows flagged as acknowledged but lacking ack_time
            # have no computable duration; subtracting None would fail the page.
            if row.ack_status == 1 and row.ack_time is not None:
                time_diff = row.ack_time - row.create_time
                # NOTE(review): a negative day count means ack_time precedes
                # create_time; the "超过N天" text suggests `days > 0` may have
                # been intended.  Behaviour kept as it was - confirm.
                if time_diff.days < 0:
                    duration_time = "超过{}天".format(abs(time_diff.days))
                else:
                    duration_time = str(time_diff)

            data.append({
                "id": row.id,
                "create_time": get_datetime_str(row.create_time),
                "duration_time": duration_time,
                "ack_status": row.ack_status
            })

        return {"code": 200, "msg": "查询成功", "data": data, "total": total}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# Roll call statistics (mine)
@router.get("/summary")
async def get_call_summary(request: Request, db: Session = Depends(get_db), user_id = Depends(valid_access_token)):
    """Return how often the current user was called, answered and has not answered."""
    try:
        mine = db.query(OnlineRollCallDetail).filter(
            and_(OnlineRollCallDetail.leader_id == user_id, OnlineRollCallDetail.del_flag == '0')
        )
        data = {
            "call_count": mine.count(),
            "ack_count": mine.filter(OnlineRollCallDetail.ack_status == 1).count(),
            "unack_count": mine.filter(OnlineRollCallDetail.ack_status == 0).count()
        }

        return {"code": 200, "msg": "查询成功", "data": data}

    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


@router.get("/export", response_class=FileResponse)
async def get_export_xslx(
    request: Request,
    call_id: str,
    db: Session = Depends(get_db)):
    """Export the detail rows of roll call ``call_id`` as an .xlsx file.

    Rows are ordered by acknowledgement status (descending), then by
    acknowledgement time.  The workbook is written to the download directory
    under a generated name and returned as a file response.
    """
    wb = openpyxl.Workbook()
    ws = wb.active

    # Header row (row 1); every later append lands on the next row.
    ws.append(["部门名称", "值班领导", "值班员", "点名开始时间", "应答状态", "应答时间"])

    rows = (
        db.query(OnlineRollCallDetail)
        .filter(and_(OnlineRollCallDetail.pid == call_id, OnlineRollCallDetail.del_flag == '0'))
        .order_by(OnlineRollCallDetail.ack_status.desc(), OnlineRollCallDetail.ack_time.asc())
        .all()
    )
    for row in rows:
        ws.append([
            row.dept_name,
            row.onduty_leader,
            row.onduty_user,
            get_datetime_str(row.create_time),
            get_ack_status_text(row.ack_status),
            get_datetime_str(row.ack_time),
        ])

    upload_path = '/data/download/mergefile'
    # exist_ok avoids the check-then-create race between concurrent exports.
    os.makedirs(upload_path, exist_ok=True)

    excel_file_name = new_guid() +".xlsx"
    save_path = os.path.join(upload_path, excel_file_name)
    wb.save(save_path)
    wb.close()

    return FileResponse(save_path)