#!/usr/bin/env python3 # -*- coding: utf-8 -*- from apscheduler.schedulers.base import BaseScheduler from apscheduler.triggers.cron import CronTrigger from datetime import datetime, timedelta from utils import * from models import * from pprint import pprint from .yzy_job import proc as yzy_proc, yzy_msg_queue_proc from .rainfall_conditions_job import proc as rainfall_proc from .avcon_job import proc as avcon_proc from .duty_job import proc as duty_proc from .vehicle_job import proc as vehicle_proc from .hkvideo_job import proc as hkvideo_proc from common.security import encrypt_password from common.TassApi import * def register_jobs(scheduler: BaseScheduler): encrptData = TransparentEnc("将原数据按密评要求加密辅助类") print("TransparentDec:", TransparentDec(encrptData)) # scheduler.add_job(yzy_proc, next_run_time=(datetime.now() + timedelta(seconds=3))) # scheduler.add_job(yzy_proc, CronTrigger.from_crontab('0 */5 * * *')) # scheduler.add_job(yzy_msg_queue_proc, CronTrigger.from_crontab('* * * * *')) scheduler.add_job(rainfall_proc, CronTrigger.from_crontab('0 10 * * *')) # scheduler.add_job(wdyy_proc, next_run_time=(datetime.now() + timedelta(seconds=3))) # scheduler.add_job(wdyy_proc, CronTrigger.from_crontab('0 * * * *')) # scheduler.add_job(wdgh_proc, next_run_time=(datetime.now() + timedelta(seconds=13))) # scheduler.add_job(wdyy_proc, CronTrigger.from_crontab('0 * * * *')) # 暂时不用了 # scheduler.add_job(avcon_proc, next_run_time=(datetime.now() + timedelta(seconds=15))) # scheduler.add_job(avcon_proc, CronTrigger.from_crontab('0 0 * * *')) # 值班提醒推送 scheduler.add_job(duty_proc, next_run_time=(datetime.now() + timedelta(seconds=3))) scheduler.add_job(duty_proc, CronTrigger.from_crontab('0 0 * * *')) # 车辆数据更新 # scheduler.add_job(vehicle_proc, next_run_time=(datetime.now() + timedelta(seconds=3))) # scheduler.add_job(vehicle_proc, CronTrigger.from_crontab('*/5 * * * *')) # 视频状态更新 scheduler.add_job(hkvideo_proc, next_run_time=(datetime.now() + timedelta(seconds=9))) scheduler.add_job(hkvideo_proc, CronTrigger.from_crontab('0 1 * * *')) def tick(): print(datetime.now())