#!/usr/bin/env python3 # -*- coding: utf-8 -*- from sqlalchemy.orm import Session from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.sql import func from database import get_db from models import * from extensions import logger from utils import * import random from . import db_user import copy from sqlalchemy import text, exists, and_, or_, not_ EVENT_TRACK_LIST = [ {"title": "事件登记", "event_status": "0", "tracking_time": "", "event_level": "", "recorded_by": ""}, {"title": "进入指挥", "event_status": "1", "tracking_time": "", "event_level": "", "recorded_by": ""}, {"title": "结束指挥", "event_status": "2", "tracking_time": "", "event_level": "", "recorded_by": ""}, {"title": "关闭事件", "event_status": "3", "tracking_time": "", "event_level": "", "recorded_by": ""} ] def get_next_event_id(db: Session): while True: random_10_digit_number = random.randint(1000000000, 9999999999) eventId = 'YJSJ' + str(random_10_digit_number) it_exists = db.query( exists().where( EventBase.event_code == eventId ) ).scalar() if it_exists == False: return eventId def get_event_status_track(db: Session, event_id: int): print(event_id) trace_list = copy.deepcopy(EVENT_TRACK_LIST) for n in trace_list: row = db.query(EventTracking).filter(and_(EventTracking.del_flag == '0', EventTracking.event_id == event_id, EventTracking.event_status == n['event_status'])).first() if row is not None: n['tracking_time'] = row.tracking_time.strftime("%m月%d日 %H:%M") n['event_level'] = row.event_level row = db.query(EventBase).filter(EventBase.id == event_id).first() active = int(row.event_status) + 1 data = [ { "title": n['title'], "description": n['tracking_time'] } for n in trace_list ] return { "active": active, "items" : data } def get_event_level_track(db: Session, event_id: int): trace_list = copy.deepcopy(EVENT_TRACK_LIST) for n in trace_list: row = db.query(EventTracking).filter(and_(EventTracking.del_flag == '0', EventTracking.event_id == event_id, EventTracking.event_status == n['event_status'])).first() if row is not None: n['tracking_time'] = row.tracking_time.strftime("%Y/%m/%d %H:%M:%S") n['event_level'] = row.event_level n['recorded_by'] = row.recorded_by data = [] for n in trace_list: if n['tracking_time'] != "": data.append({ "name": db_user.get_nick_name_by_id(db, n['recorded_by']), "timestamp": n['tracking_time'], "level": n['event_level'] }) return data def get_summary_file_list(db: Session, event_id: int): rows = db.query(EventFile).filter(and_(EventFile.del_flag == '0', EventFile.event_id == event_id)).all() data = [ { "file_name": row.file_name, # "url": "/api/event_management/event/download_file?file_id="+str(row.id)+"&event_id="+str(row.event_id) "url": row.storage_file_name } for row in rows ] return data def get_emergency_notify_count(db: Session, event_id: str): return db.query(EventEmergencyNotify).filter(EventEmergencyNotify.event_id == event_id).count() def get_event_title(db: Session, event_code: str) -> str: row = db.query(EventBase).filter(EventBase.event_code == event_code).first() if row is not None: event_title = row.event_title if event_title is None or event_title == '': event_title = '[暂无事件标题]' return event_title else: return event_code def get_briefing_file_list(db: Session, event_id: int): rows = db.query(EventFile).filter(and_(EventFile.del_flag == '0', EventFile.event_id == event_id)).all() data = [ { "uid": row.id, "file_name": row.file_name, "url": row.storage_file_name } for row in rows ] return data