hk_video_api.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. import requests
  4. import json
  5. import time
  6. import uuid
  7. import hmac
  8. import hashlib
  9. import base64
  10. from requests.packages.urllib3.exceptions import InsecureRequestWarning
  11. requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
  12. def get_video_url(id: str, protocol: str):
  13. video_url = ""
  14. # api_url = "http://10.181.7.217:8081/previewURLs"
  15. api_url = "http://10.181.7.236:8081/previewURLs"
  16. params = {
  17. "cameraIndexCode": id,
  18. "protocol": protocol
  19. }
  20. print('param', params)
  21. response = requests.post(url=api_url, params=params, timeout=15)
  22. print(response.text)
  23. if response.status_code == 200:
  24. result = response.json()
  25. if result['errcode'] == 0:
  26. video_url = result['data']
  27. return video_url
  28. def indexCode(id: str):
  29. api_url = "http://10.181.7.236:8081/indexCode"
  30. params = {
  31. "cameraIndexCode": id
  32. }
  33. # print('param', params)
  34. response = requests.post(url=api_url, params=params, timeout=15)
  35. # print(response.text)
  36. if response.status_code == 200:
  37. result = response.json()
  38. if result['errcode'] == 0:
  39. data = result['data']
  40. if isinstance(data,str):
  41. data = json.loads(data)
  42. return data
  43. def controlling(id: str, action: int, command: str, speed: int, presetIndex: str):
  44. api_url = "http://10.181.7.236:8081/controlling"
  45. params = {
  46. "cameraIndexCode": id,
  47. "action": action,
  48. "command": command,
  49. "speed": speed,
  50. "presetIndex": presetIndex
  51. }
  52. print('param', params)
  53. response = requests.post(url=api_url, params=params, timeout=15)
  54. print(response.text)
  55. if response.status_code == 200:
  56. result = response.json()
  57. if result['errcode'] == 0:
  58. data = result['data']
  59. return data
  60. class HikvisionAPI:
  61. def __init__(self):
  62. self.app_key = "20034027"
  63. self.app_secret = "tfa3PNeYeN2MOrbRj3wS"
  64. self.base_url = f"https://19.155.243.66/artemis"
  65. # "/artemis/api/resource/v1/cameras"
  66. def _generate_signature(self, path):
  67. # 设置请求头
  68. timestamp = str(int(time.time() * 1000))
  69. nonce = str(uuid.uuid4())
  70. headers = {
  71. "Accept": "*/*",
  72. "Content-Type": "application/json",
  73. "x-ca-key": self.app_key,
  74. "x-ca-nonce": nonce,
  75. "x-ca-timestamp": timestamp,
  76. "x-ca-signature-headers": "x-ca-key,x-ca-nonce,x-ca-timestamp"
  77. }
  78. # 拼接签名字符串
  79. nonce = headers.get("x-ca-nonce")
  80. timestamp = headers.get("x-ca-timestamp")
  81. sign_str = "POST\n*/*\napplication/json" + "\nx-ca-key:" + self.app_key + "\nx-ca-nonce:" + \
  82. nonce + "\nx-ca-timestamp:" + timestamp + "\n" + \
  83. "/artemis"+path
  84. # 生成签名
  85. signature = hmac.new(self.app_secret.encode(), sign_str.encode(), hashlib.sha256).digest()
  86. headers["x-ca-signature"]=base64.b64encode(signature).decode()
  87. return headers
  88. def get_cameras(self,pageNo=1,pageSize=1):
  89. path = "/api/resource/v1/cameras"
  90. url = f"{self.base_url}{path}"
  91. body = {
  92. "pageNo": pageNo,"pageSize":pageSize
  93. }
  94. headers =self._generate_signature(path)
  95. response = requests.post(url, headers=headers, json=body, verify=False)
  96. if response.status_code == 200:
  97. return response.json()
  98. else:
  99. raise Exception(f"Failed to get preview URL: {response.text}")
  100. def get_regions(self,pageNo=1,pageSize=1):
  101. path = "/api/resource/v1/regions"
  102. url = f"{self.base_url}{path}"
  103. body = {
  104. "pageNo": pageNo,"pageSize":pageSize,
  105. "treeCode": "0"
  106. }
  107. headers =self._generate_signature(path)
  108. response = requests.post(url, headers=headers, json=body, verify=False)
  109. if response.status_code == 200:
  110. return response.json()
  111. else:
  112. raise Exception(f"Failed to get preview URL: {response.text}")