#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Register this package's background jobs on an APScheduler scheduler."""

from datetime import datetime, timedelta

from apscheduler.schedulers.base import BaseScheduler
from apscheduler.triggers.cron import CronTrigger

# NOTE(review): wildcard imports are kept as-is and still come after the
# explicit imports above, so any names they shadow behave as before.
# Nothing in this module visibly uses them; confirm before removing.
from utils import *
from models import *

from .yzy_job import proc as yzy_proc, yzy_msg_queue_proc
from .rainfall_conditions_job import proc as rainfall_proc
from .avcon_job import proc as avcon_proc


def register_jobs(scheduler: BaseScheduler) -> None:
    """Add all currently enabled jobs to *scheduler*.

    Jobs registered:

    * ``yzy_proc`` -- a single run, three seconds after registration.
    * ``yzy_msg_queue_proc`` -- crontab ``* * * * *`` (every minute).
    * ``rainfall_proc`` -- crontab ``0 10 * * *`` (daily at 10:00).
    * ``avcon_proc`` -- crontab ``0 0 * * *`` (daily at 00:00).

    Args:
        scheduler: The scheduler that receives the jobs. It is only added
            to; this function does not start it.
    """
    # Build the one-shot start time as an aware datetime in the scheduler's
    # own timezone. A naive ``datetime.now()`` would be re-interpreted by
    # APScheduler in the scheduler timezone, shifting the run time whenever
    # that timezone differs from the operating system's.
    first_run = datetime.now(scheduler.timezone) + timedelta(seconds=3)
    scheduler.add_job(yzy_proc, next_run_time=first_run)
    # scheduler.add_job(yzy_proc, CronTrigger.from_crontab('0 */5 * * *'))

    scheduler.add_job(yzy_msg_queue_proc, CronTrigger.from_crontab('* * * * *'))

    scheduler.add_job(rainfall_proc, CronTrigger.from_crontab('0 10 * * *'))

    # Disabled registrations, kept for reference. ``wdyy_proc`` and
    # ``wdgh_proc`` are not imported in this module, so they would need an
    # import before being re-enabled.
    # scheduler.add_job(wdyy_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
    # scheduler.add_job(wdyy_proc, CronTrigger.from_crontab('0 * * * *'))
    # scheduler.add_job(wdgh_proc, next_run_time=(datetime.now() + timedelta(seconds=13)))
    # scheduler.add_job(wdyy_proc, CronTrigger.from_crontab('0 * * * *'))

    # scheduler.add_job(avcon_proc, next_run_time=(datetime.now() + timedelta(seconds=15)))
    scheduler.add_job(avcon_proc, CronTrigger.from_crontab('0 0 * * *'))


def tick():
    """Print the current local date and time to standard output."""
    print(datetime.now())