#!/usr/bin/env python3 # -*- coding: utf-8 -*- import requests import string import random import json from utils import * from utils.redis_util import * app_id = "820e2aa3903e46939abde098409319e1" secret= "92659e6e0dd3c550952df7c95a483d56" ''' 融合通信Mini客户端对接 U8 OpenAPI使用说明书 ''' API_ROOT = "http://19.152.196.106:12030/avcon-api" def get_nonce(length: int = 10) -> str: characters = string.ascii_letters + string.digits # 随机选择字符集的长度个字符 random_string = ''.join(random.choice(characters) for _ in range(length)) return random_string def get_sign_data(nonce: str, timestamp: str, uri: str, query: str = None, body: dict = None) -> str: data = nonce + str(timestamp) + str(uri) if query is not None: data = data + query if body is not None: data = data + json.dumps(body, ensure_ascii=False) sign_data = md5(data) print('data:', data) print('sign_data:', sign_data) return sign_data # 3.1 获得Token def get_auth_token(): nonce = get_nonce() timestamp = str(unixstamp()) uri = "/oauth/token" query = "app_id="+app_id+"&secret="+secret sign_data = get_sign_data(nonce, timestamp, uri, query) headers = { "Content-Type": 'application/json', "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri + "?" + query, headers=headers, timeout=15) print(response.text) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['token'] def get_redis_token(): redis_key = "avcon_mini_api_token" token_val = redis_get(redis_key) if token_val is None: token_val = get_auth_token() if token_val is not None: redis_set_with_time(redis_key, token_val, 300) print('auth_token:', token_val) return token_val # 4.1 查询配置项 def get_config(name: str): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/config/" + name sign_data = get_sign_data(nonce, timestamp, uri) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 5.1 查询区域 def get_group(group_id: str = 'G1@mm.zw.yj'): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/monitor-room-group/" + group_id sign_data = get_sign_data(nonce, timestamp, uri) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 5.2 查询区域分页 def get_group_page(): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/monitor-room-group/page" query = "page_num=1&page_size=100" sign_data = get_sign_data(nonce, timestamp, uri, query) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri + "?" + query, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 5.3 查询区域的设备分页 def get_group_device(group_id: str): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/monitor-room-group/" + group_id + "/device/page" query = "page_num=1&page_size=100" sign_data = get_sign_data(nonce, timestamp, uri, query) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri + "?" + query, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 6.1 查询设备 def get_device_info(dev_id: str): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/device/" + dev_id sign_data = get_sign_data(nonce, timestamp, uri) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 6.2 查询设备分页 def get_device_all(): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/device/page" json_data = { "domain": "mm.zw.yj" } query = "json="+json.dumps(json_data, ensure_ascii=False) + "&page_num=1&page_size=1000" sign_data = get_sign_data(nonce, timestamp, uri, query) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri + "?" + query, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 6.3 查询设备的通道分页 def get_device_channel(dev_id: str): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/device/" + dev_id + "/channel/page" json_data = { "domain": "mm.zw.yj" } query = "json="+json.dumps(json_data, ensure_ascii=False) + "&page_num=1&page_size=1000" sign_data = get_sign_data(nonce, timestamp, uri, query) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri + "?" + query, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 6.4 查询设备的通道GPS位置分页 def get_device_channel_gps(dev_id: str): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/device/" + dev_id + "/channel-gps-point" sign_data = get_sign_data(nonce, timestamp, uri) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) else: print(response.text) # 7.1 查询通道 # 7.2 查询通道分页 def get_channel_all(): token = get_redis_token() nonce = get_nonce() timestamp = str(unixstamp()) uri = "/open/api/v1/device-channel/page" json_data = { "domain": "mm.zw.yj" } query = "json="+json.dumps(json_data, ensure_ascii=False) + "&page_num=1&page_size=1000" sign_data = get_sign_data(nonce, timestamp, uri, query) headers = { "Content-Type": 'application/json', "Authorization": token, "X-Nonce": nonce, "X-Timestamp": timestamp, "X-Signature": sign_data } response = requests.get(API_ROOT + uri + "?" + query, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 7.3 查询通道GPS位置