123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from datetime import datetime, timedelta
- from sqlalchemy.orm import Session
- from sqlalchemy import text
- from common.BigDataCenterAPI import *
- from utils import *
- from utils.redis_util import *
- from models import *
- from exceptions import *
- from database import get_local_db
- from extensions import logger
- import hashlib
- import time
- import requests
- import json
# Vehicle synchronisation job.
# NOTE(review): the previous header comment described a duty-roster push every
# 30 seconds, which does not match this code; the actual schedule is configured
# outside this file - confirm.
_VEHICLE_API_BASE = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0'


def proc():
    """Synchronise vehicle positions and vehicle master data into ``sharedb``.

    Steps, executed only when the ``vehicle_job_proc`` Redis lock is obtained:

    1. Purge trajectory rows older than one day.
    2. Fetch the current GPS feed and store it as new trajectory rows.
    3. Rebuild ``vehicle_list`` from the feed.
    4. For every vehicle that has no row in ``vehicle_info`` yet, fetch its
       details from the remote API and store them.

    The lock is always released once it has been acquired, even when a step
    raises, so a single failed run cannot block every later run.
    """
    lock_key = "vehicle_job_proc"
    # presumably redis_lock returns a truthy value only when the lock was
    # acquired - verify against utils.redis_util
    if not redis_lock(lock_key):
        return
    try:
        logger.info(datetime.now())
        with get_local_db() as db:
            # Drop trajectory records older than one day.
            delete_old_vehicle_trajectory(db)
            # Fetch the current vehicle/GPS list.
            data = get_data(url=f'{_VEHICLE_API_BASE}/gps/inmm', params1={})
            if data is None:
                # get_data returns None when the API did not answer with
                # HTTP 200; keep the existing vehicle_list instead of crashing
                # or truncating it.
                logger.warning("vehicle_job_proc: GPS feed unavailable, skipping run")
                return
            add_vehicle_trajectory(db, data)
            # Clear the vehicle list before repopulating it from the feed.
            delete_old_vehicle_list(db)
            for info in data:
                add_vehicle_list_INFO(db, info)
                # Only look up details for vehicles not stored yet.
                if not get_vehicle_INFO(db, info['vehicleNo']):
                    vehicle_info = get_data(
                        url=f'{_VEHICLE_API_BASE}/vecInfo',
                        params1={"vecNo": info['vehicleNo']},
                    )
                    if vehicle_info:
                        add_vehicle_INFO(db, vehicle_info)
                # Commit per vehicle so progress survives a later failure.
                db.commit()
    finally:
        redis_unlock(lock_key)
def get_sign(input_string: str) -> str:
    """Return the upper-case hexadecimal SHA-1 digest of *input_string*.

    The string is encoded as UTF-8 before hashing.
    Example input: "123456appIdtesttimestamp1561360165vno浙B79306123456".
    """
    raw_bytes = input_string.encode('utf-8')
    digest_hex = hashlib.sha1(raw_bytes).hexdigest()
    return digest_hex.upper()
def get_timestamp() -> str:
    """Return the current Unix time, truncated to whole seconds, as a string."""
    whole_seconds = int(time.time())
    return f"{whole_seconds}"
def get_input_string(params: dict, secret: str = 'bd217f76faa9faf800f5cf31a8b007f7') -> str:
    """Build the string that is hashed to sign a request.

    Keys of *params* are sorted, every key is concatenated directly with its
    value, and the result is wrapped in *secret* on both sides.

    NOTE(review): the default secret is hard-coded in source; consider loading
    it from configuration instead.
    """
    pieces = [secret]
    for name in sorted(params):
        pieces.append(f"{name}{params[name]}")
    pieces.append(secret)
    return ''.join(pieces)
def get_data(url, params1, timeout=10):
    """Call a signed GET endpoint and return the ``data`` field of its JSON body.

    Args:
        url: Full endpoint URL.
        params1: Extra query parameters merged over ``appId`` and
            ``timestamp`` before signing; ``None`` is treated as no extras.
        timeout: Seconds to wait for the server before ``requests`` raises.
            Without a timeout a stalled connection would block the caller
            indefinitely.

    Returns:
        The value stored under ``data`` in the response, or ``None`` when the
        status code is not 200, the body is not a JSON object, or it has no
        ``data`` key.

    Raises:
        requests.exceptions.RequestException: on connection errors or timeout.
        ValueError: when a 200 response body is not valid JSON.
    """
    # NOTE(review): the app id and secret are hard-coded in source; consider
    # moving them to configuration.
    secret = 'bd217f76faa9faf800f5cf31a8b007f7'
    params = {
        'appId': 'sk-itf',
        'timestamp': get_timestamp(),
    }
    params.update(params1 or {})
    # The signature covers every parameter sent, so it is computed last.
    params['sign'] = get_sign(get_input_string(params, secret))
    respon = requests.get(url=url, params=params, timeout=timeout)
    if respon.status_code != 200:
        return None
    payload = json.loads(respon.text)
    if not isinstance(payload, dict):
        return None
    return payload.get('data')
def delete_old_vehicle_trajectory(db):
    """Delete trajectory rows whose ``createDate`` is over a day old, then commit."""
    purge_stmt = text(
        "DELETE FROM sharedb.vehicle_trajectory WHERE createDate < NOW() - INTERVAL 1 DAY;"
    )
    db.execute(purge_stmt)
    db.commit()
def add_vehicle_trajectory(db, data):
    """Insert one ``sharedb.vehicle_trajectory`` row per feed item and commit.

    Args:
        db: Database session used to execute and commit the inserts.
        data: Iterable of dicts; each item is read for the keys ``_id``,
            ``speed``, ``direction``, ``latitude``, ``longitude``, ``state``,
            ``updateTime`` (``'%Y-%m-%d %H:%M:%S'``) and ``updateTimeStamp``.
            ``None`` or an empty collection is a no-op.

    Raises:
        KeyError: when an item lacks one of the keys above.
        ValueError: when ``updateTime`` does not match the expected format.
    """
    if not data:
        # Nothing to store (the upstream fetch may have returned None).
        return

    # Built once; the statement does not depend on the row being inserted.
    sql_insert = text(
        """INSERT INTO sharedb.vehicle_trajectory (
            vehicle_no, speed, mileage, dir, lat, lng, createDate, gpsDate, status_flag, gps_time
        ) VALUES (
            :vehicle_no, :speed, :mileage, :direction, :latitude, :longitude, :createDate, :gpsDate, :state, :gps_time
        )"""
    )

    rows = []
    for item in data:
        # updateTime is used for both createDate and gpsDate.
        updated_at = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
        rows.append({
            'vehicle_no': item['_id'],
            'speed': item['speed'],
            'mileage': None,  # the feed's mileage value is deliberately not stored
            'direction': item['direction'],
            'latitude': item['latitude'],
            'longitude': item['longitude'],
            'createDate': updated_at,
            'gpsDate': updated_at,
            'state': item['state'],
            'gps_time': item['updateTimeStamp'],
        })

    # A list of parameter dicts makes SQLAlchemy issue a single executemany.
    db.execute(sql_insert, rows)
    db.commit()
def delete_old_vehicle_list(db):
    """Empty ``sharedb.vehicle_list`` with TRUNCATE and commit."""
    truncate_stmt = text("TRUNCATE TABLE sharedb.vehicle_list;")
    db.execute(truncate_stmt)
    db.commit()
def add_vehicle_list_INFO(db, item):
    """Upsert one feed item into ``sharedb.vehicle_list``.

    Reads ``_id``, ``carType`` and ``updateTime`` from *item*. The statement is
    executed but not committed here; the caller is responsible for committing.
    """
    upsert_stmt = text(
        """INSERT INTO sharedb.vehicle_list (
            vehicle_no, alarm_flag, vehicle_type, gpsDate
        ) VALUES (
            :vehicle_no, :alarm_flag, :vehicle_type, :gpsDate
        ) ON DUPLICATE KEY UPDATE
            alarm_flag = VALUES(alarm_flag),
            vehicle_type = VALUES(vehicle_type),
            gpsDate = VALUES(gpsDate)"""
    )
    # gpsDate is taken from the item's updateTime string.
    gps_timestamp = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
    bound_values = {
        'vehicle_no': item['_id'],
        # Always written as 1 (original note: assumes state maps to alarm_flag).
        'alarm_flag': 1,
        # Original note: assumes carType maps to vehicle_type.
        'vehicle_type': item['carType'],
        'gpsDate': gps_timestamp,
    }
    db.execute(upsert_stmt, bound_values)
def delete_old_vehicle_INFO(db):
    """Empty ``sharedb.vehicle_info`` with TRUNCATE and commit."""
    truncate_stmt = text("TRUNCATE TABLE sharedb.vehicle_info;")
    db.execute(truncate_stmt)
    db.commit()
def get_vehicle_INFO(db, vehicle_no):
    """Return the ``sharedb.vehicle_info`` rows matching *vehicle_no*.

    Args:
        db: Database session used to run the query.
        vehicle_no: Vehicle number to look up. It is passed as a bound
            parameter rather than interpolated into the SQL text, so values
            containing quotes (or hostile input from the remote feed) cannot
            alter the statement.

    Returns:
        A list of result rows (empty when the vehicle is not stored yet).
    """
    sql = text(
        "select vehicle_no from sharedb.vehicle_info where vehicle_no = :vehicle_no ;"
    )
    return db.execute(sql, {'vehicle_no': vehicle_no}).all()
def add_vehicle_INFO(db, item):
    """Upsert one vehicle detail record into ``sharedb.vehicle_info``.

    Reads ``vecNo``, ``chassisNo``, ``catNmCn``, ``plateType``, ``apprvWeight``
    and ``businessScope`` from *item*. The statement is executed but not
    committed here; the caller is responsible for committing.
    """
    upsert_stmt = text(
        """INSERT INTO sharedb.vehicle_info (
            vehicle_no, vin, vehicle_type, vehicle_color, chelodmass, bnscope
        ) VALUES (
            :vehicle_no, :vin, :vehicle_type, :vehicle_color, :chelodmass, :bnscope
        ) ON DUPLICATE KEY UPDATE
            vehicle_no = VALUES(vehicle_no),
            vin = VALUES(vin),
            vehicle_type = VALUES(vehicle_type),
            vehicle_color = VALUES(vehicle_color),
            chelodmass = VALUES(chelodmass),
            bnscope = VALUES(bnscope)"""
    )
    # Bind-parameter name -> key of the API record that supplies its value.
    source_keys = {
        'vehicle_no': 'vecNo',
        'vin': 'chassisNo',
        'vehicle_type': 'catNmCn',
        'vehicle_color': 'plateType',
        'chelodmass': 'apprvWeight',
        'bnscope': 'businessScope',
    }
    bound_values = {column: item[key] for column, key in source_keys.items()}
    db.execute(upsert_stmt, bound_values)
|