#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Query helpers for task registrations, their attachments and feedback."""

import random
# Imported explicitly because need_task_alert() uses both names. The star
# imports below still take precedence if they re-export these names.
from datetime import datetime, timedelta

from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

from database import get_db
from extensions import logger
from models import *
from utils import *

# Display text for each known task type code.
_TASK_TYPE_TEXT = {
    '0': '事件处置',
    '1': '防范措施',
    '2': '险情处理',
    '3': '督办任务',
    '4': '现场人员',
    '5': '手机工作台',
    '6': '救援任务',
}

# Minutes a task may go without any feedback before an alert is due.
_ALERT_DELAY_MINUTES = 30


def get_next_event_id(db: Session) -> str:
    """Return a new task id that is not yet used in TaskRegistration.

    The id is the literal prefix ``'task'`` followed by a random 10-digit
    number. Candidates are drawn until one is found that no existing
    ``TaskRegistration.task_id`` matches.

    Args:
        db: Session used for the existence check.

    Returns:
        The unused task id string.
    """
    while True:
        random_10_digit_number = random.randint(1000000000, 9999999999)
        task_id = 'task' + str(random_10_digit_number)
        already_used = db.query(
            exists().where(TaskRegistration.task_id == task_id)
        ).scalar()
        if not already_used:
            return task_id


def _serialize_files(files) -> list:
    """Convert TaskFile rows into the dict shape returned by the file lists."""
    return [
        {
            "uid": item.file_id,
            "status": item.status,
            "name": item.file_name_desc,
            "url": item.file_name,
        }
        for item in files
    ]


def _query_task_files(db: Session, scenario: str, foreign_key: str) -> list:
    """Return serialized TaskFile rows for one scenario and foreign key.

    Rows whose ``del_flag`` equals ``'2'`` are excluded
    (NOTE(review): presumably '2' marks a deleted file -- confirm).
    """
    files = (
        db.query(TaskFile)
        .filter(TaskFile.del_flag != '2')
        .filter(TaskFile.from_scenario == scenario)
        .filter(TaskFile.foreign_key == foreign_key)
        .all()
    )
    return _serialize_files(files)


def _get_registration_files(db: Session, task_id: str, scenario: str) -> list:
    """Return files linked to the registration row identified by *task_id*.

    The files are keyed by the registration's primary key (``row.id`` as a
    string), not by the task id itself. Returns ``[]`` when no registration
    with that task id exists.
    """
    row = (
        db.query(TaskRegistration)
        .filter(TaskRegistration.task_id == task_id)
        .first()
    )
    if row is None:
        return []
    return _query_task_files(db, scenario, str(row.id))


def get_image_file_list(db: Session, task_id: str) -> list:
    """Return the ``'task_img'`` files attached to a task.

    Args:
        db: Session used for the queries.
        task_id: Value matched against ``TaskRegistration.task_id``.

    Returns:
        A list of dicts with keys ``uid``, ``status``, ``name`` and ``url``;
        empty when the task does not exist or has no such files.
    """
    return _get_registration_files(db, task_id, 'task_img')


def get_task_file_list(db: Session, task_id: str) -> list:
    """Return the ``'task_file'`` files attached to a task.

    Args:
        db: Session used for the queries.
        task_id: Value matched against ``TaskRegistration.task_id``.

    Returns:
        A list of dicts with keys ``uid``, ``status``, ``name`` and ``url``;
        empty when the task does not exist or has no such files.
    """
    return _get_registration_files(db, task_id, 'task_file')


def get_task_feeback_file_list(db: Session, task_id: str) -> list:
    """Return the ``'task_feeback'`` files whose foreign key is *task_id*.

    Unlike the image/file lists, the foreign key is compared with *task_id*
    directly and no TaskRegistration lookup is performed. The "feeback"
    spelling is part of the public name and of the stored scenario value,
    so it is kept as is.

    Args:
        db: Session used for the query.
        task_id: Value matched against ``TaskFile.foreign_key``.

    Returns:
        A list of dicts with keys ``uid``, ``status``, ``name`` and ``url``.
    """
    return _query_task_files(db, 'task_feeback', task_id)


def get_task_type_text(task_type: str) -> str:
    """Return the display text for a task type code.

    Args:
        task_type: One of the codes ``'0'`` .. ``'6'``.

    Returns:
        The matching display text, or ``str(task_type)`` for any other value.
    """
    # Non-string values can never match a code; skip the lookup so that
    # unhashable input still falls back to str() instead of raising.
    if isinstance(task_type, str) and task_type in _TASK_TYPE_TEXT:
        return _TASK_TYPE_TEXT[task_type]
    return str(task_type)


def get_task_feebacks(db: Session, task_id: str) -> list:
    """Return all feedback records of a task as dicts.

    Each dict is ``get_model_dict(row)`` with ``create_time`` and
    ``approval_time`` replaced by ``get_datetime_str`` output, plus a
    ``fileList`` entry holding the task's feedback attachments.

    Args:
        db: Session used for the queries.
        task_id: Value matched against ``TaskFeeback.task_id``.

    Returns:
        One dict per feedback row; empty when there is no feedback.
    """
    rows = db.query(TaskFeeback).filter(TaskFeeback.task_id == task_id).all()
    if not rows:
        return []

    # The attachment lookup depends only on task_id, so run it once instead
    # of once per feedback row.
    # NOTE(review): every feedback therefore carries the same file list;
    # confirm attachments are meant to be per task, not per feedback.
    file_list = get_task_feeback_file_list(db, task_id)

    data = []
    for row in rows:
        info = get_model_dict(row)
        info['create_time'] = get_datetime_str(row.create_time)
        info['approval_time'] = get_datetime_str(row.approval_time)
        # Give each record its own copy so callers can mutate one safely.
        info['fileList'] = [dict(entry) for entry in file_list]
        data.append(info)
    return data


def need_task_alert(db: Session, task_id: str) -> bool:
    """Tell whether a task is overdue for its first feedback.

    An alert is needed when the task has no feedback record and more than
    half an hour has passed since the registration's ``creation_time``.

    Args:
        db: Session used for the queries.
        task_id: Task id matched against both TaskFeeback and
            TaskRegistration.

    Returns:
        ``True`` when an alert is due. ``False`` when feedback exists, when
        the task is unknown, or when it has no creation time.
    """
    has_feedback = db.query(
        exists().where(TaskFeeback.task_id == task_id)
    ).scalar()
    if has_feedback:
        return False

    row = (
        db.query(TaskRegistration)
        .filter(TaskRegistration.task_id == task_id)
        .first()
    )
    # An unknown task (or one without a creation time) cannot be overdue.
    if row is None or row.creation_time is None:
        return False

    # Half an hour after the task was created.
    alert_time = row.creation_time + timedelta(minutes=_ALERT_DELAY_MINUTES)
    # NOTE(review): compares against naive local time -- assumes
    # creation_time is stored the same way; confirm.
    return datetime.now() > alert_time