#!/usr/bin/env python3 # -*- coding: utf-8 -*- import requests from utils.redis_util import * app_id = "f41ddc53aee74905834ccfc93fe91787" secret= "d70322a3cc40d1a345539800c417fcf2" ''' 融合通信H5对接 ''' API_ROOT = "http://19.152.196.223:5050/rpy" # 1.1获取Token ''' { "code": 0, "msg": "Success.", "data": { "app_id": "5cdf02e49d434b3ac4c272a39d3211ca", "secret": "f3e2182a13690b75ff87c5273063201", "token": "Bearer eyJhbGciOiJIUzUxMiJ9.eyJpc3MiOiJhdmNvbiIsInN1YiI6IjczNjkzYmI5ZGYwYTRiYjZiMWMyNTMxMWM0NjAwM2Q0IiwiaWF0IjoxNjM5MjA0MTgzLCJ1c2VyX3R5cGUiOiJvYXV0aCIsInJvbGVzIjpbIlJPTEVfb2F1dGgiXX0MS5JX9NJ7dQomZ4-udFukGMQsFFTMyYTKZ9wS0VgjTYM4A2MvpDZeGB4Ish3GOcH4Z_Sn2TAijyZYrJtLGAmww"} } ''' def get_token(): params = { "app_id": app_id, "secret": secret } api_url = API_ROOT + "/token" print('param', params) response = requests.get(url=api_url, params=params, timeout=5) print(response.text) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['token'].removeprefix("Bearer ") def get_redis_token(): redis_key = "avcon_h5_api_token" token_val = redis_get(redis_key) if token_val is None: token_val = get_token() if token_val is not None: redis_set_with_time(redis_key, token_val, 300) # print('token:', token_val) return token_val # 2.1获取区域 ''' { "code": 0, "msg": "Success.", "data": { count:1 region:[{ "region_id": "YS@hp", "region_name": "演示图像资源" }] } } ''' def get_region(): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } api_url = API_ROOT + "/region" response = requests.get(url=api_url, headers=headers, timeout=5) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['region'] else: print(response.text) # 2.2获取区域组节点 ''' { "code": 0, "msg": "Success.", "data": { "count": 4, "group": [ { "group_id": "Gc@hp", "group_name": "监控设备", "domain": "hp", "parent_id": "S1@hp", "level_id": 1, "order_id": 9999, "child_count": 1 }, ....略.... ] } } ''' def get_group(region_id: str, parent_id: str = ''): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } params = { "region_id": region_id } if parent_id != '': params = { "parent_id": parent_id } api_url = API_ROOT + "/group" response = requests.get(url=api_url, headers=headers, params=params, timeout=5) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['group'] else: print(response.text) # 2.3根据名称搜索区域下所有的节点 ''' { "code": 0, "msg": "Success.", "data": [ { "group_id": "G1@xf", "group_name": "消防指挥域", "domain": "xf", "parent_id": "G0@xf", "level_id": 1, "order_id": 9999, "child_count": 0, //区域组数 "total_number": 0, //设备数 }, { "group_id": "G2@xf", "group_name": "消防指挥域", "domain": "xf", "parent_id": "G1@xf", "level_id": 2, "order_id": 9999, "child_count": 2, //区域组数 "total_number": 1, //设备数 "child_group": [ { "group_id": "G110@xf", "group_name": "指挥中心", "domain": "xf", "parent_id": "G2@xf", "level_id": 2, "order_id": 9999, "child_count": 0, "total_number": 1, //设备数 "child_device": [ { "dev_id": "cy0001@xf", "dev_name": "sx", "domain": "xf", "group_id": "G110@xf", "channel_num": 1, "status": 0, "dev_type": "001", "child_channel": [ { "channel_id": "cy0001@xf_00", "channel_name": "cy0001@xf_00", "channel_no": 0, "dev_id": "cy0001@xf", "status": 1, "lat": 28.28, "lng": 116.38 } ] } ] }, { "group_id": "G110@xf", "group_name": "电视墙", "domain": "xf", "parent_id": "G110@xf", "level_id": 2, "order_id": 9999, "child_count": 0, "total_number": 0 } ], "child_device": [ { "dev_id": "sx@xf", "dev_name": "sx", "domain": "xf", "group_id": "G110@xf", "channel_num": 1, "status": 0, "dev_type": "001", "child_channel": [ { "channel_id": "sx@xf_00", "channel_name": "sx@xf_00", "channel_no": 0, "dev_id": "sx@xf", "status": 0 } ] } ] } ] } ''' def get_search_region(region_name: str): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } params = { "region_name": region_name } api_url = API_ROOT + "/search/region" response = requests.get(url=api_url, headers=headers, params=params, timeout=5) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data else: print(response.text) # 2.4搜索范围内的通道信息及直播流地址 ''' { "code": 0, "msg": "Success.", "data": { "count": 2, "channel": [ { "channel_id": "hk01@hp_00", "channel_name": "学校校门外监控", "channel_no": 0, "dev_id": "hk01@hp", "status": 1, "lat": 28.631629, "lng": 115.877328, "live_url": "http://192.168.0.1:1935/live/hk01@hp_00.flv" }, { "channel_id": "hk02@hp_00", "channel_name": "学校校门外监控2", "channel_no": 0, "dev_id": "hk02@hp", "status": 1, "lat": 28.631529, "lng": 115.874328, "live_url": "http://192.168.0.1:1935/live/hk02@hp_00.flv" } ] } } ''' def get_search_live_location(center_lat: float, center_lng: float, point_lat: float, point_lng: float): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } params = { "center_lat": center_lat, "center_lng": center_lng, "point_lat": point_lat, "point_lng": point_lng } api_url = API_ROOT + "/search/live-location" response = requests.get(url=api_url, headers=headers, params=params, timeout=5) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['channel'] else: print(response.text) # 3.1获取设备 ''' { "code": 0, "msg": "Success.", "data": { "count": 10, "device": [ { "dev_id": "hk01@hp", "dev_name": "海康设备", "domain": "hp", "group_id": "Gc@hp", "channel_num": 1, "status": 1, "dev_type": "135" }, { "dev_id": "jkwg01@hp", "dev_name": "监控网关", "domain": "hp", "group_id": "Gc@hp", "channel_num": 0, "status": 0, "dev_type": "085" } ....略 ] } } ''' def get_group_device(group_id: str, dev_type: dict = None, status: int = -1): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } params = {} if status != -1: params['status'] = status if dev_type != None: params['dev_type'] = "[" + ",".join(dev_type) + "]" api_url = API_ROOT + "/group/" + group_id + "/device" print('get_group_device:', api_url) response = requests.get(url=api_url, headers=headers, params=params, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['device'] else: print(response.text) # 4.1 获取通道 ''' { "code": 0, "msg": "Success.", "data": { "count": 1, "channel": [ { "channel_id": "hk01@hp_00", "channel_name": "学校校门外监控", "channel_no": 0, "dev_id": "hk01@hp", "status": 1, "lat": 28.631629, "lng": 115.877328 } ] } } ''' def get_device_channel(device_id: str, status: int = -1): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } params = {} if status != -1: params['status'] = status api_url = API_ROOT + "/device/" + device_id + "/channel" response = requests.get(url=api_url, headers=headers, params=params, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['channel'] else: print(response.text) # 4.2 获取所有通道 ''' { "code": 0, "msg": "Success.", "data": { "count": 80, "channel": [ { "channel_id": "hk01@hp_00", "channel_name": "学校校门外监控", "channel_no": 0, "dev_id": "hk01@hp", "status": 1, "lat": 28.631629, "lng": 115.877328 }, { "channel_id": "hk02@hp_00", "channel_name": "学校校门外监控2", "channel_no": 0, "dev_id": "hk02@hp", "status": 1, "lat": 28.631529, "lng": 115.874328 } ...略 ] } } ''' def get_channel_all(status: int = -1, gpsonly: int = -1): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } params = {} if status != -1: params['status'] = status if gpsonly != -1: params['gpsonly'] = gpsonly api_url = API_ROOT + "/channel/all" response = requests.get(url=api_url, headers=headers, params=params, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['channel'] else: print(response.text) # 5.1 获取直播流 ''' { "code": 0, "msg": "success.", "data": { "live_url": "http://192.168.0.1:1935/live/hk01@hp_00.flv" } } ''' def get_live_streaming(channel_id: str): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } api_url = API_ROOT + "/live/streaming/" + channel_id response = requests.get(url=api_url, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['live_url'] else: print(response.text) # 5.2云台控制 # 没用 # 5.3获取通道直播页面完整地址 ''' { "code":0, "msg":"Success", "data":{ "play_url":"http://192.168.0.88:5050/liveplay.html?liveurl=ws://192.168.0.88:9001/live/gbdh01@xf_00.flv" } } ''' def get_live_playing(channel_id: str): token = get_redis_token() headers = { 'Content-Type': 'application/json;charset=UTF-8', "Authorization": "Bearer " + token } api_url = API_ROOT + "/live/playing/" + channel_id response = requests.get(url=api_url, headers=headers, timeout=15) if response.status_code == 200: result = response.json() if result['code'] == 0: data = result['data'] return data['play_url'] else: print(response.text) if __name__ == '__main__': get_region()