YzyApi.py 7.6 KB


  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import random
  4. import string
  5. import hashlib
  6. import time
  7. import json
  8. from utils.redis_util import *
  9. import requests
  10. from urllib.parse import quote
  11. import base64
  12. from utils import *
  13. from exceptions import YzyException
  14. from extensions import logger
  15. from config import settings
  16. from sqlalchemy.orm import Session
  17. # 应用名称:茂名市智慧应急平台
  18. # https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect
  19. YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY"
  20. '''
  21. YZY_API_ROOT = "http://19.15.0.128:8080"
  22. YZY_AGENTID = 1004302
  23. YZY_CORPID = "wld341060039"
  24. YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA"
  25. YZY_PASSID = "yzy_demo"
  26. YZY_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL"
  27. '''
  28. def __get_access_token():
  29. url = "{}/ebus/yzyapi/cgi-bin/gettoken?corpid={}&corpsecret={}".format(settings.YZY_API_ROOT, settings.YZY_CORPID, settings.YZY_CORPSECRET)
  30. print('yzy url:', url)
  31. timestamp = str(int(time.time()))
  32. nonce = ranstr(20)
  33. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  34. headers = {
  35. 'Content-Type': 'application/json;charset=UTF-8',
  36. "x-tif-signature": signature,
  37. "x-tif-timestamp": timestamp,
  38. "x-tif-nonce": nonce,
  39. "x-tif-paasid": settings.YZY_PASSID
  40. }
  41. response = requests.get(url, headers=headers, timeout=15)
  42. print('yzy return:', response.text)
  43. if response.status_code == 200 :
  44. result = response.json()
  45. errcode = int(result['errcode'])
  46. if errcode == 0:
  47. return (result['access_token'], result['expires_in'])
  48. else:
  49. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  50. def get_cache_access_token():
  51. access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY)
  52. if access_token is None:
  53. access_token, expires_in = __get_access_token()
  54. redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, expires_in - 600)
  55. return access_token
  56. def get_user_info(code: str):
  57. access_token = get_cache_access_token()
  58. url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(settings.YZY_API_ROOT, access_token, code)
  59. return __post_url__(url, {})
  60. def generate_deskey(secret):
  61. # 使用SHA256哈希然后取前8字节
  62. return hashlib.sha256(secret.encode()).digest()[:8]
  63. def generate_signature(appid, secret, curtime):
  64. # 拼接字符串
  65. sign_str = appid + secret + str(curtime)
  66. # 计算SHA256哈希值
  67. sha256_hash = hashlib.sha256(sign_str.encode('utf-8')).hexdigest()
  68. # 转换为大写
  69. signature = sha256_hash.upper()
  70. return signature
  71. # 2.开通用户工作台可见及第三方应用获取用户基本信息接口说明V1.3.pdf
  72. def getuserbycode(code: str):
  73. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getuserbycode"
  74. curtime = get_datetime_str(datetime.now())
  75. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  76. data = {
  77. "appid": settings.YZY_AGENTID,
  78. "curtime": curtime,
  79. "code": code,
  80. "signature": signature
  81. }
  82. return __post_url__(url, data)
  83. def send_text_message(users, content: str):
  84. access_token = get_cache_access_token()
  85. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  86. data = {
  87. "touser": "|".join(users),
  88. "msgtype" : "text",
  89. "agentid" : settings.YZY_AGENTID,
  90. "text": {
  91. "content": content
  92. }
  93. }
  94. return __post_url__(url, data)
  95. def send_textcard_message(users, title: str, description: str, detail_url):
  96. access_token = get_cache_access_token()
  97. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  98. touser = ""
  99. if isinstance(users, list) == True:
  100. touser = "|".join(users)
  101. else:
  102. touser = str(users)
  103. data = {
  104. "touser": touser,
  105. "msgtype" : "textcard",
  106. "agentid" : settings.YZY_AGENTID,
  107. "textcard": {
  108. "title": title,
  109. "description": description,
  110. "url": detail_url
  111. }
  112. }
  113. return __post_url__(url, data)
  114. def __post_url__(url, data):
  115. print('yzy url:', url)
  116. print('yzy data:', data)
  117. json_str = json.dumps(data, ensure_ascii=False)
  118. data = json_str.encode('utf-8')
  119. timestamp = str(int(time.time()))
  120. nonce = ranstr(20)
  121. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  122. headers = {
  123. 'Content-Type': 'application/json;charset=UTF-8',
  124. "x-tif-signature": signature,
  125. "x-tif-timestamp": timestamp,
  126. "x-tif-nonce": nonce,
  127. "x-tif-paasid": settings.YZY_PASSID
  128. }
  129. response = requests.post(url, data=data,headers=headers, timeout=5)
  130. print('yzy return:', response.text)
  131. if response.status_code == 200 :
  132. result = response.json()
  133. return result
  134. '''
  135. errcode = int(result['errcode'])
  136. if errcode == 0:
  137. return True
  138. else:
  139. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  140. '''
  141. def ranstr(num):
  142. salt = ''.join(random.sample(
  143. string.ascii_letters + string.digits, num))
  144. return salt
  145. #
  146. #
  147. # 生成校验码
  148. #
  149. #
  150. def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
  151. sign_data_sha256 = calcRequestSign(
  152. timestamp, token, nonce, uid, uinfo, ext)
  153. return sign_data_sha256 == signature.upper()
  154. #
  155. #
  156. # 计算校验码
  157. #
  158. #
  159. def calcResponseSign(timestamp, token, nonce):
  160. sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
  161. return hashlib.sha256(
  162. sign_data.encode("utf8")
  163. ).hexdigest().upper()
  164. #
  165. #
  166. # 计算校验码
  167. #
  168. #
  169. def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
  170. sign_data = "{}{}{},{},".format(
  171. timestamp, token, nonce, uid)
  172. if len(uinfo) == 0:
  173. sign_data = sign_data + ","
  174. else:
  175. sign_data = sign_data + uinfo + ","
  176. if len(ext) == 0:
  177. sign_data = sign_data
  178. else:
  179. sign_data = sign_data + ext
  180. sign_data = sign_data + timestamp
  181. return hashlib.sha256(
  182. sign_data.encode("utf8")
  183. ).hexdigest().upper()
  184. def format_redirect_url(url: str) -> str:
  185. return f"{settings.YZY_WEB_ROOT}/yjxp/#/?redirect_url={url}"
  186. def add_to_msg_queue(db: Session, data: dict) -> None:
  187. new_msg = YzyMsgQueue(**data, sent_status = 0, create_time = datetime.now())
  188. db.add(new_msg)
  189. db.commit()
  190. # 辅助类
  191. # 调用JAVA编写的密评接口
  192. def desDecryptValue(appSecret: str, value: str) -> any:
  193. data = {}
  194. data['appSecret'] = appSecret
  195. data['value'] = value
  196. headers = {'Content-Type': 'application/json;charset=UTF-8'}
  197. response = requests.post(url="http://127.0.0.1:8052/yzy" + "/DecryptValue", headers=headers, json=data, timeout=60)
  198. if response.status_code == 200:
  199. result = response.json()
  200. print(result)
  201. if result['errcode'] == 0:
  202. return result['data']
  203. return None
  204. def desEncryptValue(appSecret: str, value: str) -> any:
  205. data = {}
  206. data['appSecret'] = appSecret
  207. data['value'] = value
  208. headers = {'Content-Type': 'application/json;charset=UTF-8'}
  209. response = requests.post(url="http://127.0.0.1:8052/yzy" + "/EncryptValue", headers=headers, json=data, timeout=60)
  210. if response.status_code == 200:
  211. result = response.json()
  212. print(result)
  213. if result['errcode'] == 0:
  214. return result['data']
  215. return None