1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from apscheduler.schedulers.base import BaseScheduler
- from apscheduler.triggers.cron import CronTrigger
- from datetime import datetime, timedelta
- from utils import *
- from models import *
- from pprint import pprint
- from .yzy_job import proc as yzy_proc, yzy_msg_queue_proc
- from .rainfall_conditions_job import proc as rainfall_proc
- from .avcon_job import proc as avcon_proc
- from .duty_job import proc as duty_proc
- from .vehicle_job import proc as vehicle_proc
- from .hkvideo_job import proc as hkvideo_proc
- from .sign_data_job import sign_data_proc
- from .yst_job import daiban_proc as yst_daiban_proc
- def register_jobs(scheduler: BaseScheduler):
- # scheduler.add_job(yzy_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
- # scheduler.add_job(yzy_proc, CronTrigger.from_crontab('0 */5 * * *'))
- # scheduler.add_job(yzy_msg_queue_proc, CronTrigger.from_crontab('* * * * *'))
- scheduler.add_job(rainfall_proc, CronTrigger.from_crontab('0 10 * * *'))
- # scheduler.add_job(wdyy_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
- # scheduler.add_job(wdyy_proc, CronTrigger.from_crontab('0 * * * *'))
- # scheduler.add_job(yst_daiban_proc, next_run_time=(datetime.now() + timedelta(seconds=13)))
- scheduler.add_job(yst_daiban_proc, CronTrigger.from_crontab('* * * * *'))
- # 暂时不用了
- # scheduler.add_job(avcon_proc, next_run_time=(datetime.now() + timedelta(seconds=15)))
- # scheduler.add_job(avcon_proc, CronTrigger.from_crontab('0 0 * * *'))
- # 值班提醒推送
- scheduler.add_job(duty_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
- scheduler.add_job(duty_proc, CronTrigger.from_crontab('0 0 * * *'))
- # 车辆数据更新
- # scheduler.add_job(vehicle_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
- # scheduler.add_job(vehicle_proc, CronTrigger.from_crontab('*/5 * * * *'))
- # 视频状态更新
- # scheduler.add_job(hkvideo_proc, next_run_time=(datetime.now() + timedelta(seconds=19)))
- scheduler.add_job(hkvideo_proc, CronTrigger.from_crontab('0 1 * * *'))
- # 数据库关键数据加密
- scheduler.add_job(sign_data_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
- scheduler.add_job(sign_data_proc, CronTrigger.from_crontab('0 2 * * *'))
- def tick():
- print(datetime.now())
|