#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Client for the heat-map API exposed through the Yuezhengyi gateway.

Reference document: "Regional traffic heat map basic data service
interface specification 0515".
"""
import time
from urllib.parse import urlencode

import requests
import urllib3

from utils.sg_auth import calcResponseSign, ranstr
from extensions import logger
from exceptions import AppException

# Heat-map API, reached through the Yuezhengyi gateway.
ROOT_PATH = "https://yzh-wg.gdgov.cn/ebus"
# NOTE(review): credentials are hard-coded in source; consider loading them
# from configuration or a secret store instead.
RLT_PASSID = "mmsyjglj_qyllrltdy"
RLT_TOKEN = "f263d7f514724944b665bee64440ffb0"

# Seconds to wait for the gateway before giving up; without a timeout
# ``requests`` can block forever on an unresponsive server.
_REQUEST_TIMEOUT = 30


def _region_payload(regionCode: str) -> str:
    """Return the URL-encoded form payload for a city-level region query."""
    return urlencode(
        {"regionCode": regionCode, "regionType": "city", "dataSource": 4}
    )


def _error_description(response) -> str:
    """Return the ``desc`` field of an error response, or its raw text.

    Error bodies are not guaranteed to be JSON or to carry ``desc``; falling
    back to the raw text keeps the HTTP failure from being masked by a
    parsing error.
    """
    try:
        body = response.json()
    except ValueError:
        return response.text
    if isinstance(body, dict) and 'desc' in body:
        return body['desc']
    return response.text


# 2. Administrative-region population count query endpoint
def listPopulationNumberByRegion(regionCode: str = '4407'):
    """Query the population count for an administrative region.

    Args:
        regionCode: Region code sent as ``regionCode``; the query is always
            issued with ``regionType=city`` and ``dataSource=4``.

    Returns:
        The ``data`` member of the API response.

    Raises:
        AppException: If the HTTP status is not 200 or the response ``code``
            is non-zero.
    """
    url = ROOT_PATH + '/qyllrlt/listPopulationNumberByRegion'
    logger.info('rltapi get: {}', url)
    return __get_url(url, _region_payload(regionCode))


# 3. Administrative-region population heat query endpoint
def listPopulationHeatByRegion(regionCode: str = '4407'):
    """Query the population heat data for an administrative region.

    Args:
        regionCode: Region code sent as ``regionCode``; the query is always
            issued with ``regionType=city`` and ``dataSource=4``.

    Returns:
        The ``data`` member of the API response.

    Raises:
        AppException: If the HTTP status is not 200 or the response ``code``
            is non-zero.
    """
    url = ROOT_PATH + '/qyllrlt/listPopulationHeatByRegion'
    logger.info('rltapi get: {}', url)
    return __get_url(url, _region_payload(regionCode))


def __get_url(url: str, payload: str):
    """Send a signed GET request to the gateway and unwrap the response.

    A timestamp and a 20-character nonce are generated per call and passed,
    together with ``RLT_TOKEN``, to ``calcResponseSign``; the result is sent
    in the ``x-rio-*`` headers.

    Args:
        url: Full endpoint URL.
        payload: URL-encoded form string sent as the request body.

    Returns:
        ``result['data']`` when the HTTP status is 200 and ``result['code']``
        is 0.

    Raises:
        AppException: With the API ``code``/``message`` when the call returns
            a non-zero code, or with the HTTP status and error description
            when the status is not 200.
        requests.RequestException: On connection failure or timeout.
    """
    logger.debug('rltapi payload: {}', payload)
    timestamp = str(int(time.time()))
    nonce = ranstr(20)
    signature = calcResponseSign(timestamp, RLT_TOKEN, nonce)
    # The headers carry the request signature, so they are deliberately not
    # printed or logged.
    headers = {
        "x-rio-signature": signature,
        "x-rio-timestamp": timestamp,
        "x-rio-nonce": nonce,
        "x-rio-paasid": RLT_PASSID,
        "Content-Type": 'application/x-www-form-urlencoded',
    }
    # NOTE(review): TLS verification is disabled and the resulting warnings
    # are silenced process-wide; confirm this is still required.
    urllib3.disable_warnings()
    # NOTE(review): the form payload travels in the body of a GET request;
    # presumably the gateway expects this - confirm against the API spec.
    response = requests.get(
        url,
        headers=headers,
        data=payload,
        verify=False,
        timeout=_REQUEST_TIMEOUT,
    )
    if response.status_code != 200:
        logger.info('rltapi return: {}, {}', response.status_code, response.text)
        raise AppException(response.status_code, _error_description(response))

    result = response.json()
    if result['code'] == 0:
        return result['data']
    # Business-level failure: raise (not return) so callers never mistake the
    # exception object for data.
    raise AppException(result['code'], result['message'])