123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from sqlalchemy.orm import Session
- from sqlalchemy import text, exists, and_, or_, not_
- from sqlalchemy.sql import func
- from database import get_db
- from models import *
- from extensions import logger
- from utils import *
- import random
- from . import db_user
- import copy
- from sqlalchemy import text, exists, and_, or_, not_
- EVENT_TRACK_LIST = [
- {"title": "事件登记", "event_status": "0", "tracking_time": "", "event_level": "", "recorded_by": ""},
- {"title": "进入指挥", "event_status": "1", "tracking_time": "", "event_level": "", "recorded_by": ""},
- {"title": "结束指挥", "event_status": "2", "tracking_time": "", "event_level": "", "recorded_by": ""},
- {"title": "关闭事件", "event_status": "3", "tracking_time": "", "event_level": "", "recorded_by": ""}
- ]
- def get_next_event_id(db: Session):
- while True:
- random_10_digit_number = random.randint(1000000000, 9999999999)
- eventId = 'YJSJ' + str(random_10_digit_number)
-
- it_exists = db.query(
- exists().where( EventBase.event_code == eventId )
- ).scalar()
- if it_exists == False:
- return eventId
-
- def get_event_status_track(db: Session, event_id: int):
- print(event_id)
- trace_list = copy.deepcopy(EVENT_TRACK_LIST)
- for n in trace_list:
- row = db.query(EventTracking).filter(and_(EventTracking.del_flag == '0', EventTracking.event_id == event_id, EventTracking.event_status == n['event_status'])).first()
- if row is not None:
- n['tracking_time'] = row.tracking_time.strftime("%m月%d日 %H:%M")
- n['event_level'] = row.event_level
- row = db.query(EventBase).filter(EventBase.id == event_id).first()
- active = int(row.event_status) + 1
- data = [
- {
- "title": n['title'],
- "description": n['tracking_time']
- }
- for n in trace_list
- ]
- return {
- "active": active,
- "items" : data
- }
- def get_event_level_track(db: Session, event_id: int):
- trace_list = copy.deepcopy(EVENT_TRACK_LIST)
- for n in trace_list:
- row = db.query(EventTracking).filter(and_(EventTracking.del_flag == '0', EventTracking.event_id == event_id, EventTracking.event_status == n['event_status'])).first()
- if row is not None:
- n['tracking_time'] = row.tracking_time.strftime("%Y/%m/%d %H:%M:%S")
- n['event_level'] = row.event_level
- n['recorded_by'] = row.recorded_by
- data = []
- for n in trace_list:
- if n['tracking_time'] != "":
- data.append({
- "name": db_user.get_nick_name_by_id(db, n['recorded_by']),
- "timestamp": n['tracking_time'],
- "level": n['event_level']
- })
- return data
- def get_summary_file_list(db: Session, event_id: int):
- rows = db.query(EventFile).filter(and_(EventFile.del_flag == '0', EventFile.event_id == event_id)).all()
- data = [
- {
- "file_name": row.file_name,
- # "url": "/api/event_management/event/download_file?file_id="+str(row.id)+"&event_id="+str(row.event_id)
- "url": row.storage_file_name
- }
- for row in rows
- ]
- return data
- def get_emergency_notify_count(db: Session, event_id: str):
- return db.query(EventEmergencyNotify).filter(EventEmergencyNotify.event_id == event_id).count()
- def get_event_title(db: Session, event_code: str) -> str:
- row = db.query(EventBase).filter(EventBase.event_code == event_code).first()
- if row is not None:
- event_title = row.event_title
- if event_title is None or event_title == '':
- event_title = '[暂无事件标题]'
- return event_title
- else:
- return event_code
-
- def get_briefing_file_list(db: Session, event_id: int):
- rows = db.query(EventFile).filter(and_(EventFile.del_flag == '0', EventFile.event_id == event_id)).all()
- data = [
- {
- "uid": row.id,
- "file_name": row.file_name,
- "url": row.storage_file_name
- }
- for row in rows
- ]
- return data
|