db_event_management.py 3.1 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import text, exists, and_, or_, not_
  5. from sqlalchemy.sql import func
  6. from database import get_db
  7. from models import *
  8. from extensions import logger
  9. from utils import *
  10. import random
  11. from . import db_user
  12. import copy
  13. EVENT_TRACK_LIST = [
  14. {"title": "事件登记", "event_status": "0", "tracking_time": "", "event_level": "", "recorded_by": ""},
  15. {"title": "进入指挥", "event_status": "1", "tracking_time": "", "event_level": "", "recorded_by": ""},
  16. {"title": "结束指挥", "event_status": "2", "tracking_time": "", "event_level": "", "recorded_by": ""},
  17. {"title": "关闭事件", "event_status": "3", "tracking_time": "", "event_level": "", "recorded_by": ""}
  18. ]
  19. def get_next_event_id(db: Session):
  20. while True:
  21. random_10_digit_number = random.randint(1000000000, 9999999999)
  22. eventId = 'YJSJ' + str(random_10_digit_number)
  23. it_exists = db.query(
  24. exists().where( EventBase.event_code == eventId )
  25. ).scalar()
  26. if it_exists == False:
  27. return eventId
  28. def get_event_status_track(db: Session, event_id: int):
  29. print(event_id)
  30. trace_list = copy.deepcopy(EVENT_TRACK_LIST)
  31. for n in trace_list:
  32. row = db.query(EventTracking).filter(and_(EventTracking.del_flag == '0', EventTracking.event_id == event_id, EventTracking.event_status == n['event_status'])).first()
  33. if row is not None:
  34. n['tracking_time'] = row.tracking_time.strftime("%m月%d日 %H:%M")
  35. n['event_level'] = row.event_level
  36. row = db.query(EventBase).filter(EventBase.id == event_id).first()
  37. active = int(row.event_status) + 1
  38. data = [
  39. {
  40. "title": n['title'],
  41. "description": n['tracking_time']
  42. }
  43. for n in trace_list
  44. ]
  45. return {
  46. "active": active,
  47. "items" : data
  48. }
  49. def get_event_level_track(db: Session, event_id: int):
  50. trace_list = copy.deepcopy(EVENT_TRACK_LIST)
  51. for n in trace_list:
  52. row = db.query(EventTracking).filter(and_(EventTracking.del_flag == '0', EventTracking.event_id == event_id, EventTracking.event_status == n['event_status'])).first()
  53. if row is not None:
  54. n['tracking_time'] = row.tracking_time.strftime("%Y/%m/%d %H:%M:%S")
  55. n['event_level'] = row.event_level
  56. n['recorded_by'] = row.recorded_by
  57. data = []
  58. for n in trace_list:
  59. if n['tracking_time'] != "":
  60. data.append({
  61. "name": db_user.get_nick_name_by_id(db, n['recorded_by']),
  62. "timestamp": n['tracking_time'],
  63. "level": n['event_level']
  64. })
  65. return data
  66. def get_summary_file_list(db: Session, event_id: int):
  67. rows = db.query(EventFile).filter(and_(EventFile.del_flag == '0', EventFile.event_id == event_id)).all()
  68. data = [
  69. {
  70. "file_name": row.file_name,
  71. "url": "/api/event_management/event/download_file?file_id="+str(row.id)+"&event_id="+str(row.event_id)
  72. }
  73. for row in rows
  74. ]
  75. return data