def get_area_num(db, rang):
    """Return the number of areas covered by an inspection range.

    Args:
        db: SQLAlchemy session used to query ``GovdataArea``.
        rang: Range code as a string.
            ``'0'`` always yields 1 without touching the database.
            ``'1'`` counts area codes ending in six zeros but not eight.
            ``'2'`` counts area codes ending in three zeros but not six.
            ``'3'`` counts area codes that do not end in three zeros.

    Returns:
        The matching row count, or 0 for any other range code.
    """
    if rang == '0':
        return 1
    if rang not in ('1', '2', '3'):
        return 0

    code = GovdataArea.area_code
    if rang == '1':
        condition = and_(code.like('%000000'), code.notlike('%00000000'))
    elif rang == '2':
        condition = and_(code.like('%000'), code.notlike('%000000'))
    else:
        condition = code.notlike('%000')
    return db.query(GovdataArea).filter(condition).count()


def _calendar_occurrences(start_date, end_date, day, month=None):
    """List monthly (``month`` is None) or yearly occurrences of a calendar day.

    Every occurrence is built directly from its own (year, month), with the
    day clamped to that month's last day.  Deriving each date independently,
    instead of adding one month/year to the previous result, keeps a clamped
    date (e.g. Feb 29 for day 31) from dragging all later dates down with it.

    Only occurrences with ``start_date < occurrence <= end_date`` are kept;
    occurrences are datetimes at midnight.

    Raises:
        ValueError: if ``month`` is not 1-12 or ``day`` can never exist in
            the requested month (or in any month, for the monthly case).
    """
    import calendar

    if month is not None and not 1 <= month <= 12:
        raise ValueError(f"month out of range: {month}")
    # Year 2000 is a leap year, so 29 February is accepted for yearly tasks.
    max_day = 31 if month is None else calendar.monthrange(2000, month)[1]
    if not 1 <= day <= max_day:
        raise ValueError(f"day out of range: {day}")

    year = start_date.year
    current_month = start_date.month if month is None else month
    occurrences = []
    while True:
        last_day = calendar.monthrange(year, current_month)[1]
        occurrence = datetime(year, current_month, min(day, last_day))
        if occurrence > end_date:
            break
        if occurrence > start_date:
            occurrences.append(occurrence)
        # Advance one year (yearly) or one month with year roll-over (monthly).
        if month is not None or current_month == 12:
            year += 1
        if month is None:
            current_month = current_month % 12 + 1
    return occurrences


def get_task_date_list(cycle, start_date, end_date, corn_query):
    """Build the list of execution datetimes for an inspection task.

    Args:
        cycle: Cycle code as a string: ``'4'`` once, ``'3'`` daily,
            ``'2'`` weekly, ``'1'`` monthly, ``'0'`` yearly.
        start_date: First datetime of the task window.
        end_date: Last datetime of the task window (inclusive).
        corn_query: Schedule detail whose format depends on ``cycle``:
            weekly  - weekday number as used by ``datetime.weekday()``
                      (Monday is 0);
            monthly - day of month;
            yearly  - ``"<day> <month>"`` separated by whitespace.
            Ignored for the once and daily cycles.

    Returns:
        A list of datetimes.  The once cycle returns the current moment
        regardless of the window.  Daily and weekly dates keep the time of
        day of ``start_date`` and may include ``start_date`` itself.  Monthly
        and yearly dates are at midnight and must fall strictly after
        ``start_date``.  An unrecognised cycle yields an empty list so that
        callers can iterate the result unconditionally.

    Raises:
        ValueError: if ``corn_query`` cannot be parsed or names an
            impossible day/month.
    """
    if cycle == '4':
        # One-off task: scheduled for right now.
        return [datetime.today()]

    if cycle == '1':
        # NOTE(review): a start_date exactly on the occurrence is skipped
        # (strict comparison kept from the original), unlike the weekly
        # branch - confirm this is intended.
        return _calendar_occurrences(start_date, end_date, int(corn_query))

    if cycle == '0':
        day, month = corn_query.split()
        return _calendar_occurrences(
            start_date, end_date, int(day), month=int(month)
        )

    if cycle == '3':
        step = timedelta(days=1)
        current_date = start_date
    elif cycle == '2':
        step = timedelta(days=7)
        # Days until the next requested weekday; 0 when start_date already
        # falls on it.
        days_ahead = (int(corn_query) - start_date.weekday()) % 7
        current_date = start_date + timedelta(days=days_ahead)
    else:
        return []

    date_list = []
    while current_date <= end_date:
        date_list.append(current_date)
        current_date += step
    return date_list


def create_children_task(db, task_info, corn_query):
    """Create one child-task row per scheduled date of an inspection task.

    Args:
        db: SQLAlchemy session; the new rows are added to it and committed
            once after all of them have been added.
        task_info: Parent task; its ``id``, ``task_number``,
            ``inspection_business``, ``inspection_cycle``,
            ``inspection_range``, ``start_time``, ``end_time`` and
            ``create_by`` attributes are read.
        corn_query: Schedule detail forwarded to ``get_task_date_list``.
    """
    cycle = task_info.inspection_cycle
    task_range = task_info.inspection_range
    # The area count depends only on the range, so query it once instead of
    # once per generated date.
    area_count = get_area_num(db, task_range)

    scheduled_dates = get_task_date_list(
        cycle, task_info.start_time, task_info.end_time, corn_query
    )
    for task_time in scheduled_dates:
        db.add(
            RiskManagementInspectionTaskChildrenTask(
                id=new_guid(),
                task_id=task_info.id,
                task_number=task_info.task_number,
                type=task_info.inspection_business,
                # Keyword spelled as in the original call; presumably it
                # matches the model's column name - verify before renaming.
                tsak_time=task_time,
                cycle=cycle,
                task_range=task_range,
                task_num=area_count,
                create_by=task_info.create_by,
            )
        )
    db.commit()