12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364 |
- #!/usr/bin/python3
- # -*- coding: utf-8 -*-
- import requests
- import time
- from utils.sg_auth import calcResponseSign, ranstr
- from extensions import logger
- import urllib3
- from exceptions import AppException
# UAV (drone) API
# Based on the open-interface documentation of the Guangdong Province
# government-sector UAV management platform.

# Base URL that every endpoint path in this module is appended to.
ROOT_PATH = "https://yzh-wg.gdgov.cn/ebus"
# Sent as the "x-rio-paasid" request header.
WRJ_PASSID = "mmyjglj_wrjdy"
# Passed to calcResponseSign() to produce the "x-rio-signature" header.
# NOTE(review): this looks like a live credential committed to source control;
# consider loading it from configuration or the environment and rotating it.
WRJ_TOKEN = "757ebd126c02470c91c3f02745bee7ed"
# 2.1 Get the UAV list
def roughTimeStatus(addressAreaId: str = '4407'):
    """Request the UAV list (API section 2.1) for one area.

    Args:
        addressAreaId: Area identifier sent as the ``addressAreaId`` form
            field. Defaults to ``'4407'``.

    Returns:
        Whatever ``__get_url`` returns for the ``roughTimeStatus`` endpoint.
    """
    # Local import keeps this block self-contained; the module does not
    # import urllib.parse at file level.
    from urllib.parse import urlencode

    url = ROOT_PATH + '/swrj/open/uavReal/roughTimeStatus'
    logger.info('wrjapi get: {}', url)
    # urlencode() percent-escapes reserved characters so an unusual value
    # cannot break the x-www-form-urlencoded body; plain alphanumeric values
    # produce exactly the same string as before.
    payload = urlencode({'addressAreaId': addressAreaId})
    return __get_url(url, payload)
# 2.2 Get the list of UAVs currently in flight
def flyUavInfo(addressAreaId: str = '4407'):
    """Request the list of in-flight UAVs (API section 2.2) for one area.

    Args:
        addressAreaId: Area identifier sent as the ``addressAreaId`` form
            field. Defaults to ``'4407'``.

    Returns:
        Whatever ``__get_url`` returns for the ``flyUavInfo`` endpoint.
    """
    # Local import keeps this block self-contained; the module does not
    # import urllib.parse at file level.
    from urllib.parse import urlencode

    url = ROOT_PATH + '/swrj/open/uavReal/flyUavInfo'
    logger.info('wrjapi get: {}', url)
    # urlencode() percent-escapes reserved characters so an unusual value
    # cannot break the x-www-form-urlencoded body; plain alphanumeric values
    # produce exactly the same string as before.
    payload = urlencode({'addressAreaId': addressAreaId})
    return __get_url(url, payload)
# 5.1 Get real-time flight information for a UAV
def getRealInfo(SN: str):
    """Request real-time flight information (API section 5.1) for one UAV.

    Args:
        SN: Value sent as the ``SN`` form field (presumably the UAV serial
            number; confirm against the platform documentation).

    Returns:
        Whatever ``__get_url`` returns for the ``getRealInfo`` endpoint.
    """
    # Local import keeps this block self-contained; the module does not
    # import urllib.parse at file level.
    from urllib.parse import urlencode

    url = ROOT_PATH + '/swrj/open/uavReal/getRealInfo'
    logger.info('wrjapi get: {}', url)
    # SN comes from the caller, so escape it: a raw '&', '=' or space would
    # otherwise corrupt the x-www-form-urlencoded body. Plain alphanumeric
    # values produce exactly the same string as before.
    payload = urlencode({'SN': SN})
    return __get_url(url, payload)
def __get_url(url: str, payload: str, timeout: float = 30):
    """Send a signed GET request and return the ``data`` field of the reply.

    A fresh timestamp and a 20-character nonce are generated for every call
    and passed, together with ``WRJ_TOKEN``, to ``calcResponseSign`` to
    build the ``x-rio-signature`` header.

    Args:
        url: Full endpoint URL.
        payload: Already-encoded form body (``key=value`` pairs).
        timeout: Seconds to wait for the server before ``requests`` gives
            up. Without it a stalled connection would block forever.

    Returns:
        ``result['data']`` from the JSON reply when ``result['code']`` is 0.

    Raises:
        AppException: With the platform's own code and message when the
            reply carries a non-zero ``code``; with code 1 when the HTTP
            status is not 200.
        requests.RequestException: Propagated unchanged on connection
            failures or timeouts.
    """
    logger.debug('wrjapi payload: {}', payload)

    timestamp = str(int(time.time()))
    nonce = ranstr(20)
    signature = calcResponseSign(timestamp, WRJ_TOKEN, nonce)
    headers = {
        "x-rio-signature": signature,
        "x-rio-timestamp": timestamp,
        "x-rio-nonce": nonce,
        "x-rio-paasid": WRJ_PASSID,
        "Content-Type": 'application/x-www-form-urlencoded',
        # Yuezhengyi user provincial unified id (owner id)
        "sign-uav": "2ajk3d9w8qvncp8t5ekjqq",
    }

    # NOTE(review): certificate verification is switched off and the
    # resulting urllib3 warning is silenced. Kept as-is because the gateway
    # may rely on it, but this leaves the call open to interception.
    urllib3.disable_warnings()
    # NOTE(review): the form body is sent with a GET request, as in the
    # original code; confirm the gateway really expects a body rather than
    # query parameters.
    response = requests.get(
        url,
        headers=headers,
        data=payload,
        verify=False,
        timeout=timeout,
    )

    if response.status_code != 200:
        raise AppException(1, '网络异常')

    result = response.json()
    if result['code'] != 0:
        # This used to *return* the exception object, so callers received
        # an AppException instance as if it were response data.
        raise AppException(result['code'], result['message'])
    return result['data']
|