YzyApi.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import random
  4. import string
  5. import hashlib
  6. import time
  7. import base64
  8. import json
  9. from utils.redis_util import *
  10. import requests
  11. from exceptions import YzyException
# Application name: Maoming Smart Emergency Platform
# Example OAuth2 authorize URL for this application:
# https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect

# Base URL that every API path built in this module is appended to.
YZY_HOST = "http://19.15.0.128:8080"
# YZY_HOST = "https://xtbg.digitalgd.com.cn"

# Sent as the "agentid" field of outgoing message payloads.
YZY_AGENTID = 1004302

# Sent as the "corpid" / "corpsecret" query parameters of the gettoken request.
# NOTE(review): credentials are committed in source; consider loading them
# from configuration or the environment instead.
YZY_CORPID = "wld341060039"
YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA"

# Redis key under which the access token is cached.
YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY"

# YZH_PASSID is sent as the "x-tif-paasid" request header; YZH_PASSTOKEN is
# the token mixed into the "x-tif-signature" header value.
YZH_PASSID = "yzy_demo"
YZH_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL"
  22. def __get_access_token():
  23. url = "{}/ebus/yzyapi/cgi-bin/gettoken?corpid={}&corpsecret={}".format(YZY_HOST, YZY_CORPID, YZY_CORPSECRET)
  24. print('yzy url:', url)
  25. timestamp = str(int(time.time()))
  26. nonce = ranstr(20)
  27. signature = calcResponseSign(timestamp, YZH_PASSTOKEN, nonce)
  28. headers = {
  29. 'Content-Type': 'application/json;charset=UTF-8',
  30. "x-tif-signature": signature,
  31. "x-tif-timestamp": timestamp,
  32. "x-tif-nonce": nonce,
  33. "x-tif-paasid": YZH_PASSID
  34. }
  35. response = requests.get(url, headers=headers, timeout=15)
  36. print('yzy return:', response.text)
  37. if response.status_code == 200 :
  38. result = response.json()
  39. errcode = int(result['errcode'])
  40. if errcode == 0:
  41. return result['access_token']
  42. else:
  43. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  44. def get_cache_access_token():
  45. access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY)
  46. if access_token is None:
  47. access_token = __get_access_token()
  48. redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, 3600)
  49. return access_token
  50. def get_user_info(code: str):
  51. access_token = get_cache_access_token()
  52. url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(YZY_HOST, access_token, code)
  53. return __post_url__(url, {})
  54. def send_text_message(users, content: str):
  55. access_token = get_cache_access_token()
  56. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token)
  57. data = {
  58. "touser": "|".join(users),
  59. "msgtype" : "text",
  60. "agentid" : YZY_AGENTID,
  61. "text": {
  62. "content": content
  63. }
  64. }
  65. return __post_url__(url, data)
  66. def send_textcard_message(users, title: str, description: str, detail_url: str):
  67. access_token = get_cache_access_token()
  68. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token)
  69. data = {
  70. "touser": "|".join(users),
  71. "msgtype" : "textcard",
  72. "agentid" : YZY_AGENTID,
  73. "textcard": {
  74. "title": title,
  75. "description": description,
  76. "url": detail_url
  77. }
  78. }
  79. return __post_url__(url, data)
  80. def __post_url__(url, data):
  81. print('yzy url:', url)
  82. print('yzy data:', data)
  83. json_str = json.dumps(data, ensure_ascii=False)
  84. data = json_str.encode('utf-8')
  85. timestamp = str(int(time.time()))
  86. nonce = ranstr(20)
  87. signature = calcResponseSign(timestamp, YZH_PASSTOKEN, nonce)
  88. headers = {
  89. 'Content-Type': 'application/json;charset=UTF-8',
  90. "x-tif-signature": signature,
  91. "x-tif-timestamp": timestamp,
  92. "x-tif-nonce": nonce,
  93. "x-tif-paasid": YZH_PASSID
  94. }
  95. response = requests.post(url, data=data,headers=headers, timeout=15)
  96. print('yzy return:', response.text)
  97. if response.status_code == 200 :
  98. result = response.json()
  99. errcode = int(result['errcode'])
  100. if errcode == 0:
  101. return True
  102. else:
  103. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  104. def ranstr(num):
  105. salt = ''.join(random.sample(
  106. string.ascii_letters + string.digits, num))
  107. return salt
  108. #
  109. #
  110. # 生成校验码
  111. #
  112. #
  113. def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
  114. sign_data_sha256 = calcRequestSign(
  115. timestamp, token, nonce, uid, uinfo, ext)
  116. return sign_data_sha256 == signature.upper()
  117. #
  118. #
  119. # 计算校验码
  120. #
  121. #
  122. def calcResponseSign(timestamp, token, nonce):
  123. sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
  124. return hashlib.sha256(
  125. sign_data.encode("utf8")
  126. ).hexdigest().upper()
  127. #
  128. #
  129. # 计算校验码
  130. #
  131. #
  132. def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
  133. sign_data = "{}{}{},{},".format(
  134. timestamp, token, nonce, uid)
  135. if len(uinfo) == 0:
  136. sign_data = sign_data + ","
  137. else:
  138. sign_data = sign_data + uinfo + ","
  139. if len(ext) == 0:
  140. sign_data = sign_data
  141. else:
  142. sign_data = sign_data + ext
  143. sign_data = sign_data + timestamp
  144. return hashlib.sha256(
  145. sign_data.encode("utf8")
  146. ).hexdigest().upper()