vehicle_job.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from datetime import datetime, timedelta
  4. from sqlalchemy.orm import Session
  5. from sqlalchemy import text
  6. from common.BigDataCenterAPI import *
  7. from utils import *
  8. from utils.redis_util import *
  9. from models import *
  10. from exceptions import *
  11. from database import get_db_local
  12. from extensions import logger
  13. import hashlib
  14. import time
  15. import requests
  16. import json
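# Vehicle GPS synchronisation job.
# NOTE(review): this header originally read "duty task: push today's on-duty staff
# once every 30 seconds", which does not match the vehicle code below and looks
# copied from another job; confirm the real schedule with the job scheduler.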
  19. def proc():
  20. lock_key = "vehicle_job_proc"
  21. if redis_lock(lock_key):
  22. logger.info(datetime.now())
  23. db = get_db_local()
  24. # 删除超过一天的轨迹记录
  25. delete_old_vehicle_trajectory(db)
  26. # 获取车辆列表
  27. data = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/gps/inmm',params1={})
  28. add_vehicle_trajectory(db, data)
  29. # 删除车辆列表
  30. delete_old_vehicle_list(db)
  31. # 新增非列表内车辆信息(查询详情)
  32. for info in data:
  33. add_vehicle_list_INFO(db, info)
  34. if len(get_vehicle_INFO(db,info['vehicleNo'])) ==0:
  35. vehicle_info = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vecInfo',params1={"vecNo":info['vehicleNo']})
  36. if vehicle_info:
  37. # print(vehicle_info)
  38. add_vehicle_INFO(db, vehicle_info)
  39. db.commit()
  40. db.close()
  41. redis_unlock(lock_key)
  42. def get_sign(input_string:str)-> str:
  43. # 待签名的字符串
  44. # input_string = "123456appIdtesttimestamp1561360165vno浙B79306123456"
  45. # 创建一个SHA1哈希对象
  46. hash_object = hashlib.sha1()
  47. # 更新哈希对象,传入待签名字符串的字节表示
  48. hash_object.update(input_string.encode('utf-8'))
  49. # 获取十六进制的编码串(大写)
  50. hex_digest = hash_object.hexdigest().upper()
  51. return hex_digest
  52. def get_timestamp()->str:
  53. return str(int(time.time()))
  54. def get_input_string(params:dict,secret:str = 'bd217f76faa9faf800f5cf31a8b007f7')-> str:
  55. sorted_keys = sorted(params.keys())
  56. result_string = ''.join(f"{key}{params[key]}" for key in sorted_keys)
  57. # print(result_string)
  58. return secret+result_string+secret
  59. def get_data(url,params1):
  60. params = {
  61. 'appId':'sk-itf',
  62. 'timestamp':get_timestamp()
  63. }
  64. params.update(params1)
  65. params['sign'] = get_sign(get_input_string(params,'bd217f76faa9faf800f5cf31a8b007f7'))
  66. # print(params)
  67. # url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vec'
  68. respon = requests.get(url=url,params=params)
  69. # print(respon.url)
  70. if respon.status_code==200:
  71. return json.loads(respon.text.encode('utf8'))['data']
  72. return None
  73. def delete_old_vehicle_trajectory(db):
  74. sql = text(
  75. """DELETE FROM sharedb.vehicle_trajectory WHERE createDate < NOW() - INTERVAL 1 DAY;""")
  76. db.execute(sql)
  77. db.commit()
  78. def add_vehicle_trajectory(db,data):
  79. for item in data:
  80. sql_insert = text(
  81. """INSERT INTO sharedb.vehicle_trajectory (
  82. vehicle_no, speed, mileage, dir, lat, lng, createDate, gpsDate, status_flag, gps_time
  83. ) VALUES (
  84. :vehicle_no, :speed, :mileage, :direction, :latitude, :longitude, :createDate, :gpsDate, :state, :gps_time
  85. )"""
  86. )
  87. # 将 updateTime 转换为 datetime 对象
  88. create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
  89. gps_date = create_date # 假设 gpsDate 与 updateTime 相同
  90. # 执行插入操作
  91. db.execute(sql_insert, {
  92. 'vehicle_no': item['_id'],
  93. 'speed': item['speed'],
  94. 'mileage': None,#item['mileage'],
  95. 'direction': item['direction'],
  96. 'latitude': item['latitude'],
  97. 'longitude': item['longitude'],
  98. 'createDate': create_date,
  99. 'gpsDate': gps_date,
  100. 'state': item['state'],
  101. 'gps_time': item['updateTimeStamp']
  102. })
  103. db.commit()
  104. def delete_old_vehicle_list(db):
  105. sql = text(
  106. f"""TRUNCATE TABLE sharedb.vehicle_list;""")
  107. db.execute(sql)
  108. db.commit()
  109. def add_vehicle_list_INFO(db,item):
  110. sql_insert = text(
  111. """INSERT INTO sharedb.vehicle_list (
  112. vehicle_no, alarm_flag, vehicle_type, gpsDate
  113. ) VALUES (
  114. :vehicle_no, :alarm_flag, :vehicle_type, :gpsDate
  115. ) ON DUPLICATE KEY UPDATE
  116. alarm_flag = VALUES(alarm_flag),
  117. vehicle_type = VALUES(vehicle_type),
  118. gpsDate = VALUES(gpsDate)"""
  119. )
  120. # 将 updateTime 转换为 datetime 对象
  121. create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
  122. gps_date = create_date # 假设 gpsDate 与 updateTime 相同
  123. # 执行插入操作
  124. db.execute(sql_insert, {
  125. 'vehicle_no': item['_id'],
  126. 'alarm_flag': 1, # 假设 state 映射为 alarm_flag
  127. 'vehicle_type': item['carType'], # 假设 carType 映射为 vehicle_type
  128. 'gpsDate': gps_date})
  129. def delete_old_vehicle_INFO(db):
  130. sql = text(
  131. f"""TRUNCATE TABLE sharedb.vehicle_info;""")
  132. db.execute(sql)
  133. db.commit()
  134. def get_vehicle_INFO(db,vehicle_no):
  135. sql = text(
  136. f"""select vehicle_no from sharedb.vehicle_info where vehicle_no = '{vehicle_no}' ;""")
  137. return db.execute(sql).all()
  138. def add_vehicle_INFO(db,item):
  139. sql_insert = text(
  140. """INSERT INTO sharedb.vehicle_info (
  141. vehicle_no, vin, vehicle_type, vehicle_color, chelodmass, bnscope
  142. ) VALUES (
  143. :vehicle_no, :vin, :vehicle_type, :vehicle_color, :chelodmass, :bnscope
  144. ) ON DUPLICATE KEY UPDATE
  145. vehicle_no = VALUES(vehicle_no),
  146. vin = VALUES(vin),
  147. vehicle_type = VALUES(vehicle_type),
  148. vehicle_color = VALUES(vehicle_color),
  149. chelodmass = VALUES(chelodmass),
  150. bnscope = VALUES(bnscope)"""
  151. )
  152. # 执行插入操作
  153. db.execute(sql_insert, {
  154. 'vehicle_no': item['vecNo'],
  155. 'vin': item['chassisNo'],
  156. 'vehicle_type': item['catNmCn'],
  157. 'vehicle_color': item['plateType'],
  158. 'chelodmass': item['apprvWeight'],
  159. 'bnscope': item['businessScope']})