#!/usr/bin/python3 # -*- coding: utf-8 -*- import requests import time from utils.sg_auth import calcResponseSign, ranstr from extensions import logger import urllib3 from exceptions import AppException # 热力图API # 粤政易网关 # 区域流量热力图基础数据服务接口说明0515 ROOT_PATH = "https://yzh-wg.gdgov.cn/ebus" RLT_PASSID = "mmsyjglj_qyllrltdy" RLT_TOKEN = "f263d7f514724944b665bee64440ffb0" # 2.行政区划人流量查询接口 def listPopulationNumberByRegion(regionCode: str = '4409'): url = ROOT_PATH + f'/qyllrlt/listPopulationNumberByRegion?regionCode={regionCode}®ionType=city&dataSource=4' logger.info('rltapi get: {}', url) # payload = f"regionCode={regionCode}®ionType=city&dataSource=4" return __get_url(url) # 3.行政区划人口热力查询接口 def listPopulationHeatByRegion(regionCode: str = '4409'): url = ROOT_PATH + f'/qyllrlt/listPopulationHeatByRegion?regionCode={regionCode}®ionType=city&dataSource=4' logger.info('rltapi get: {}', url) # payload = f"regionCode={regionCode}®ionType=city&dataSource=4" return __get_url(url) def __get_url(url: str, payload: str=''): # print(payload) timestamp = str(int(time.time())) nonce = ranstr(20) signature = calcResponseSign(timestamp, RLT_TOKEN, nonce) headers = { "x-rio-signature": signature, "x-rio-timestamp": timestamp, "x-rio-nonce": nonce, "x-rio-paasid": RLT_PASSID, "Content-Type": 'application/x-www-form-urlencoded', "x-datax-code": "dss-qyrkrlzp", "x-datax-passid": "qyllrlt" } print(headers) urllib3.disable_warnings() response = requests.get(url, headers=headers, data=payload, verify=False) if response.status_code == 200: logger.info('rltapi return: {}', response.text) result = response.json() if result['status'] == 0: return result['data'] return AppException(result['status'], result['msg']) else: logger.info('rltapi return: {}, {}', response.status_code, response.text) result = response.json() raise AppException(response.status_code, result['desc'])