YzyApi.py 5.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import random
  4. import string
  5. import hashlib
  6. import time
  7. import json
  8. from utils.redis_util import *
  9. import requests
  10. from urllib.parse import quote
  11. import base64
  12. from utils import *
  13. from exceptions import YzyException
  14. from extensions import logger
  15. from config import settings
  16. from sqlalchemy.orm import Session
# Application name: Maoming City Smart Emergency Platform
# https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect

# Redis key under which get_cache_access_token() stores and looks up the
# access token.
YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY"

# The bare string literal below is never assigned or used; the functions in
# this module read the same-named values from `settings` instead.
# NOTE(review): it appears to contain real credentials (corp secret, pass
# token) committed to source control -- confirm, and rotate/remove if so.
'''
YZY_API_ROOT = "http://19.15.0.128:8080"
YZY_AGENTID = 1004302
YZY_CORPID = "wld341060039"
YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA"
YZY_PASSID = "yzy_demo"
YZY_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL"
'''
  28. def __get_access_token():
  29. url = "{}/ebus/yzyapi/cgi-bin/gettoken?corpid={}&corpsecret={}".format(settings.YZY_API_ROOT, settings.YZY_CORPID, settings.YZY_CORPSECRET)
  30. print('yzy url:', url)
  31. timestamp = str(int(time.time()))
  32. nonce = ranstr(20)
  33. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  34. headers = {
  35. 'Content-Type': 'application/json;charset=UTF-8',
  36. "x-tif-signature": signature,
  37. "x-tif-timestamp": timestamp,
  38. "x-tif-nonce": nonce,
  39. "x-tif-paasid": settings.YZY_PASSID
  40. }
  41. response = requests.get(url, headers=headers, timeout=15)
  42. print('yzy return:', response.text)
  43. if response.status_code == 200 :
  44. result = response.json()
  45. errcode = int(result['errcode'])
  46. if errcode == 0:
  47. return (result['access_token'], result['expires_in'])
  48. else:
  49. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  50. def get_cache_access_token():
  51. access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY)
  52. if access_token is None:
  53. access_token, expires_in = __get_access_token()
  54. redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, expires_in - 600)
  55. return access_token
  56. def get_user_info(code: str):
  57. access_token = get_cache_access_token()
  58. url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(settings.YZY_API_ROOT, access_token, code)
  59. return __post_url__(url, {})
  60. def send_text_message(users, content: str):
  61. access_token = get_cache_access_token()
  62. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  63. data = {
  64. "touser": "|".join(users),
  65. "msgtype" : "text",
  66. "agentid" : settings.YZY_AGENTID,
  67. "text": {
  68. "content": content
  69. }
  70. }
  71. return __post_url__(url, data)
  72. def send_textcard_message(users, title: str, description: str, detail_url):
  73. access_token = get_cache_access_token()
  74. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  75. touser = ""
  76. if isinstance(users, list) == True:
  77. touser = "|".join(users)
  78. else:
  79. touser = str(users)
  80. data = {
  81. "touser": touser,
  82. "msgtype" : "textcard",
  83. "agentid" : settings.YZY_AGENTID,
  84. "textcard": {
  85. "title": title,
  86. "description": description,
  87. "url": detail_url
  88. }
  89. }
  90. return __post_url__(url, data)
  91. def __post_url__(url, data):
  92. print('yzy url:', url)
  93. print('yzy data:', data)
  94. json_str = json.dumps(data, ensure_ascii=False)
  95. data = json_str.encode('utf-8')
  96. timestamp = str(int(time.time()))
  97. nonce = ranstr(20)
  98. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  99. headers = {
  100. 'Content-Type': 'application/json;charset=UTF-8',
  101. "x-tif-signature": signature,
  102. "x-tif-timestamp": timestamp,
  103. "x-tif-nonce": nonce,
  104. "x-tif-paasid": settings.YZY_PASSID
  105. }
  106. response = requests.post(url, data=data,headers=headers, timeout=5)
  107. print('yzy return:', response.text)
  108. if response.status_code == 200 :
  109. result = response.json()
  110. return result
  111. '''
  112. errcode = int(result['errcode'])
  113. if errcode == 0:
  114. return True
  115. else:
  116. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  117. '''
  118. def ranstr(num):
  119. salt = ''.join(random.sample(
  120. string.ascii_letters + string.digits, num))
  121. return salt
  122. #
  123. #
  124. # 生成校验码
  125. #
  126. #
  127. def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
  128. sign_data_sha256 = calcRequestSign(
  129. timestamp, token, nonce, uid, uinfo, ext)
  130. return sign_data_sha256 == signature.upper()
  131. #
  132. #
  133. # 计算校验码
  134. #
  135. #
  136. def calcResponseSign(timestamp, token, nonce):
  137. sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
  138. return hashlib.sha256(
  139. sign_data.encode("utf8")
  140. ).hexdigest().upper()
  141. #
  142. #
  143. # 计算校验码
  144. #
  145. #
  146. def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
  147. sign_data = "{}{}{},{},".format(
  148. timestamp, token, nonce, uid)
  149. if len(uinfo) == 0:
  150. sign_data = sign_data + ","
  151. else:
  152. sign_data = sign_data + uinfo + ","
  153. if len(ext) == 0:
  154. sign_data = sign_data
  155. else:
  156. sign_data = sign_data + ext
  157. sign_data = sign_data + timestamp
  158. return hashlib.sha256(
  159. sign_data.encode("utf8")
  160. ).hexdigest().upper()
  161. def format_redirect_url(url: str) -> str:
  162. return f"{settings.YZY_WEB_ROOT}/yjxp/#/?redirect_url={url}"
  163. def add_to_msg_queue(db: Session, data: dict) -> None:
  164. new_msg = YzyMsgQueue(**data, sent_status = 0, create_time = datetime.now())
  165. db.add(new_msg)
  166. db.commit()