vehicle_job.py 6.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from datetime import datetime, timedelta
  4. from sqlalchemy.orm import Session
  5. from sqlalchemy import text
  6. from common.BigDataCenterAPI import *
  7. from utils import *
  8. from utils.redis_util import *
  9. from models import *
  10. from exceptions import *
  11. from database import get_local_db
  12. from extensions import logger
  13. import hashlib
  14. import time
  15. import requests
  16. import json
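# Vehicle job: syncs vehicle GPS positions and vehicle details into sharedb.
# NOTE(review): the original header read "duty task - push today's on-duty staff every 30 seconds", which does not match the code below and looks copied from another job; confirm the real schedule.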
  19. def proc():
  20. lock_key = "vehicle_job_proc"
  21. if redis_lock(lock_key):
  22. logger.info(datetime.now())
  23. with get_local_db() as db:
  24. # 删除超过一天的轨迹记录
  25. delete_old_vehicle_trajectory(db)
  26. # 获取车辆列表
  27. data = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/gps/inmm',params1={})
  28. add_vehicle_trajectory(db, data)
  29. # 删除车辆列表
  30. delete_old_vehicle_list(db)
  31. # 新增非列表内车辆信息(查询详情)
  32. for info in data:
  33. add_vehicle_list_INFO(db, info)
  34. if len(get_vehicle_INFO(db,info['vehicleNo'])) ==0:
  35. vehicle_info = get_data(url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vecInfo',params1={"vecNo":info['vehicleNo']})
  36. if vehicle_info:
  37. # print(vehicle_info)
  38. add_vehicle_INFO(db, vehicle_info)
  39. db.commit()
  40. redis_unlock(lock_key)
  41. def get_sign(input_string:str)-> str:
  42. # 待签名的字符串
  43. # input_string = "123456appIdtesttimestamp1561360165vno浙B79306123456"
  44. # 创建一个SHA1哈希对象
  45. hash_object = hashlib.sha1()
  46. # 更新哈希对象,传入待签名字符串的字节表示
  47. hash_object.update(input_string.encode('utf-8'))
  48. # 获取十六进制的编码串(大写)
  49. hex_digest = hash_object.hexdigest().upper()
  50. return hex_digest
  51. def get_timestamp()->str:
  52. return str(int(time.time()))
  53. def get_input_string(params:dict,secret:str = 'bd217f76faa9faf800f5cf31a8b007f7')-> str:
  54. sorted_keys = sorted(params.keys())
  55. result_string = ''.join(f"{key}{params[key]}" for key in sorted_keys)
  56. # print(result_string)
  57. return secret+result_string+secret
  58. def get_data(url,params1):
  59. params = {
  60. 'appId':'sk-itf',
  61. 'timestamp':get_timestamp()
  62. }
  63. params.update(params1)
  64. params['sign'] = get_sign(get_input_string(params,'bd217f76faa9faf800f5cf31a8b007f7'))
  65. # print(params)
  66. # url = 'http://120.241.74.91:14080/public/699b424a-d39d-41f1-a660-b600ed393f2c/v1.0/vec'
  67. respon = requests.get(url=url,params=params)
  68. # print(respon.url)
  69. if respon.status_code==200:
  70. return json.loads(respon.text.encode('utf8'))['data']
  71. return None
  72. def delete_old_vehicle_trajectory(db):
  73. sql = text(
  74. """DELETE FROM sharedb.vehicle_trajectory WHERE createDate < NOW() - INTERVAL 1 DAY;""")
  75. db.execute(sql)
  76. db.commit()
  77. def add_vehicle_trajectory(db,data):
  78. for item in data:
  79. sql_insert = text(
  80. """INSERT INTO sharedb.vehicle_trajectory (
  81. vehicle_no, speed, mileage, dir, lat, lng, createDate, gpsDate, status_flag, gps_time
  82. ) VALUES (
  83. :vehicle_no, :speed, :mileage, :direction, :latitude, :longitude, :createDate, :gpsDate, :state, :gps_time
  84. )"""
  85. )
  86. # 将 updateTime 转换为 datetime 对象
  87. create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
  88. gps_date = create_date # 假设 gpsDate 与 updateTime 相同
  89. # 执行插入操作
  90. db.execute(sql_insert, {
  91. 'vehicle_no': item['_id'],
  92. 'speed': item['speed'],
  93. 'mileage': None,#item['mileage'],
  94. 'direction': item['direction'],
  95. 'latitude': item['latitude'],
  96. 'longitude': item['longitude'],
  97. 'createDate': create_date,
  98. 'gpsDate': gps_date,
  99. 'state': item['state'],
  100. 'gps_time': item['updateTimeStamp']
  101. })
  102. db.commit()
  103. def delete_old_vehicle_list(db):
  104. sql = text(
  105. f"""TRUNCATE TABLE sharedb.vehicle_list;""")
  106. db.execute(sql)
  107. db.commit()
  108. def add_vehicle_list_INFO(db,item):
  109. sql_insert = text(
  110. """INSERT INTO sharedb.vehicle_list (
  111. vehicle_no, alarm_flag, vehicle_type, gpsDate
  112. ) VALUES (
  113. :vehicle_no, :alarm_flag, :vehicle_type, :gpsDate
  114. ) ON DUPLICATE KEY UPDATE
  115. alarm_flag = VALUES(alarm_flag),
  116. vehicle_type = VALUES(vehicle_type),
  117. gpsDate = VALUES(gpsDate)"""
  118. )
  119. # 将 updateTime 转换为 datetime 对象
  120. create_date = datetime.strptime(item['updateTime'], '%Y-%m-%d %H:%M:%S')
  121. gps_date = create_date # 假设 gpsDate 与 updateTime 相同
  122. # 执行插入操作
  123. db.execute(sql_insert, {
  124. 'vehicle_no': item['_id'],
  125. 'alarm_flag': 1, # 假设 state 映射为 alarm_flag
  126. 'vehicle_type': item['carType'], # 假设 carType 映射为 vehicle_type
  127. 'gpsDate': gps_date})
  128. def delete_old_vehicle_INFO(db):
  129. sql = text(
  130. f"""TRUNCATE TABLE sharedb.vehicle_info;""")
  131. db.execute(sql)
  132. db.commit()
  133. def get_vehicle_INFO(db,vehicle_no):
  134. sql = text(
  135. f"""select vehicle_no from sharedb.vehicle_info where vehicle_no = '{vehicle_no}' ;""")
  136. return db.execute(sql).all()
  137. def add_vehicle_INFO(db,item):
  138. sql_insert = text(
  139. """INSERT INTO sharedb.vehicle_info (
  140. vehicle_no, vin, vehicle_type, vehicle_color, chelodmass, bnscope
  141. ) VALUES (
  142. :vehicle_no, :vin, :vehicle_type, :vehicle_color, :chelodmass, :bnscope
  143. ) ON DUPLICATE KEY UPDATE
  144. vehicle_no = VALUES(vehicle_no),
  145. vin = VALUES(vin),
  146. vehicle_type = VALUES(vehicle_type),
  147. vehicle_color = VALUES(vehicle_color),
  148. chelodmass = VALUES(chelodmass),
  149. bnscope = VALUES(bnscope)"""
  150. )
  151. # 执行插入操作
  152. db.execute(sql_insert, {
  153. 'vehicle_no': item['vecNo'],
  154. 'vin': item['chassisNo'],
  155. 'vehicle_type': item['catNmCn'],
  156. 'vehicle_color': item['plateType'],
  157. 'chelodmass': item['apprvWeight'],
  158. 'bnscope': item['businessScope']})