rainfall_conditions_job.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from datetime import datetime, timedelta
  4. from sqlalchemy.orm import Session
  5. from common.BigDataCenterAPI import *
  6. from utils import *
  7. from utils.redis_util import *
  8. from models import *
  9. from exceptions import *
  10. from database import get_local_db
  11. from extensions import logger
  12. import pymysql
  13. from config import settings
  14. def get_stcd_data(stcd='',hours=24):
  15. # 获取当前时间
  16. now = datetime.now()
  17. # 计算 24 小时前的时间
  18. twenty_four_hours_ago = now - timedelta(hours=hours)
  19. formatted_now = now.strftime('%Y-%m-%d %H')
  20. formatted_twenty_four_hours_ago = twenty_four_hours_ago.strftime('%Y-%m-%d %H')
  21. url = 'https://19.15.75.180:8581/GatewayMsg/http/api/proxy/invoke'
  22. service_code = 'P0138'
  23. passid = 'C90-44004428'
  24. passtoken = 'f539f616b2834160bc47fda5f298512c'
  25. signTime = str(GetTime() // 1000)
  26. nonce = GetNonce(5)
  27. sign = GetSign(signTime, nonce, passtoken)
  28. headers = {
  29. 'Content-Type': 'application/json',
  30. 'x-tif-signature': sign,
  31. 'x-tif-timestamp': signTime,
  32. 'x-tif-nonce': nonce,
  33. 'x-tif-paasid': passid,
  34. 'x-tif-serviceId': service_code
  35. }
  36. data = {
  37. "system_id": "C90-44004428",
  38. "vender_id": "xxx",
  39. "department_id": "xxx",
  40. "query_timestamp": str(GetTime()),
  41. "UUID": GetNonce(4),
  42. "query": {
  43. "stcd":stcd,
  44. "startTime":f"{formatted_twenty_four_hours_ago}:00:00",
  45. "endTime":f"{formatted_now}:00:00"
  46. },
  47. "audit_info": {
  48. "operator_id": "xxxx",
  49. "operator_name": "xxx",
  50. "query_object_id": "xxxx",
  51. "query_object_id_type": "01",
  52. "item_id": "xxxx",
  53. "item_code": "xxx",
  54. "item_sequence": "xxx",
  55. "terminal_info": "xxxx",
  56. "query_timestamp": "xxxx"
  57. }
  58. }
  59. response = requests.post(url=url, headers=headers, json=data, verify=False)
  60. if response.status_code == 200:
  61. try:
  62. return response.json()['data']['data']
  63. except:
  64. return []
  65. def put_data(
  66. db = get_local_db()
  67. #db=get_db_local()
  68. ):
  69. query = db.query(GovdataRainDataInfo)
  70. rainfull = get_stcd_data()
  71. if len(rainfull)>0:
  72. now = datetime.now()
  73. twenty_four_hours_ago = now - timedelta(hours=25)
  74. db.query(GovdataRainDataInfo).filter(GovdataRainDataInfo.create_time <twenty_four_hours_ago ).delete()
  75. db.commit()
  76. for info in rainfull:
  77. code = info['F3070220000034_000018001']
  78. create_time = info['F3070220000034_000018004']
  79. raininfo = query.filter(GovdataRainDataInfo.code == code).filter(GovdataRainDataInfo.create_time == create_time).first()
  80. if raininfo:
  81. raininfo.area_name = info['F3070220000034_000018002']
  82. raininfo.address = info['F3070220000034_000018003']
  83. raininfo.rainfall = info['F3070220000034_000018005']
  84. raininfo.update_time = info['F3070220000034_000018006']
  85. else:
  86. raindata=GovdataRainDataInfo(
  87. code = info['F3070220000034_000018001'],
  88. area_name = info['F3070220000034_000018002'],
  89. address = info['F3070220000034_000018003'],
  90. create_time = info['F3070220000034_000018004'],
  91. rainfall = info['F3070220000034_000018005'],
  92. update_time = info['F3070220000034_000018006']
  93. )
  94. db.add(
  95. raindata
  96. )
  97. db.commit()
  98. def proc():
  99. lock_key = "rainfall_conditions_job"
  100. if redis_lock(lock_key):
  101. logger.info(datetime.now())
  102. try:
  103. put_data()
  104. except:
  105. pass
  106. finally:
  107. redis_unlock(lock_key)