#!/usr/bin/env python3 # -*- coding: utf-8 -*- from datetime import datetime, timedelta from sqlalchemy.orm import Session from sqlalchemy import text from common.BigDataCenterAPI import * from utils import * from utils.redis_util import * from models import * from exceptions import * from database import get_db_local from extensions import logger import hashlib import time import requests import json # 值班任务 # 每30秒将当天值班人员推送一次 def proc(): lock_key = "vehicle_job_proc" if redis_lock(lock_key): logger.info(datetime.now()) db = get_db_local() # 删除超过一天的轨迹记录 delete_old_vehicle_trajectory(db) # 获取车辆列表 data = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/gps/inmm',params1={}) add_vehicle_trajectory(db, data) # 删除车辆列表 delete_old_vehicle_list(db) # 新增非列表内车辆信息(查询详情) for info in data: add_vehicle_list_INFO(db, info) if len(get_vehicle_INFO(db,info['vehicleNo'])) ==0: vehicle_info = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vecInfo',params1={"vecNo":info['vehicleNo']}) if vehicle_info: # print(vehicle_info) add_vehicle_INFO(db, vehicle_info) db.commit() db.close() redis_unlock(lock_key) def get_sign(input_string:str)-> str: # 待签名的字符串 # input_string = "123456appIdtesttimestamp1561360165vno浙B79306123456" # 创建一个SHA1哈希对象 hash_object = hashlib.sha1() # 更新哈希对象,传入待签名字符串的字节表示 hash_object.update(input_string.encode('utf-8')) # 获取十六进制的编码串(大写) hex_digest = hash_object.hexdigest().upper() return hex_digest def get_timestamp()->str: return str(int(time.time())) def get_input_string(params:dict,secret:str = 'bd217f76faa9faf800f5cf31a8b007f7')-> str: sorted_keys = sorted(params.keys()) result_string = ''.join(f"{key}{params[key]}" for key in sorted_keys) # print(result_string) return secret+result_string+secret def get_data(url,params1): params = { 'appId':'sk-itf', 'timestamp':get_timestamp() } params.update(params1) params['sign'] = get_sign(get_input_string(params,'bd217f76faa9faf800f5cf31a8b007f7')) # print(params) # url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vec' respon = requests.get(url=url,params=params) # print(respon.url) if respon.status_code==200: return json.loads(respon.text.encode('utf8'))['data'] return None def delete_old_vehicle_trajectory(db): sql = text( """DELETE FROM sharedb.vehicle_trajectory WHERE createDate < NOW() - INTERVAL 1 DAY;""") db.execute(sql) db.commit() def add_vehicle_trajectory(db,data): for item in data: sql_insert = text( """INSERT INTO sharedb.vehicle_trajectory ( vehicle_no, speed, mileage, dir, lat, lng, createDate, gpsDate, status_flag, gps_time ) VALUES ( :vehicle_no, :speed, :mileage, :direction, :latitude, :longitude, :createDate, :gpsDate, :state, :gps_time )""" ) # 将 updateTime 转换为 datetime 对象 create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S') gps_date = create_date # 假设 gpsDate 与 updateTime 相同 # 执行插入操作 db.execute(sql_insert, { 'vehicle_no': item['_id'], 'speed': item['speed'], 'mileage': None,#item['mileage'], 'direction': item['direction'], 'latitude': item['latitude'], 'longitude': item['longitude'], 'createDate': create_date, 'gpsDate': gps_date, 'state': item['state'], 'gps_time': item['updateTimeStamp'] }) db.commit() def delete_old_vehicle_list(db): sql = text( f"""TRUNCATE TABLE sharedb.vehicle_list;""") db.execute(sql) db.commit() def add_vehicle_list_INFO(db,item): sql_insert = text( """INSERT INTO sharedb.vehicle_list ( vehicle_no, alarm_flag, vehicle_type, gpsDate ) VALUES ( :vehicle_no, :alarm_flag, :vehicle_type, :gpsDate ) ON DUPLICATE KEY UPDATE alarm_flag = VALUES(alarm_flag), vehicle_type = VALUES(vehicle_type), gpsDate = VALUES(gpsDate)""" ) # 将 updateTime 转换为 datetime 对象 create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S') gps_date = create_date # 假设 gpsDate 与 updateTime 相同 # 执行插入操作 db.execute(sql_insert, { 'vehicle_no': item['_id'], 'alarm_flag': 1, # 假设 state 映射为 alarm_flag 'vehicle_type': item['carType'], # 假设 carType 映射为 vehicle_type 'gpsDate': gps_date}) def delete_old_vehicle_INFO(db): sql = text( f"""TRUNCATE TABLE sharedb.vehicle_info;""") db.execute(sql) db.commit() def get_vehicle_INFO(db,vehicle_no): sql = text( f"""select vehicle_no from sharedb.vehicle_info where vehicle_no = '{vehicle_no}' ;""") return db.execute(sql).all() def add_vehicle_INFO(db,item): sql_insert = text( """INSERT INTO sharedb.vehicle_info ( vehicle_no, vin, vehicle_type, vehicle_color, chelodmass, bnscope ) VALUES ( :vehicle_no, :vin, :vehicle_type, :vehicle_color, :chelodmass, :bnscope ) ON DUPLICATE KEY UPDATE vehicle_no = VALUES(vehicle_no), vin = VALUES(vin), vehicle_type = VALUES(vehicle_type), vehicle_color = VALUES(vehicle_color), chelodmass = VALUES(chelodmass), bnscope = VALUES(bnscope)""" ) # 执行插入操作 db.execute(sql_insert, { 'vehicle_no': item['vecNo'], 'vin': item['chassisNo'], 'vehicle_type': item['catNmCn'], 'vehicle_color': item['plateType'], 'chelodmass': item['apprvWeight'], 'bnscope': item['businessScope']})