#!/usr/bin/env python3 # -*- coding: utf-8 -*- from fastapi import APIRouter, Request, Depends,Query, HTTPException, status from common.security import valid_access_token from fastapi.responses import JSONResponse from sqlalchemy.orm import Session from sqlalchemy import and_, or_ from pydantic import BaseModel from datetime import datetime from database import get_db from typing import List from models import * from utils import * import json import traceback router = APIRouter() @router.get('/list') async def get_inspection_task_list( type: str = Query(None, description='类型'), cycle :str = Query(None, description='周期'), page: int = Query(1, gt=0, description='页码'), pageSize: int = Query(10, gt=0, description='每页条目数量'), db: Session = Depends(get_db), user_id = Depends(valid_access_token) ): try: # 构建查询 query = db.query(RiskManagementRiskTask) query = query.filter(RiskManagementRiskTask.del_flag != '2') # 应用查询条件 if type: query = query.filter(RiskManagementRiskTask.risk_type == type) if cycle: query = query.filter(RiskManagementRiskTask.task_cycle == cycle) # 计算总条目数 total_items = query.count() # 排序 query = query.order_by(RiskManagementRiskTask.create_time.desc()) # 执行分页查询 RiskTasks = query.offset((page - 1) * pageSize).limit(pageSize).all() # 将查询结果转换为列表形式的字典 RiskTasks_list = [] for task in RiskTasks: if task.task_status=='3': task_status = '3' #'已完结' else: if datetime.now()