YzyApi.py 5.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import random
  4. import string
  5. import hashlib
  6. import time
  7. import json
  8. from utils.redis_util import *
  9. import requests
  10. from urllib.parse import quote
  11. import base64
  12. from utils import *
  13. from exceptions import YzyException
  14. from extensions import logger
  15. from config import settings
# Application name: Maoming Smart Emergency Platform
# https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect
# Base URL that every API request in this module is sent to.
YZY_HOST = "http://19.15.0.128:8080"
# YZY_HOST = "https://xtbg.digitalgd.com.cn"
# Agent id placed in outgoing messages and in the OAuth authorize URL.
YZY_AGENTID = 1004302
# Corp id / secret used to request an access token.
# NOTE(review): credentials are hard-coded in source; consider loading them
# from configuration or the environment instead.
YZY_CORPID = "wld341060039"
YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA"
# Redis key under which the access token is cached.
YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY"
# Sent as the "x-tif-paasid" header; the token below is mixed into the
# "x-tif-signature" header value.
# NOTE(review): these two use a "YZH_" prefix while the rest use "YZY_";
# presumably a typo, but renaming would affect any external users.
YZH_PASSID = "yzy_demo"
YZH_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL"
  26. def __get_access_token():
  27. url = "{}/ebus/yzyapi/cgi-bin/gettoken?corpid={}&corpsecret={}".format(YZY_HOST, YZY_CORPID, YZY_CORPSECRET)
  28. print('yzy url:', url)
  29. timestamp = str(int(time.time()))
  30. nonce = ranstr(20)
  31. signature = calcResponseSign(timestamp, YZH_PASSTOKEN, nonce)
  32. headers = {
  33. 'Content-Type': 'application/json;charset=UTF-8',
  34. "x-tif-signature": signature,
  35. "x-tif-timestamp": timestamp,
  36. "x-tif-nonce": nonce,
  37. "x-tif-paasid": YZH_PASSID
  38. }
  39. response = requests.get(url, headers=headers, timeout=15)
  40. print('yzy return:', response.text)
  41. if response.status_code == 200 :
  42. result = response.json()
  43. errcode = int(result['errcode'])
  44. if errcode == 0:
  45. return result['access_token']
  46. else:
  47. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  48. def get_cache_access_token():
  49. access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY)
  50. if access_token is None:
  51. access_token = __get_access_token()
  52. redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, 3600)
  53. return access_token
  54. def get_user_info(code: str):
  55. access_token = get_cache_access_token()
  56. url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(YZY_HOST, access_token, code)
  57. return __post_url__(url, {})
  58. def send_text_message(users, content: str):
  59. access_token = get_cache_access_token()
  60. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token)
  61. data = {
  62. "touser": "|".join(users),
  63. "msgtype" : "text",
  64. "agentid" : YZY_AGENTID,
  65. "text": {
  66. "content": content
  67. }
  68. }
  69. return __post_url__(url, data)
  70. def send_textcard_message(users, title: str, description: str, detail_url: str):
  71. access_token = get_cache_access_token()
  72. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(YZY_HOST, access_token)
  73. touser = ""
  74. if isinstance(users, list) == True:
  75. touser = "|".join(users)
  76. else:
  77. touser = str(users)
  78. data = {
  79. "touser": touser,
  80. "msgtype" : "textcard",
  81. "agentid" : YZY_AGENTID,
  82. "textcard": {
  83. "title": title,
  84. "description": description,
  85. "url": detail_url
  86. }
  87. }
  88. return __post_url__(url, data)
  89. def __post_url__(url, data):
  90. print('yzy url:', url)
  91. print('yzy data:', data)
  92. json_str = json.dumps(data, ensure_ascii=False)
  93. data = json_str.encode('utf-8')
  94. timestamp = str(int(time.time()))
  95. nonce = ranstr(20)
  96. signature = calcResponseSign(timestamp, YZH_PASSTOKEN, nonce)
  97. headers = {
  98. 'Content-Type': 'application/json;charset=UTF-8',
  99. "x-tif-signature": signature,
  100. "x-tif-timestamp": timestamp,
  101. "x-tif-nonce": nonce,
  102. "x-tif-paasid": YZH_PASSID
  103. }
  104. response = requests.post(url, data=data,headers=headers, timeout=5)
  105. print('yzy return:', response.text)
  106. if response.status_code == 200 :
  107. result = response.json()
  108. return result
  109. '''
  110. errcode = int(result['errcode'])
  111. if errcode == 0:
  112. return True
  113. else:
  114. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  115. '''
  116. def ranstr(num):
  117. salt = ''.join(random.sample(
  118. string.ascii_letters + string.digits, num))
  119. return salt
  120. #
  121. #
  122. # 生成校验码
  123. #
  124. #
  125. def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
  126. sign_data_sha256 = calcRequestSign(
  127. timestamp, token, nonce, uid, uinfo, ext)
  128. return sign_data_sha256 == signature.upper()
  129. #
  130. #
  131. # 计算校验码
  132. #
  133. #
  134. def calcResponseSign(timestamp, token, nonce):
  135. sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
  136. return hashlib.sha256(
  137. sign_data.encode("utf8")
  138. ).hexdigest().upper()
  139. #
  140. #
  141. # 计算校验码
  142. #
  143. #
  144. def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
  145. sign_data = "{}{}{},{},".format(
  146. timestamp, token, nonce, uid)
  147. if len(uinfo) == 0:
  148. sign_data = sign_data + ","
  149. else:
  150. sign_data = sign_data + uinfo + ","
  151. if len(ext) == 0:
  152. sign_data = sign_data
  153. else:
  154. sign_data = sign_data + ext
  155. sign_data = sign_data + timestamp
  156. return hashlib.sha256(
  157. sign_data.encode("utf8")
  158. ).hexdigest().upper()
  159. def format_redirect_url(redirect_url: str) -> str:
  160. yzy_callback_url = quote(settings.YJXP_CALLBACK_WEB_PATH)
  161. logger.info("yzy_callback_url: {}", yzy_callback_url)
  162. state_json = {
  163. "redirect_url": redirect_url,
  164. "rnd": new_guid()
  165. }
  166. state_str = json.dumps(state_json)
  167. # print(state_str)
  168. state = base64.b64encode(state_str.encode('utf-8')).decode('utf-8')
  169. detail_url = "https://open.weixin.qq.com/connect/Oauth2/authorize?appid={}&redirect_uri={}&response_type=code&scope=snsapi_base&agentid={}&state={}#wechat_redirect".format(
  170. YZY_CORPID,
  171. yzy_callback_url,
  172. YZY_AGENTID,
  173. state)
  174. logger.info("detail_url: {}", detail_url)
  175. return detail_url