Browse Source

250112-1代码。

baoyubo 4 months ago
parent
commit
13b0f762b7
2 changed files with 183 additions and 0 deletions
  1. 5 0
      jobs/__init__.py
  2. 178 0
      jobs/vehicle_job.py

+ 5 - 0
jobs/__init__.py

@@ -11,6 +11,7 @@ from .yzy_job import proc as yzy_proc, yzy_msg_queue_proc
 from .rainfall_conditions_job import proc as rainfall_proc
 from .avcon_job import proc as avcon_proc
 from .duty_job import proc as duty_proc
+from .vehicle_job import proc as vehicle_proc
 
 def register_jobs(scheduler: BaseScheduler):
     scheduler.add_job(yzy_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
@@ -31,5 +32,9 @@ def register_jobs(scheduler: BaseScheduler):
     scheduler.add_job(duty_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
     scheduler.add_job(duty_proc, CronTrigger.from_crontab('0 0 * * *'))
 
+    # 车辆数据更新
+    scheduler.add_job(vehicle_proc, next_run_time=(datetime.now() + timedelta(seconds=3)))
+    scheduler.add_job(vehicle_proc, CronTrigger.from_crontab('*/5 * * * *'))
+
 def tick():
     print(datetime.now())

+ 178 - 0
jobs/vehicle_job.py

@@ -0,0 +1,178 @@
+#!/usr/bin/env python3
+# -*- coding: utf-8 -*-
+from datetime import datetime, timedelta
+from sqlalchemy.orm import Session
+from sqlalchemy import  text
+from common.BigDataCenterAPI import *
+from utils import *
+from utils.redis_util import *
+from models import *
+from exceptions import *
+from database import get_db_local
+from extensions import logger
+import hashlib
+import time
+import requests
+import json
+
+# 值班任务
+# 每30秒将当天值班人员推送一次
+def proc():
+
+    lock_key = "vehicle_job_proc"
+    if redis_lock(lock_key):
+        logger.info(datetime.now())
+
+        db = get_db_local()
+
+        # 删除超过一天的轨迹记录
+        delete_old_vehicle_trajectory(db)
+        # 获取车辆列表
+        data = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/gps/inmm',params1={})
+
+        add_vehicle_trajectory(db, data)
+        # 删除车辆列表
+        delete_old_vehicle_list(db)
+
+        # 新增非列表内车辆信息(查询详情)
+        for info in data:
+            add_vehicle_list_INFO(db, info)
+            if len(get_vehicle_INFO(db,info['vehicleNo'])) ==0:
+                vehicle_info = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vecInfo',params1={"vecNo":info['vehicleNo']})
+
+                if vehicle_info:
+                    # print(vehicle_info)
+                    add_vehicle_INFO(db, vehicle_info)
+        db.commit()
+
+        db.close()
+        redis_unlock(lock_key)
+
+
+def get_sign(input_string:str)-> str:
+    # 待签名的字符串
+    # input_string = "123456appIdtesttimestamp1561360165vno浙B79306123456"
+    # 创建一个SHA1哈希对象
+    hash_object = hashlib.sha1()
+    # 更新哈希对象,传入待签名字符串的字节表示
+    hash_object.update(input_string.encode('utf-8'))
+    # 获取十六进制的编码串(大写)
+    hex_digest = hash_object.hexdigest().upper()
+    return hex_digest
+def get_timestamp()->str:
+    return str(int(time.time()))
+def get_input_string(params:dict,secret:str = 'bd217f76faa9faf800f5cf31a8b007f7')-> str:
+    sorted_keys   = sorted(params.keys())
+    result_string  = ''.join(f"{key}{params[key]}" for key in sorted_keys)
+    # print(result_string)
+    return secret+result_string+secret
+def get_data(url,params1):
+    params = {
+    'appId':'sk-itf',
+    'timestamp':get_timestamp()
+    }
+    params.update(params1)
+    params['sign'] = get_sign(get_input_string(params,'bd217f76faa9faf800f5cf31a8b007f7'))
+    # print(params)
+    # url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vec'
+    respon = requests.get(url=url,params=params)
+    # print(respon.url)
+    if respon.status_code==200:
+        return json.loads(respon.text.encode('utf8'))['data']
+    return None
+
+def delete_old_vehicle_trajectory(db):
+    sql = text(
+        """DELETE FROM sharedb.vehicle_trajectory WHERE createDate < NOW() - INTERVAL 1 DAY;""")
+    db.execute(sql)
+    db.commit()
+
+def add_vehicle_trajectory(db,data):
+    for item in data:
+        sql_insert = text(
+            """INSERT INTO sharedb.vehicle_trajectory (
+                vehicle_no, speed, mileage, dir, lat, lng, createDate, gpsDate, status_flag, gps_time
+            ) VALUES (
+                :vehicle_no, :speed, :mileage, :direction, :latitude, :longitude, :createDate, :gpsDate, :state, :gps_time
+            )"""
+        )
+        # 将 updateTime 转换为 datetime 对象
+        create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
+        gps_date = create_date  # 假设 gpsDate 与 updateTime 相同
+
+        # 执行插入操作
+        db.execute(sql_insert, {
+            'vehicle_no': item['_id'],
+            'speed': item['speed'],
+            'mileage': None,#item['mileage'],
+            'direction': item['direction'],
+            'latitude': item['latitude'],
+            'longitude': item['longitude'],
+            'createDate': create_date,
+            'gpsDate': gps_date,
+            'state': item['state'],
+            'gps_time': item['updateTimeStamp']
+        })
+    db.commit()
+
+
+def delete_old_vehicle_list(db):
+    sql = text(
+        f"""TRUNCATE TABLE sharedb.vehicle_list;""")
+    db.execute(sql)
+    db.commit()
+
+def add_vehicle_list_INFO(db,item):
+    sql_insert = text(
+        """INSERT INTO sharedb.vehicle_list (
+            vehicle_no, alarm_flag, vehicle_type, gpsDate
+        ) VALUES (
+            :vehicle_no, :alarm_flag, :vehicle_type, :gpsDate
+        ) ON DUPLICATE KEY UPDATE
+            alarm_flag = VALUES(alarm_flag),
+            vehicle_type = VALUES(vehicle_type),
+            gpsDate = VALUES(gpsDate)"""
+    )
+    # 将 updateTime 转换为 datetime 对象
+    create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
+    gps_date = create_date  # 假设 gpsDate 与 updateTime 相同
+
+    # 执行插入操作
+    db.execute(sql_insert, {
+        'vehicle_no': item['_id'],
+        'alarm_flag': 1,  # 假设 state 映射为 alarm_flag
+        'vehicle_type': item['carType'],  # 假设 carType 映射为 vehicle_type
+        'gpsDate': gps_date})
+def delete_old_vehicle_INFO(db):
+    sql = text(
+        f"""TRUNCATE TABLE sharedb.vehicle_info;""")
+    db.execute(sql)
+    db.commit()
+def get_vehicle_INFO(db,vehicle_no):
+    sql = text(
+        f"""select vehicle_no from sharedb.vehicle_info where vehicle_no = '{vehicle_no}' ;""")
+    return db.execute(sql).all()
+def add_vehicle_INFO(db,item):
+    sql_insert = text(
+        """INSERT INTO sharedb.vehicle_info (
+                   vehicle_no, vin, vehicle_type, vehicle_color, chelodmass, bnscope
+                ) VALUES (
+                     :vehicle_no, :vin, :vehicle_type, :vehicle_color, :chelodmass, :bnscope
+                ) ON DUPLICATE KEY UPDATE
+                    vehicle_no = VALUES(vehicle_no),
+                    vin = VALUES(vin),
+                    vehicle_type = VALUES(vehicle_type),
+                    vehicle_color = VALUES(vehicle_color),
+                    chelodmass = VALUES(chelodmass),
+                    bnscope = VALUES(bnscope)"""
+    )
+
+
+    # 执行插入操作
+    db.execute(sql_insert, {
+        'vehicle_no': item['vecNo'],
+        'vin': item['chassisNo'],
+        'vehicle_type': item['catNmCn'],
+        'vehicle_color': item['plateType'],
+        'chelodmass': item['apprvWeight'],
+        'bnscope': item['businessScope']})