#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""HTTP routes for the online roll-call feature.

The module exposes endpoints to create a roll call from the duty shifts that
are active right now, to acknowledge a roll call with a photo, to end it, and
to list, summarise and export the results.
"""

# NOTE: the import order below is deliberately kept exactly as it was.  The
# wildcard imports may rebind names imported earlier, so regrouping them could
# silently change which object a name refers to.
from fastapi import APIRouter, Request, Depends, Query, HTTPException
from fastapi.responses import FileResponse, StreamingResponse
from database import get_db
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
from sqlalchemy.sql import func
from models import *
import uuid
import os
from sqlalchemy import create_engine, select
from typing import Optional
from utils.StripTagsHTMLParser import *
from common.db import db_user, db_yzy, db_dept, db_msg_center
from common.security import valid_access_token
import traceback
from utils import *
from datetime import datetime, timedelta
from common import YzyApi
from common.db import db_dict
from urllib.parse import quote
import base64
from config import settings
from exceptions import AppException
from typing import List, Dict,Set
import openpyxl
from io import BytesIO
from PIL import Image

router = APIRouter()

# Directory where acknowledgement photos are stored.
_PHOTO_UPLOAD_DIR = '/data/upload/mergefile/uploads'
# Directory where exported workbooks are written.
_EXPORT_DIR = '/data/download/mergefile'
# Header row of the exported workbook, in column order.
_EXPORT_HEADERS = ("部门名称", "值班领导", "值班员", "点名开始时间", "应答状态", "应答时间")
# Display text for the known acknowledgement status codes.
_ACK_STATUS_TEXT = {0: '未应答', 1: '已接通', 2: '呼叫中'}


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _participant_filter(user_id):
    """Return a SQL predicate matching details where *user_id* fills any duty slot."""
    return or_(
        OnlineRollCallDetail.standby_staff_id == user_id,
        OnlineRollCallDetail.leader_id == user_id,
        OnlineRollCallDetail.primary_staff_id == user_id,
        OnlineRollCallDetail.secondary_staff_id == user_id,
    )


def _active_shift_query(db: Session, now: datetime):
    """Return a query for shifts dated today, with status 0, that span *now*."""
    today = now.date()
    current_time = now.time()
    return (
        db.query(DutyShift)
        .filter(DutyShift.shift_date == today)
        .filter(DutyShift.shift_status == 0)
        .filter(DutyShift.start_time < current_time)
        .filter(DutyShift.end_time > current_time)
    )


def _first_shift_per_dept(shifts):
    """Keep only the first shift seen for each department, preserving order."""
    seen_depts = set()
    unique = []
    for shift in shifts:
        if shift.dept_id in seen_depts:
            continue
        seen_depts.add(shift.dept_id)
        unique.append(shift)
    return unique


def _add_detail_and_notify(db: Session, call_id, shift, user_id) -> None:
    """Persist one roll-call detail for *shift* and notify its duty staff."""
    detail = OnlineRollCallDetail(
        pid=call_id,
        shift_id=shift.shift_id,
        dept_id=shift.dept_id,
        dept_name=db_dept.get_dept_name_by_id(db, shift.dept_id),
        onduty_user=db_user.get_nick_name_by_id(db, shift.primary_staff_id),
        onduty_leader=db_user.get_nick_name_by_id(db, shift.leader_id),
        video_url="#",
        call_url="#",
        ack_status=0,
        ack_time=None,
        create_time=datetime.now(),
        del_flag='0',
        ack_type=0,
        leader_id=shift.leader_id,
        primary_staff_id=shift.primary_staff_id,
        secondary_staff_id=shift.secondary_staff_id,
        standby_staff_id=shift.standby_staff_id,
    )
    db.add(detail)
    db.commit()
    db.refresh(detail)

    # Send the YZY message.
    send_yzy_msg(db, detail, user_id)


def _create_roll_call(db: Session, user_id, call_type: int, shifts, remark: str):
    """Create a roll-call header plus one detail per shift and return its id.

    Raises:
        AppException: when *shifts* is empty, so no header row is written for
            a roll call that nobody could answer.
    """
    if not shifts:
        raise AppException(500, "暂无相关值班人员信息")

    now = datetime.now()
    new_call = OnlineRollCallBase(
        call_type=call_type,
        recorded_by=user_id,
        create_time=now,
        del_flag='0',
        call_status=1,
        # One detail row is created per shift, so the counters match the
        # number of rows that can actually be acknowledged.
        user_count=len(shifts),
        ack_count=0,
        unack_count=len(shifts),
        remark=remark,
        update_time=now,
    )
    db.add(new_call)
    db.commit()
    db.refresh(new_call)
    # Read the id now; the commits below expire the instance again.
    new_call_id = new_call.id

    for shift in shifts:
        _add_detail_and_notify(db, new_call_id, shift, user_id)
    return new_call_id


def _create_city_roll_call(db: Session, user_id, max_area_code_len: int):
    """Create a city-wide roll call limited by area-code length.

    Selects the currently active shifts whose ``area_code`` has at most
    *max_area_code_len* characters, keeps one shift per department and creates
    the roll call.  Returns the JSON payload of the endpoint.

    Raises:
        HTTPException: with status 500 for any unexpected error.
    """
    try:
        # Only the duty staff whose shift covers the current time.
        rows = (
            _active_shift_query(db, datetime.now())
            .filter(func.char_length(DutyShift.area_code) <= max_area_code_len)
            .all()
        )
        # De-duplicate before counting so user_count equals the detail count.
        new_call_id = _create_roll_call(db, user_id, 1, _first_shift_per_dept(rows), '')
        return {
            "code": 200,
            "msg": "点名创建成功",
            "data": new_call_id
        }
    except AppException as e:
        return {
            "code": e.code,
            "msg": e.msg
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


def _build_dept_tree(db: Session, dept, visited: set) -> dict:
    """Build the nested tree node for *dept* and all of its descendants.

    *visited* holds the department ids already placed in the tree so that a
    row whose parent chain loops back on itself cannot recurse forever.
    """
    visited.add(dept.dept_id)
    node = {
        "uuid": uuid.uuid4().hex,
        "id": dept.dept_id,
        "label": dept.dept_name,
        "deptType": True
    }
    children = [
        _build_dept_tree(db, child, visited)
        for child in db.query(SysDept).filter(SysDept.parent_id == dept.dept_id).all()
        if child.dept_id not in visited
    ]
    if children:
        node["children"] = children
    return node


def _with_date_range(where, column, begin_date: str, end_date: str):
    """AND an optional ``YYYY-MM-DD`` date range on *column* onto *where*.

    The end date is inclusive: the upper bound is the start of the next day.

    Raises:
        ValueError: when a non-empty date does not match ``%Y-%m-%d``.
    """
    if begin_date:
        where = and_(where, column > datetime.strptime(begin_date, "%Y-%m-%d"))
    if end_date:
        upper = datetime.strptime(end_date, "%Y-%m-%d") + timedelta(days=1)
        where = and_(where, column < upper)
    return where


def _strip_data_uri_prefix(data: str) -> str:
    """Return the base64 payload of *data*, dropping any ``data:...,`` prefix.

    A comma never occurs in base64 text, so everything up to the first comma
    of a ``data:`` URI is header.
    """
    payload = data.strip()
    if payload.startswith("data:"):
        payload = payload.partition(",")[2]
    return payload


# ---------------------------------------------------------------------------
# Routes
# ---------------------------------------------------------------------------

@router.get("/dept_by_key", response_model=Dict)
def dept_by_key(db: Session = Depends(get_db),
                area_name: str = Query(None),
                keyword: str = Query(None)):
    """Return the department tree rooted at the department named *area_name*.

    ``data`` is an empty list when the name resolves to id 0 or when no
    department row exists for the resolved id.  *keyword* is accepted but not
    used.
    """
    empty = {
        "code": 200,
        "msg": "成功",
        "data": []
    }

    dept_id = db_dept.get_dept_id_by_name(db, area_name)
    if dept_id == 0:
        return empty

    root = db.query(SysDept).filter(SysDept.dept_id == dept_id).first()
    if root is None:
        # Previously this produced ``"data": [None]``.
        return empty

    return {
        "code": 200,
        "msg": "成功",
        "data": [_build_dept_tree(db, root, set())]
    }


# One-click roll call: whole city down to district/county level.
@router.post("/create_by_city_to_area")
async def create_by_city_to_area(
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Roll-call every active shift whose area code has at most 6 characters."""
    return _create_city_roll_call(db, user_id, 6)


# One-click roll call: whole city down to town/street level.
@router.post("/create_by_city_to_district")
async def create_by_city_to_district(
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Roll-call every active shift whose area code has at most 9 characters."""
    return _create_city_roll_call(db, user_id, 9)


# Create a roll call for selected departments.
@router.post("/create_by_dept_ids")
async def create_by_dept_ids(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Roll-call the active shift of each department listed in ``body['depts']``.

    Departments without an active shift are skipped.  When none of them has
    one, the endpoint answers with code 500 and writes nothing.

    Raises:
        HTTPException: with status 500 for any unexpected error, including a
            missing ``depts`` or ``remark`` key.
    """
    now = datetime.now()
    try:
        dept_ids = body['depts']
        if not dept_ids:
            raise AppException(500, "请勾选部门")
        remark = body['remark']

        shifts = []
        # dict.fromkeys drops repeated ids while keeping the request order.
        for dept_id in dict.fromkeys(dept_ids):
            # Only the duty staff whose shift covers the current time.
            shift = _active_shift_query(db, now).filter(DutyShift.dept_id == dept_id).first()
            if shift is not None:
                shifts.append(shift)

        new_call_id = _create_roll_call(db, user_id, 3, shifts, remark)
        return {
            "code": 200,
            "msg": "点名创建成功",
            "data": new_call_id
        }
    except AppException as e:
        return {
            "code": e.code,
            "msg": e.msg
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# End a roll call.
@router.post("/end")
async def end_call(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Mark the roll call ``body['call_id']`` as ended (status 2).

    Answers with code 500 when the record is missing, deleted or not in
    status 1.
    """
    call_id = body['call_id']
    row = db.query(OnlineRollCallBase).filter(
        and_(OnlineRollCallBase.id == call_id, OnlineRollCallBase.del_flag == '0')
    ).first()

    if row is None:
        return {
            "code": 500,
            "msg": "点名记录不存在"
        }
    if row.call_status != 1:
        return {
            "code": 500,
            "msg": "点名记录状态已结束"
        }

    finished_at = datetime.now()
    row.end_time = finished_at
    row.call_status = 2  # ended
    row.update_time = finished_at
    db.commit()

    return {
        "code": 200,
        "msg": "点名结束成功"
    }


# Mark the current user as having answered.
@router.post("/ack")
async def ack_all(
    db: Session = Depends(get_db),
    body = Depends(remove_xss_json),
    user_id = Depends(valid_access_token)
):
    """Acknowledge a roll call for the current user and store the photo.

    Reads ``call_id`` and ``photo_data`` (base64, optionally a data URI) from
    the body, saves the photo as PNG, marks the user's detail row as answered
    and refreshes the answered/unanswered counters on the roll call.
    """
    call_id = get_req_param(body, 'call_id')
    base64_data = get_req_param(body, "photo_data")

    # Validate the records first so that a rejected request leaves no photo
    # behind on disk.
    base_row = db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == call_id).first()
    if base_row is None:
        return {
            "code": 500,
            "msg": "点名记录不存在"
        }

    detail_row = db.query(OnlineRollCallDetail).filter(
        and_(OnlineRollCallDetail.pid == call_id, _participant_filter(user_id))
    ).first()
    if detail_row is None:
        return {
            "code": 500,
            "msg": "点名记录不存在!"
        }

    file_name = new_guid() + ".png"
    file_path = f'{_PHOTO_UPLOAD_DIR}/{file_name}'
    binary_data = base64.b64decode(_strip_data_uri_prefix(base64_data))
    with Image.open(BytesIO(binary_data)) as image:
        image.save(file_path)

    now = datetime.now()
    new_file = OnlineRollCallFile(
        file_name=f"点名头像{user_id}.png",
        storage_file_name=file_name,
        file_path=file_path,
        file_size=os.path.getsize(file_path),
        foreign_key=str(detail_row.id),
        from_scenario="online_call_photo_file",
        update_time=now,
        create_time=now,
        create_by=user_id,
        create_dept=0,
        del_flag='0',
        status=0,
    )
    db.add(new_file)

    detail_row.ack_status = 1  # answered
    detail_row.ack_time = now
    db.commit()

    logger.info("标记[在线点名]已读 {}", str(detail_row.id))
    db_msg_center.update_msg_read(db, user_id, "在线点名", str(detail_row.id))

    # Number of answered details.
    ack_count = db.query(OnlineRollCallDetail).filter(
        and_(OnlineRollCallDetail.pid == call_id, OnlineRollCallDetail.ack_status == 1)
    ).count()
    # Number of details still unanswered.  Details are created with status 0,
    # so counting only status 2 (as before) dropped this to zero on first ack.
    unack_count = db.query(OnlineRollCallDetail).filter(
        and_(OnlineRollCallDetail.pid == call_id, OnlineRollCallDetail.ack_status != 1)
    ).count()

    db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == call_id).update({
        "ack_count": ack_count,
        "unack_count": unack_count,
        "update_time": datetime.now()
    })
    db.commit()

    return {
        "code": 200,
        "msg": "应答成功"
    }


# Query whether the current user has roll calls waiting for an answer (reminder).
@router.post("/mycall")
async def query_mycall(
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """List the running roll calls the current user has not answered yet."""
    pending = db.query(OnlineRollCallDetail).filter(
        and_(_participant_filter(user_id), OnlineRollCallDetail.ack_status == 0)
    ).all()

    data = []
    for detail in pending:
        call_id = detail.pid
        # Only roll calls that are still running and not deleted count.
        base_row = db.query(OnlineRollCallBase).filter(
            and_(OnlineRollCallBase.id == call_id,
                 OnlineRollCallBase.call_status == 1,
                 OnlineRollCallBase.del_flag == '0')
        ).first()
        if base_row is not None:
            data.append({
                "id": call_id,
                "time1": get_datetime_str(base_row.create_time)
            })

    return {
        "code": 200,
        "msg": "查询成功",
        "data": data,
        "total": len(data)
    }


# Roll-call details.
# NOTE(review): this route does not depend on valid_access_token -- confirm
# that unauthenticated access is intended.
@router.get('/detail')
async def get_event_detail(
    request: Request,
    call_id: str = Query(None, description='点名ID'),
    db: Session = Depends(get_db)
):
    """Return one roll call together with all of its detail rows.

    Raises:
        HTTPException: with status 500 for any unexpected error.
    """
    try:
        base_row = db.query(OnlineRollCallBase).filter(OnlineRollCallBase.id == call_id).first()
        if base_row is None:
            return {
                "code": 500,
                "msg": "点名记录不存在"
            }

        data = get_model_dict(base_row)
        data['begin_time'] = get_datetime_str(data['create_time'])
        data['update_time'] = get_datetime_str(data['update_time'])
        data['duration_time'] = ''

        # Ended roll call.
        if data['call_status'] == 2:
            # Repair dirty data: an ended call without an end time.
            if base_row.end_time is None:
                base_row.end_time = datetime.now()
                db.commit()
            data['duration_time'] = str(base_row.end_time - base_row.create_time)

        items = []
        for row in db.query(OnlineRollCallDetail).filter(OnlineRollCallDetail.pid == call_id).all():
            detail_info = get_model_dict(row)
            detail_info['begin_time'] = get_datetime_str(detail_info['create_time'])
            detail_info['ack_time'] = get_datetime_str(detail_info['ack_time'])
            detail_info['ack_status_text'] = get_ack_status_text(detail_info['ack_status'])
            items.append(detail_info)
        data['items'] = items

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# Send the YZY message.
def send_yzy_msg(db: Session, detail_entity: OnlineRollCallDetail, user_id: int) -> None:
    """Notify every duty member of *detail_entity* about the roll call.

    Each distinct ``yzy_account`` gets one queued YZY message and one message
    centre entry.  Duty slots for which no user record is found are skipped.
    """
    to_users = [
        detail_entity.leader_id,
        detail_entity.primary_staff_id,
        detail_entity.secondary_staff_id,
        detail_entity.standby_staff_id,
    ]

    # Identical for every recipient, so built once.
    create_time = get_datetime_str(detail_entity.create_time)
    detail_url = "{}{}".format(settings.YZY_WEB_ROOT, "/yjxp/")
    description = f"你有一条在线点名通知,请尽快确认\n点名时间:{create_time}"

    notified_accounts = set()
    for to_user_id in to_users:
        user_info = db_user.get_user_info(db, to_user_id)
        if user_info is None:
            # Empty duty slot or unknown user: nothing to send.
            continue

        yzy_account = user_info.yzy_account
        if yzy_account in notified_accounts:
            continue
        notified_accounts.add(yzy_account)

        yzy_userid = db_yzy.get_userid_by_account(db, yzy_account)
        data = {
            "yzy_userid": yzy_userid,
            "mobile": yzy_account,
            "content": description,
            "recorded_by": user_id,
            "detail_url": detail_url,
            "foreign_key": detail_entity.id,
            "from_scenario": "online_roll_call_detail",
            "title": "在线点名提醒"
        }
        YzyApi.add_to_msg_queue(db, data)

        db_msg_center.add_message(db, "在线点名", to_user_id, "在线点名提醒", "你有一条在线点名通知,请尽快确认", str(detail_entity.id), 'online_roll_call_detail')


def get_ack_status_text(ack_status: int) -> str:
    """Return the display text for *ack_status*, or its ``str()`` if unknown."""
    return _ACK_STATUS_TEXT.get(ack_status, str(ack_status))


# Roll-call list.
# NOTE(review): this route does not depend on valid_access_token -- confirm
# that unauthenticated access is intended.
@router.get('/list')
async def get_event_list(
    request: Request,
    begin_date: str = Query('', description='开始时间'),
    end_date: str = Query('', description='结束时间'),
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db)
):
    """Return one page of roll calls created within the last two weeks.

    *begin_date* and *end_date* (``YYYY-MM-DD``, end inclusive) narrow the
    range further.

    Raises:
        HTTPException: with status 500 for any unexpected error, including an
            unparsable date.
    """
    try:
        # Within the last two weeks.
        earliest = datetime.now() - timedelta(days=14)
        where = and_(OnlineRollCallBase.del_flag == '0', OnlineRollCallBase.create_time > earliest)
        where = _with_date_range(where, OnlineRollCallBase.create_time, begin_date, end_date)

        # Total number of matching rows.
        total = db.query(func.count(OnlineRollCallBase.id)).filter(where).scalar()

        # Paged query.
        rows = (
            db.query(OnlineRollCallBase)
            .filter(where)
            .order_by(OnlineRollCallBase.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        data = []
        for row in rows:
            duration_time = ""
            # Ended.  An ended call may lack an end time (dirty data); leave
            # the duration empty instead of failing the whole page.
            if row.call_status == 2 and row.end_time is not None:
                duration_time = str(row.end_time - row.create_time)

            data.append({
                "id": row.id,
                "begin_time": get_datetime_str(row.create_time),
                "end_time": get_datetime_str(row.end_time),
                "duration_time": duration_time,
                "call_status": row.call_status,
                "ack_count": row.ack_count,
                "unack_count": row.unack_count
            })

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# Acknowledgement list (mine).
# NOTE(review): this handler reuses the name of the '/list' handler above and
# therefore rebinds it at module level.  The name is kept so the route name
# and generated operation id stay the same; consider renaming it.
@router.get('/ack_list')
async def get_event_list(
    request: Request,
    begin_date: str = Query('', description='开始时间'),
    end_date: str = Query('', description='结束时间'),
    page: int = Query(1, gt=0, description='页码'),
    page_size: int = Query(10, gt=0, description='pageSize'),
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Return one page of the roll-call details addressed to the current user.

    *begin_date* and *end_date* (``YYYY-MM-DD``, end inclusive) limit the
    creation time.

    Raises:
        HTTPException: with status 500 for any unexpected error, including an
            unparsable date.
    """
    try:
        where = and_(_participant_filter(user_id), OnlineRollCallDetail.del_flag == '0')
        where = _with_date_range(where, OnlineRollCallDetail.create_time, begin_date, end_date)

        # Total number of matching rows.
        total = db.query(func.count(OnlineRollCallDetail.id)).filter(where).scalar()

        # Paged query.
        rows = (
            db.query(OnlineRollCallDetail)
            .filter(where)
            .order_by(OnlineRollCallDetail.id.desc())
            .offset((page - 1) * page_size)
            .limit(page_size)
            .all()
        )

        data = []
        for row in rows:
            duration_time = ""
            img_url = ""

            # Answered.
            if row.ack_status == 1:
                file_info = db.query(OnlineRollCallFile).filter(
                    and_(OnlineRollCallFile.foreign_key == str(row.id),
                         OnlineRollCallFile.from_scenario == 'online_call_photo_file',
                         OnlineRollCallFile.del_flag == '0')
                ).first()
                if file_info is not None:
                    img_url = file_info.storage_file_name

                if row.ack_time is not None and row.create_time is not None:
                    time_diff = row.ack_time - row.create_time
                    # NOTE(review): `< 0` only matches answers dated before
                    # the roll call; `> 0` looks like the intent -- confirm.
                    if time_diff.days < 0:
                        duration_time = "超过{}天".format(abs(time_diff.days))
                    else:
                        duration_time = str(time_diff)

            data.append({
                "id": row.id,
                "create_time": get_datetime_str(row.create_time),
                "duration_time": duration_time,
                "ack_status": row.ack_status,
                "img_url": img_url
            })

        return {
            "code": 200,
            "msg": "查询成功",
            "data": data,
            "total": total
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# Roll-call statistics (mine).
@router.get("/summary")
async def get_call_summary(
    request: Request,
    db: Session = Depends(get_db),
    user_id = Depends(valid_access_token)
):
    """Return how many roll calls addressed the user, and how many were answered.

    Raises:
        HTTPException: with status 500 for any unexpected error.
    """
    mine = and_(_participant_filter(user_id), OnlineRollCallDetail.del_flag == '0')

    try:
        call_count = db.query(OnlineRollCallDetail).filter(mine).count()
        ack_count = db.query(OnlineRollCallDetail).filter(
            and_(mine, OnlineRollCallDetail.ack_status == 1)
        ).count()
        unack_count = db.query(OnlineRollCallDetail).filter(
            and_(mine, OnlineRollCallDetail.ack_status == 0)
        ).count()

        return {
            "code": 200,
            "msg": "查询成功",
            "data": {
                "call_count": call_count,
                "ack_count": ack_count,
                "unack_count": unack_count
            }
        }
    except Exception as e:
        traceback.print_exc()
        raise HTTPException(status_code=500, detail=str(e))


# NOTE(review): this route does not depend on valid_access_token, and the
# generated files are never removed from _EXPORT_DIR -- confirm both.
@router.get("/export", response_class=FileResponse)
async def get_export_xslx(
    request: Request,
    call_id: str,
    db: Session = Depends(get_db)
):
    """Export the details of roll call *call_id* as an ``.xlsx`` download.

    Rows are ordered by acknowledgement status (descending), then by
    acknowledgement time (ascending).
    """
    # Build the Excel sheet.
    wb = openpyxl.Workbook()
    try:
        ws = wb.active
        for column, title in enumerate(_EXPORT_HEADERS, start=1):
            ws.cell(1, column).value = title

        rows = (
            db.query(OnlineRollCallDetail)
            .filter(and_(OnlineRollCallDetail.pid == call_id, OnlineRollCallDetail.del_flag == '0'))
            .order_by(OnlineRollCallDetail.ack_status.desc(), OnlineRollCallDetail.ack_time.asc())
            .all()
        )
        for excel_row, row in enumerate(rows, start=2):
            ws.cell(excel_row, 1).value = row.dept_name
            ws.cell(excel_row, 2).value = row.onduty_leader
            ws.cell(excel_row, 3).value = row.onduty_user
            ws.cell(excel_row, 4).value = get_datetime_str(row.create_time)
            ws.cell(excel_row, 5).value = get_ack_status_text(row.ack_status)
            ws.cell(excel_row, 6).value = get_datetime_str(row.ack_time)

        # exist_ok avoids the check-then-create race between requests.
        os.makedirs(_EXPORT_DIR, exist_ok=True)
        save_path = os.path.join(_EXPORT_DIR, new_guid() + ".xlsx")
        wb.save(save_path)
    finally:
        wb.close()

    return FileResponse(save_path)