123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from sqlalchemy.orm import Session
- from sqlalchemy import text, exists, and_, or_, not_
- from sqlalchemy.sql import func
- from database import get_db
- from extensions import logger
- import random
- from models import *
- from utils import *
def get_next_event_id(db: Session):
    """Generate a task id that no ``TaskRegistration`` row uses yet.

    Candidates have the form ``'task'`` followed by a random 10-digit number.
    Each candidate is checked against ``TaskRegistration.task_id`` and the
    first one without a match is returned.

    Args:
        db: Session used to run the existence query.

    Returns:
        A task id string for which the existence query reported no match.
    """
    while True:
        candidate = 'task' + str(random.randint(1000000000, 9999999999))
        already_used = db.query(
            exists().where(TaskRegistration.task_id == candidate)
        ).scalar()
        # Truthiness test instead of `== False`; retry only on a real match.
        if not already_used:
            return candidate
-
def get_image_file_list(db: Session, task_id: str):
    """List the ``'task_img'`` files attached to a registered task.

    The task is looked up by ``TaskRegistration.task_id``; its files are the
    ``TaskFile`` rows whose ``from_scenario`` is ``'task_img'`` and whose
    ``foreign_key`` equals the string form of the registration's ``id``.

    Args:
        db: Session used for both queries.
        task_id: Value matched against ``TaskRegistration.task_id``.

    Returns:
        A list of dicts with keys ``uid``, ``status``, ``name`` and ``url``;
        an empty list when no registration matches ``task_id``.
    """
    row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first()
    if row is None:
        return []

    files = (
        db.query(TaskFile)
        # Rows with del_flag '2' are excluded (presumably deleted - confirm).
        .filter(TaskFile.del_flag != '2')
        .filter(TaskFile.from_scenario == 'task_img')
        # Files reference the registration's primary key, stored as text.
        .filter(TaskFile.foreign_key == str(row.id))
        .all()
    )
    return [
        {
            "uid": task_file.file_id,
            "status": task_file.status,
            "name": task_file.file_name_desc,
            "url": task_file.file_name,
        }
        for task_file in files
    ]
def get_task_file_list(db: Session, task_id: str):
    """List the ``'task_file'`` files attached to a registered task.

    The task is looked up by ``TaskRegistration.task_id``; its files are the
    ``TaskFile`` rows whose ``from_scenario`` is ``'task_file'`` and whose
    ``foreign_key`` equals the string form of the registration's ``id``.

    Args:
        db: Session used for both queries.
        task_id: Value matched against ``TaskRegistration.task_id``.

    Returns:
        A list of dicts with keys ``uid``, ``status``, ``name`` and ``url``;
        an empty list when no registration matches ``task_id``.
    """
    row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first()
    if row is None:
        return []

    files = (
        db.query(TaskFile)
        # Rows with del_flag '2' are excluded (presumably deleted - confirm).
        .filter(TaskFile.del_flag != '2')
        .filter(TaskFile.from_scenario == 'task_file')
        # Files reference the registration's primary key, stored as text.
        .filter(TaskFile.foreign_key == str(row.id))
        .all()
    )
    return [
        {
            "uid": task_file.file_id,
            "status": task_file.status,
            "name": task_file.file_name_desc,
            "url": task_file.file_name,
        }
        for task_file in files
    ]
def get_task_feeback_file_list(db: Session, task_id: str):
    """List the ``'task_feeback'`` files whose ``foreign_key`` is ``task_id``.

    Unlike the image/file variants, ``task_id`` is compared to
    ``TaskFile.foreign_key`` directly; no ``TaskRegistration`` lookup is done.

    Args:
        db: Session used to run the query.
        task_id: Value matched against ``TaskFile.foreign_key``.

    Returns:
        A list of dicts with keys ``uid``, ``status``, ``name`` and ``url``.
    """
    matches = (
        db.query(TaskFile)
        # Rows with del_flag '2' are excluded (presumably deleted - confirm).
        .filter(TaskFile.del_flag != '2')
        .filter(TaskFile.from_scenario == 'task_feeback')
        .filter(TaskFile.foreign_key == task_id)
        .all()
    )

    entries = []
    for item in matches:
        entries.append({
            "uid": item.file_id,
            "status": item.status,
            "name": item.file_name_desc,
            "url": item.file_name,
        })
    return entries
def get_task_type_text(task_type: str) -> str:
    """Translate a task-type code into its display label.

    Args:
        task_type: One of the codes ``'0'`` to ``'6'``.

    Returns:
        The label for a known code; otherwise ``str(task_type)`` so unknown
        codes are passed through unchanged.
    """
    # Lookup table replaces the former if/elif chain; labels are user-facing
    # strings and must stay exactly as they are.
    labels = {
        '0': '事件处置',
        '1': '防范措施',
        '2': '险情处理',
        '3': '督办任务',
        '4': '现场人员',
        '5': '手机工作台',
        '6': '救援任务',
    }
    label = labels.get(task_type)
    if label is None:
        return str(task_type)
    return label
-
def get_task_feebacks(db: Session, task_id: str):
    """Return every feedback record of a task as a list of dicts.

    Each ``TaskFeeback`` row matching ``task_id`` is converted with
    ``get_model_dict``; its ``create_time`` and ``approval_time`` entries are
    replaced by the output of ``get_datetime_str``, and a ``fileList`` entry
    is added from ``get_task_feeback_file_list``.

    Args:
        db: Session used for the queries.
        task_id: Value matched against ``TaskFeeback.task_id``.

    Returns:
        One dict per feedback row; an empty list when there are none.
    """
    rows = db.query(TaskFeeback).filter(TaskFeeback.task_id == task_id).all()
    if not rows:
        return []

    # The file list depends only on task_id, so query it once instead of once
    # per feedback row.
    # NOTE(review): every feedback gets the same task-level file list; confirm
    # that files are not meant to be keyed per feedback record.
    file_list = get_task_feeback_file_list(db, task_id)

    data = []
    for row in rows:
        info = get_model_dict(row)
        info['create_time'] = get_datetime_str(row.create_time)
        info['approval_time'] = get_datetime_str(row.approval_time)
        # Copy so each record owns its list and dicts, as before.
        info['fileList'] = [dict(entry) for entry in file_list]
        data.append(info)

    return data
# A task needs an alert when it has no feedback record half an hour after
# it was published.
def need_task_alert(db: Session, task_id: str):
    """Tell whether a task has gone 30 minutes without any feedback.

    Args:
        db: Session used for the queries.
        task_id: Value matched against ``TaskFeeback.task_id`` and
            ``TaskRegistration.task_id``.

    Returns:
        ``False`` when at least one feedback row exists, when no registration
        matches ``task_id``, or when the registration has no
        ``creation_time``. Otherwise ``True`` once the current time is more
        than 30 minutes past ``creation_time``.
    """
    # Only existence matters, so fetch a single row rather than all of them.
    first_feedback = (
        db.query(TaskFeeback).filter(TaskFeeback.task_id == task_id).first()
    )
    if first_feedback is not None:
        return False

    row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first()
    # An unknown task, or one without a creation time, has no deadline to
    # miss; previously this raised on attribute access / arithmetic.
    if row is None or row.creation_time is None:
        return False

    # NOTE(review): compared against naive datetime.now(); assumes
    # creation_time is stored as naive local time - confirm.
    alert_time = row.creation_time + timedelta(seconds=60 * 30)  # half an hour later
    return datetime.now() > alert_time
|