123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- import requests
- import json
- import time
- import uuid
- import hmac
- import hashlib
- import base64
- from requests.packages.urllib3.exceptions import InsecureRequestWarning
- requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
- HKSDK_BASE_URL = f"https://19.155.243.66/artemis"
- def __hksdk_generate_signature(path):
- app_key = "28132932"
- app_secret = "vPUDL4ilt2mENr8XEfa7"
- # 设置请求头
- timestamp = str(int(time.time() * 1000))
- nonce = str(uuid.uuid4())
- headers = {
- "Accept": "*/*",
- "Content-Type": "application/json",
- "x-ca-key": app_key,
- "x-ca-nonce": nonce,
- "x-ca-timestamp": timestamp,
- "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp"
- }
- # 拼接签名字符串
- nonce = headers.get("x-ca-nonce")
- timestamp = headers.get("x-ca-timestamp")
- sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + app_key + "\nx-ca-nonce:" + \
- nonce + "\nx-ca-timestamp:" + timestamp + "\n" + \
- "/artemis"+path
- # 生成签名
- signature = hmac.new(app_secret.encode(), sign_str.encode(), hashlib.sha256).digest()
- headers["x-ca-signature"]=base64.b64encode(signature).decode()
- return headers
- def get_video_url(id: str, protocol: str):
- video_url = ""
- path = "/api/video/v1/cameras/previewURLs"
- url = f"{HKSDK_BASE_URL}{path}"
- body = {
- "cameraIndexCode": id,
- "streamType":1, # 子码流
- "protocol": protocol,
- "transmode": 0, # 使用TCP传输
- "expand": "transcode=1&videotype=h264&resolution=D1"
- }
- if protocol == 'hlss':
- body['streamType'] = 0
- headers = __hksdk_generate_signature(path)
- response = requests.post(url, headers=headers, json=body, verify=False)
- if response.status_code == 200:
- data = response.json()
- if isinstance(data, dict) and data['code'] == '0':
- video_url = data['data']['url']
- return video_url
- def indexCode(id: str):
- # api_url = "http://10.181.7.236:8081/indexCode"
- # params = {
- # "cameraIndexCode": id
- # }
- # # print('param', params)
- # response = requests.post(url=api_url, params=params, timeout=15)
- # # print(response.text)
- # if response.status_code == 200:
- # result = response.json()
- # if result['errcode'] == 0:
- # data = result['data']
- # if isinstance(data,str):
- # data = json.loads(data)
- # return data
- path = "/api/resource/v1/cameras/indexCode"
- url = f"{HKSDK_BASE_URL}{path}"
- body = {
- "cameraIndexCode": id
- }
- headers = __hksdk_generate_signature(path)
- response = requests.post(url, headers=headers, json=body, verify=False)
- print(response.text)
- if response.status_code == 200:
- data = response.json()
- if isinstance(data, dict) and data['code'] == '0':
- return data['data']
- return ""
- def controlling(id: str, action: int, command: str, speed: int, presetIndex: str):
- # api_url = "http://10.181.7.236:8081/controlling"
- # params = {
- # "cameraIndexCode": id,
- # "action": action,
- # "command": command,
- # "speed": speed,
- # "presetIndex": presetIndex
- # }
- # print('param', params)
- # response = requests.post(url=api_url, params=params, timeout=15)
- # print(response.text)
- # if response.status_code == 200:
- # result = response.json()
- # if result['errcode'] == 0:
- # data = result['data']
- # return data
- path = "/api/video/v1/ptzs/controlling"
- url = f"{HKSDK_BASE_URL}{path}"
- body = {
- "cameraIndexCode": id,
- "action": action,
- "command": command,
- "speed": speed,
- "presetIndex": presetIndex
- }
- headers = __hksdk_generate_signature(path)
- response = requests.post(url, headers=headers, json=body, verify=False)
- print(response.text)
- if response.status_code == 200:
- data = response.json()
- if isinstance(data, dict) and data['code'] == '0':
- return data['data']
- return ""
- class HikvisionAPI:
- def __init__(self):
- self.app_key = "20034027"
- self.app_secret = "tfa3PNeYeN2MOrbRj3wS"
- self.base_url = f"https://19.155.243.66/artemis"
- # "/artemis/api/resource/v1/cameras"
- def _generate_signature(self, path):
- # 设置请求头
- timestamp = str(int(time.time() * 1000))
- nonce = str(uuid.uuid4())
- headers = {
- "Accept": "*/*",
- "Content-Type": "application/json",
- "x-ca-key": self.app_key,
- "x-ca-nonce": nonce,
- "x-ca-timestamp": timestamp,
- "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp"
- }
- # 拼接签名字符串
- nonce = headers.get("x-ca-nonce")
- timestamp = headers.get("x-ca-timestamp")
- sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + self.app_key + "\nx-ca-nonce:" + \
- nonce + "\nx-ca-timestamp:" + timestamp + "\n" + \
- "/artemis"+path
- # 生成签名
- signature = hmac.new(self.app_secret.encode(), sign_str.encode(), hashlib.sha256).digest()
- headers["x-ca-signature"]=base64.b64encode(signature).decode()
- return headers
- def get_cameras(self,pageNo=1,pageSize=1):
- path = "/api/resource/v1/cameras"
- url = f"{self.base_url}{path}"
- body = {
- "pageNo": pageNo,"pageSize":pageSize
- }
- headers =self._generate_signature(path)
- response = requests.post(url, headers=headers, json=body, verify=False)
- if response.status_code == 200:
- return response.json()
- else:
- raise Exception(f"Failed to get preview URL: {response.text}")
- def get_regions(self,pageNo=1,pageSize=1):
- path = "/api/resource/v1/regions"
- url = f"{self.base_url}{path}"
- body = {
- "pageNo": pageNo,"pageSize":pageSize,
- "treeCode": "0"
- }
- headers =self._generate_signature(path)
- response = requests.post(url, headers=headers, json=body, verify=False)
- if response.status_code == 200:
- return response.json()
- else:
- raise Exception(f"Failed to get preview URL: {response.text}")
|