db_task.py 2.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from sqlalchemy.orm import Session
  4. from sqlalchemy import text, exists, and_, or_, not_
  5. from sqlalchemy.sql import func
  6. from database import get_db
  7. from extensions import logger
  8. import random
  9. from models import *
  10. def get_next_event_id(db: Session):
  11. while True:
  12. random_10_digit_number = random.randint(1000000000, 9999999999)
  13. taskId = 'task' + str(random_10_digit_number)
  14. it_exists = db.query(
  15. exists().where(TaskRegistration.task_id == taskId)
  16. ).scalar()
  17. if it_exists == False:
  18. return taskId
  19. def get_image_file_list(db: Session, task_id: str):
  20. row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first()
  21. if row is None:
  22. return []
  23. task_id = row.task_id
  24. file_query = db.query(TaskFile)
  25. file_query = file_query.filter(TaskFile.del_flag != '2')
  26. file_query = file_query.filter(TaskFile.from_scenario == 'task_img')
  27. file_query = file_query.filter(TaskFile.foreign_key == str(row.id))
  28. files = file_query.all()
  29. result = [{
  30. "uid": file.file_id,
  31. "status": file.status,
  32. "name": file.file_name_desc,
  33. "url": file.file_name
  34. } for file in files]
  35. return result
  36. def get_task_file_list(db: Session, task_id: str):
  37. row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first()
  38. if row is None:
  39. return []
  40. task_id = row.task_id
  41. file_query = db.query(TaskFile)
  42. file_query = file_query.filter(TaskFile.del_flag != '2')
  43. file_query = file_query.filter(TaskFile.from_scenario == 'task_file')
  44. file_query = file_query.filter(TaskFile.foreign_key == str(row.id))
  45. files = file_query.all()
  46. result = [{
  47. "uid": file.file_id,
  48. "status": file.status,
  49. "name": file.file_name_desc,
  50. "url": file.file_name
  51. } for file in files]
  52. return result
  53. def get_task_type_text(task_type: str) -> str:
  54. if task_type == '0':
  55. return '事件处置'
  56. elif task_type == '1':
  57. return '防范措施'
  58. elif task_type == '2':
  59. return '险情处理'
  60. elif task_type == '3':
  61. return '督办任务'
  62. else:
  63. return str(task_type)