RLTApi.py 1.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import requests
  4. import time
  5. from utils.sg_auth import calcResponseSign, ranstr
  6. from extensions import logger
  7. import urllib3
  8. from exceptions import AppException
  9. # 热力图API
  10. # 粤政易网关
  11. # 区域流量热力图基础数据服务接口说明0515
  12. ROOT_PATH = "https://yzh-wg.gdgov.cn/ebus"
  13. RLT_PASSID = "mmsyjglj_qyllrltdy"
  14. RLT_TOKEN = "f263d7f514724944b665bee64440ffb0"
  15. # 2.行政区划人流量查询接口
  16. def listPopulationNumberByRegion(regionCode: str = '4407'):
  17. url = ROOT_PATH + '/qyllrlt/listPopulationNumberByRegion'
  18. logger.info('rltapi get: {}', url)
  19. payload = f"regionCode={regionCode}&regionType=city&dataSource=4"
  20. return __get_url(url, payload)
  21. # 3.行政区划人口热力查询接口
  22. def listPopulationHeatByRegion(regionCode: str = '4407'):
  23. url = ROOT_PATH + '/qyllrlt/listPopulationHeatByRegion'
  24. logger.info('rltapi get: {}', url)
  25. payload = f"regionCode={regionCode}&regionType=city&dataSource=4"
  26. return __get_url(url, payload)
  27. def __get_url(url: str, payload: str):
  28. print(payload)
  29. timestamp = str(int(time.time()))
  30. nonce = ranstr(20)
  31. signature = calcResponseSign(timestamp, RLT_TOKEN, nonce)
  32. headers = {
  33. "x-rio-signature": signature,
  34. "x-rio-timestamp": timestamp,
  35. "x-rio-nonce": nonce,
  36. "x-rio-paasid": RLT_PASSID,
  37. "Content-Type": 'application/x-www-form-urlencoded'
  38. }
  39. print(headers)
  40. urllib3.disable_warnings()
  41. response = requests.get(url, headers=headers, data=payload, verify=False)
  42. if response.status_code == 200:
  43. result = response.json()
  44. if result['code'] == 0:
  45. return result['data']
  46. return AppException(result['code'], result['message'])
  47. else:
  48. logger.info('rltapi return: {}, {}', response.status_code, response.text)
  49. result = response.json()
  50. raise AppException(response.status_code, result['desc'])