#!/usr/bin/python3
# -*- coding: utf-8 -*-
"""Client for the UAV (drone) open API.

Guangdong Province government-sector UAV management platform, open
interface documentation. Section numbers in the comments below refer to
that document.
"""
import time

import requests
import urllib3

from exceptions import AppException
from extensions import logger
from utils.sg_auth import calcResponseSign, ranstr

ROOT_PATH = "https://yzh-wg.gdgov.cn/ebus"
# NOTE(review): the PaaS id and signing token are committed in source;
# consider loading them from configuration or the environment instead.
WRJ_PASSID = "mmyjglj_wrjdy"
WRJ_TOKEN = "757ebd126c02470c91c3f02745bee7ed"

# (connect, read) timeout in seconds, so a stalled gateway cannot block the
# caller forever.
# NOTE(review): values are a judgment call; confirm against real latency.
WRJ_TIMEOUT = (10, 30)


# 2.1. Get the UAV list
def roughTimeStatus(addressAreaId: str = '4407'):
    """Fetch the UAV list for an area.

    Args:
        addressAreaId: Area id sent as the ``addressAreaId`` form field.

    Returns:
        The ``data`` member of the API's JSON response.

    Raises:
        AppException: On a non-zero API ``code`` or a non-200 HTTP status.
    """
    url = ROOT_PATH + '/swrj/open/uavReal/roughTimeStatus'
    logger.info('wrjapi get: {}', url)
    payload = f"addressAreaId={addressAreaId}"
    return __get_url(url, payload)


# 2.2. Get the list of UAVs currently in flight
def flyUavInfo(addressAreaId: str = '4407'):
    """Fetch the list of UAVs currently in flight for an area.

    Args:
        addressAreaId: Area id sent as the ``addressAreaId`` form field.

    Returns:
        The ``data`` member of the API's JSON response.

    Raises:
        AppException: On a non-zero API ``code`` or a non-200 HTTP status.
    """
    url = ROOT_PATH + '/swrj/open/uavReal/flyUavInfo'
    logger.info('wrjapi get: {}', url)
    payload = f"addressAreaId={addressAreaId}"
    return __get_url(url, payload)


# 5.1. Get real-time flight information for a UAV
def getRealInfo(SN: str):
    """Fetch real-time flight information for one UAV.

    Args:
        SN: Value sent as the ``SN`` form field.

    Returns:
        The ``data`` member of the API's JSON response.

    Raises:
        AppException: On a non-zero API ``code`` or a non-200 HTTP status.
    """
    url = ROOT_PATH + '/swrj/open/uavReal/getRealInfo'
    logger.info('wrjapi get: {}', url)
    payload = f"SN={SN}"
    return __get_url(url, payload)


def __get_url(url: str, payload: str):
    """Send a signed GET request and unwrap the API's JSON envelope.

    A fresh timestamp and nonce are generated for each call and signed with
    ``WRJ_TOKEN``; the results are sent in the ``x-rio-*`` headers.

    Args:
        url: Full endpoint URL.
        payload: Pre-built ``key=value`` string sent as the request body.

    Returns:
        ``result['data']`` when the response has HTTP status 200 and
        ``result['code'] == 0``.

    Raises:
        AppException: With the API's ``code`` and ``message`` when ``code``
            is non-zero, or with code 1 when the HTTP status is not 200.
        requests.RequestException: Transport failures, including timeouts,
            propagate unchanged.
    """
    logger.debug('wrjapi payload: {}', payload)

    timestamp = str(int(time.time()))
    nonce = ranstr(20)
    signature = calcResponseSign(timestamp, WRJ_TOKEN, nonce)
    headers = {
        "x-rio-signature": signature,
        "x-rio-timestamp": timestamp,
        "x-rio-nonce": nonce,
        "x-rio-paasid": WRJ_PASSID,
        "Content-Type": 'application/x-www-form-urlencoded',
        # Yuezhengyi user provincial unified id / owner id
        "sign-uav": "2ajk3d9w8qvncp8t5ekjqq",
    }

    # NOTE(review): TLS certificate verification is disabled (verify=False)
    # and the resulting warnings are silenced; confirm this is intentional.
    urllib3.disable_warnings()
    response = requests.get(
        url,
        headers=headers,
        data=payload,
        verify=False,
        timeout=WRJ_TIMEOUT,
    )

    if response.status_code != 200:
        raise AppException(1, '网络异常')

    result = response.json()
    if result['code'] == 0:
        return result['data']
    # Business-level failure: raise it (the original returned the exception
    # object, which callers would have mistaken for data).
    raise AppException(result['code'], result['message'])