#!/usr/bin/env python3 # -*- coding: utf-8 -*- from sqlalchemy.orm import Session from sqlalchemy import text, exists, and_, or_, not_ from sqlalchemy.sql import func from database import get_db from extensions import logger import random from models import * def get_next_event_id(db: Session): while True: random_10_digit_number = random.randint(1000000000, 9999999999) taskId = 'task' + str(random_10_digit_number) it_exists = db.query( exists().where(TaskRegistration.task_id == taskId) ).scalar() if it_exists == False: return taskId def get_image_file_list(db: Session, task_id: str): row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first() if row is None: return [] task_id = row.task_id file_query = db.query(TaskFile) file_query = file_query.filter(TaskFile.del_flag != '2') file_query = file_query.filter(TaskFile.from_scenario == 'task_img') file_query = file_query.filter(TaskFile.foreign_key == str(row.id)) files = file_query.all() result = [{ "uid": file.file_id, "status": file.status, "name": file.file_name_desc, "url": file.file_name } for file in files] return result def get_task_file_list(db: Session, task_id: str): row = db.query(TaskRegistration).filter(TaskRegistration.task_id == task_id).first() if row is None: return [] task_id = row.task_id file_query = db.query(TaskFile) file_query = file_query.filter(TaskFile.del_flag != '2') file_query = file_query.filter(TaskFile.from_scenario == 'task_file') file_query = file_query.filter(TaskFile.foreign_key == str(row.id)) files = file_query.all() result = [{ "uid": file.file_id, "status": file.status, "name": file.file_name_desc, "url": file.file_name } for file in files] return result