rainfall_conditions_job.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from datetime import datetime, timedelta
  4. from sqlalchemy.orm import Session
  5. from common.BigDataCenterAPI import *
  6. from utils import *
  7. from utils.redis_util import *
  8. from models import *
  9. from exceptions import *
  10. from database import get_db_local
  11. from extensions import logger
  12. import pymysql
  13. from config import settings
  14. def get_stcd_data(stcd='',hours=24):
  15. # 获取当前时间
  16. now = datetime.now()
  17. # 计算 24 小时前的时间
  18. twenty_four_hours_ago = now - timedelta(hours=hours)
  19. formatted_now = now.strftime('%Y-%m-%d %H')
  20. formatted_twenty_four_hours_ago = twenty_four_hours_ago.strftime('%Y-%m-%d %H')
  21. url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
  22. service_code = 'P0138'
  23. passid = 'C90-44004428'
  24. passtoken = 'f539f616b2834160bc47fda5f298512c'
  25. signTime = str(GetTime() // 1000)
  26. nonce = GetNonce(5)
  27. sign = GetSign(signTime, nonce, passtoken)
  28. headers = {
  29. 'Content-Type': 'application/json',
  30. 'x-tif-signature': sign,
  31. 'x-tif-timestamp': signTime,
  32. 'x-tif-nonce': nonce,
  33. 'x-tif-paasid': passid,
  34. 'x-tif-serviceId': service_code
  35. }
  36. data = {
  37. "system_id": "C90-44004428",
  38. "vender_id": "xxx",
  39. "department_id": "xxx",
  40. "query_timestamp": str(GetTime()),
  41. "UUID": GetNonce(4),
  42. "query": {
  43. "stcd":stcd,
  44. "startTime":f"{formatted_twenty_four_hours_ago}:00:00",
  45. "endTime":f"{formatted_now}:00:00"
  46. },
  47. "audit_info": {
  48. "operator_id": "xxxx",
  49. "operator_name": "xxx",
  50. "query_object_id": "xxxx",
  51. "query_object_id_type": "01",
  52. "item_id": "xxxx",
  53. "item_code": "xxx",
  54. "item_sequence": "xxx",
  55. "terminal_info": "xxxx",
  56. "query_timestamp": "xxxx"
  57. }
  58. }
  59. response = requests.post(url=url, headers=headers, json=data, verify=False)
  60. if response.status_code == 200:
  61. try:
  62. return response.json()['data']['data']
  63. except:
  64. return []
  65. def put_data(db=get_db_local()):
  66. query = db.query(GovdataRainDataInfo)
  67. rainfull = get_stcd_data()
  68. if len(rainfull)>0:
  69. now = datetime.now()
  70. twenty_four_hours_ago = now - timedelta(hours=25)
  71. db.query(GovdataRainDataInfo).filter(GovdataRainDataInfo.create_time <twenty_four_hours_ago ).delete()
  72. db.commit()
  73. for info in rainfull:
  74. code = info['F3070220000034_000018001']
  75. create_time = info['F3070220000034_000018004']
  76. raininfo = query.filter(GovdataRainDataInfo.code == code).filter(GovdataRainDataInfo.create_time == create_time).first()
  77. if raininfo:
  78. raininfo.area_name = info['F3070220000034_000018002']
  79. raininfo.address = info['F3070220000034_000018003']
  80. raininfo.rainfall = info['F3070220000034_000018005']
  81. raininfo.update_time = info['F3070220000034_000018006']
  82. else:
  83. raindata=GovdataRainDataInfo(
  84. code = info['F3070220000034_000018001'],
  85. area_name = info['F3070220000034_000018002'],
  86. address = info['F3070220000034_000018003'],
  87. create_time = info['F3070220000034_000018004'],
  88. rainfall = info['F3070220000034_000018005'],
  89. update_time = info['F3070220000034_000018006']
  90. )
  91. db.add(
  92. raindata
  93. )
  94. db.commit()
  95. def proc():
  96. lock_key = "rainfall_conditions_job"
  97. if redis_lock(lock_key):
  98. logger.info(datetime.now())
  99. try:
  100. put_data()
  101. except:
  102. pass
  103. finally:
  104. redis_unlock(lock_key)