123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263 |
- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- import requests
- import time
- from utils.sg_auth import calcResponseSign, ranstr
- from extensions import logger
- import urllib3
- from exceptions import AppException
# Heat-map API
# Accessed through the Yuezhengyi government gateway
# Reference: "Regional traffic heat-map basic data service interface specification 0515"

# Base URL that every endpoint path in this module is appended to.
ROOT_PATH = "https://yzh-wg.gdgov.cn/ebus"
# Sent with each request as the "x-rio-paasid" header.
RLT_PASSID = "mmsyjglj_qyllrltdy"
# Passed to calcResponseSign together with the timestamp and nonce to sign each request.
# NOTE(review): this credential is hard-coded in source; consider loading it
# from configuration or the environment instead.
RLT_TOKEN = "f263d7f514724944b665bee64440ffb0"
# 2. Administrative-region population-flow query endpoint
def listPopulationNumberByRegion(regionCode: str = '4409'):
    """Query the population-flow endpoint for an administrative region.

    Builds a GET URL for ``/qyllrlt/listPopulationNumberByRegion`` with
    ``regionType=city`` and ``dataSource=4`` fixed, logs it, and delegates the
    signed request to ``__get_url``.

    Args:
        regionCode: Region code placed in the ``regionCode`` query parameter.
            It is interpolated as-is (no URL-encoding), so it must already be
            safe to embed in a query string.

    Returns:
        Whatever ``__get_url`` returns for the request.

    Raises:
        Any exception propagated from ``__get_url``.
    """
    # The separator before "regionType" must be a literal "&"; the previous
    # text had the "&reg" sequence collapsed into the "®" character, which
    # merged the two parameters into one malformed value.
    url = (
        ROOT_PATH
        + f'/qyllrlt/listPopulationNumberByRegion?regionCode={regionCode}'
        + '&regionType=city&dataSource=4'
    )
    logger.info('rltapi get: {}', url)
    return __get_url(url)
# 3. Administrative-region population heat query endpoint
def listPopulationHeatByRegion(regionCode: str = '4409'):
    """Query the population-heat endpoint for an administrative region.

    Builds a GET URL for ``/qyllrlt/listPopulationHeatByRegion`` with
    ``regionType=city`` and ``dataSource=4`` fixed, logs it, and delegates the
    signed request to ``__get_url``.

    Args:
        regionCode: Region code placed in the ``regionCode`` query parameter.
            It is interpolated as-is (no URL-encoding), so it must already be
            safe to embed in a query string.

    Returns:
        Whatever ``__get_url`` returns for the request.

    Raises:
        Any exception propagated from ``__get_url``.
    """
    # The separator before "regionType" must be a literal "&"; the previous
    # text had the "&reg" sequence collapsed into the "®" character, which
    # merged the two parameters into one malformed value.
    url = (
        ROOT_PATH
        + f'/qyllrlt/listPopulationHeatByRegion?regionCode={regionCode}'
        + '&regionType=city&dataSource=4'
    )
    logger.info('rltapi get: {}', url)
    return __get_url(url)
def __get_url(url: str, payload: str = '', timeout: float = 30):
    """Send a signed GET request to the gateway and return the ``data`` field.

    A fresh timestamp and 20-character nonce are generated per call and signed
    with ``RLT_TOKEN`` via ``calcResponseSign``; the result is sent in the
    ``x-rio-*`` headers alongside the fixed ``x-datax-*`` headers.

    Args:
        url: Full request URL, including any query string.
        payload: Optional request body passed to ``requests.get`` as ``data``.
        timeout: Seconds to wait for the gateway before giving up. Without a
            timeout ``requests`` can block indefinitely.

    Returns:
        The ``data`` member of the JSON response when HTTP status is 200 and
        the JSON ``status`` field equals 0.

    Raises:
        AppException: If the HTTP status is not 200 (carrying the HTTP status
            and the response's ``desc`` field, or the raw body when ``desc``
            is unavailable), or if the JSON ``status`` field is non-zero
            (carrying that status and the ``msg`` field).
        requests.exceptions.RequestException: On connection failure or timeout.
    """
    timestamp = str(int(time.time()))
    nonce = ranstr(20)
    signature = calcResponseSign(timestamp, RLT_TOKEN, nonce)
    headers = {
        "x-rio-signature": signature,
        "x-rio-timestamp": timestamp,
        "x-rio-nonce": nonce,
        "x-rio-paasid": RLT_PASSID,
        "Content-Type": 'application/x-www-form-urlencoded',
        "x-datax-code": "dss-qyrkrlzp",
        "x-datax-passid": "qyllrlt",
    }
    # The headers are intentionally not printed or logged: they contain the
    # request signature and the PaaS id.

    # NOTE(review): verify=False disables TLS certificate verification and the
    # matching warning is silenced below. Confirm this is required for the
    # gateway; otherwise supply the CA bundle via `verify=`.
    urllib3.disable_warnings()
    response = requests.get(
        url,
        headers=headers,
        data=payload,
        verify=False,
        timeout=timeout,
    )

    if response.status_code != 200:
        logger.info('rltapi return: {}, {}', response.status_code, response.text)
        # Error bodies are not guaranteed to be JSON or to carry "desc"; fall
        # back to the raw text so the caller still receives an AppException.
        try:
            desc = response.json()['desc']
        except (ValueError, KeyError, TypeError):
            desc = response.text
        raise AppException(response.status_code, desc)

    logger.info('rltapi return: {}', response.text)
    result = response.json()
    if result['status'] == 0:
        return result['data']
    # A non-zero business status is an error: raise it rather than returning
    # the exception object to the caller as if it were data.
    raise AppException(result['status'], result['msg'])
|