YzyApi.py 11 KB


  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import random
  4. import string
  5. import hashlib
  6. import time
  7. import json
  8. from utils.redis_util import *
  9. import requests
  10. from urllib.parse import quote
  11. import base64
  12. from utils import *
  13. from exceptions import YzyException
  14. from extensions import logger
  15. from config import settings
  16. from sqlalchemy.orm import Session
  17. # 应用名称:茂名市智慧应急平台
  18. # https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect
  19. YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY"
  20. YZY_JSAPI_TICKET_REDIS_KEY = "YZY_JSAPI_TICKET_REDIS_KEY"
  21. '''
  22. YZY_API_ROOT = "http://19.15.0.128:8080"
  23. YZY_AGENTID = 1004302
  24. YZY_CORPID = "wld341060039"
  25. YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA"
  26. YZY_PASSID = "yzy_demo"
  27. YZY_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL"
  28. '''
  29. def __get_access_token():
  30. url = "{}/ebus/yzyapi/cgi-bin/gettoken?corpid={}&corpsecret={}".format(settings.YZY_API_ROOT, settings.YZY_CORPID, settings.YZY_CORPSECRET)
  31. print('yzy url:', url)
  32. timestamp = str(int(time.time()))
  33. nonce = ranstr(20)
  34. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  35. headers = {
  36. 'Content-Type': 'application/json;charset=UTF-8',
  37. "x-tif-signature": signature,
  38. "x-tif-timestamp": timestamp,
  39. "x-tif-nonce": nonce,
  40. "x-tif-paasid": settings.YZY_PASSID
  41. }
  42. response = requests.get(url, headers=headers, timeout=15)
  43. print('yzy return:', response.text)
  44. if response.status_code == 200 :
  45. result = response.json()
  46. errcode = int(result['errcode'])
  47. if errcode == 0:
  48. return (result['access_token'], result['expires_in'])
  49. else:
  50. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  51. def get_cache_access_token():
  52. access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY)
  53. if access_token is None:
  54. access_token, expires_in = __get_access_token()
  55. redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, expires_in - 600)
  56. return access_token
  57. def __get_jsapi_ticket():
  58. access_token = get_cache_access_token()
  59. url = "{}/ebus/yzyapi/cgi-bin/get_jsapi_ticket?access_token={}".format(settings.YZY_API_ROOT, access_token)
  60. print('yzy url:', url)
  61. timestamp = str(int(time.time()))
  62. nonce = ranstr(20)
  63. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  64. headers = {
  65. 'Content-Type': 'application/json;charset=UTF-8',
  66. "x-tif-signature": signature,
  67. "x-tif-timestamp": timestamp,
  68. "x-tif-nonce": nonce,
  69. "x-tif-paasid": settings.YZY_PASSID
  70. }
  71. response = requests.get(url, headers=headers, timeout=15)
  72. print('yzy return:', response.text)
  73. if response.status_code == 200 :
  74. result = response.json()
  75. errcode = int(result['errcode'])
  76. if errcode == 0:
  77. return (result['ticket'], result['expires_in'])
  78. else:
  79. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  80. def get_cache_jsapi_ticket():
  81. ticket = redis_get(YZY_JSAPI_TICKET_REDIS_KEY)
  82. if ticket is None:
  83. ticket, expires_in = __get_jsapi_ticket()
  84. redis_set_with_time(YZY_JSAPI_TICKET_REDIS_KEY, ticket, expires_in - 600)
  85. return ticket
  86. def get_user_info(code: str):
  87. access_token = get_cache_access_token()
  88. url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(settings.YZY_API_ROOT, access_token, code)
  89. return __post_url__(url, {})
  90. def generate_deskey(secret):
  91. # 使用SHA256哈希然后取前8字节
  92. return hashlib.sha256(secret.encode()).digest()[:8]
  93. def generate_signature(appid, secret, curtime):
  94. # 拼接字符串
  95. sign_str = appid + secret + str(curtime)
  96. # 计算SHA256哈希值
  97. sha256_hash = hashlib.sha256(sign_str.encode('utf-8')).hexdigest()
  98. # 转换为大写
  99. signature = sha256_hash.upper()
  100. return signature
  101. # 2.开通用户工作台可见及第三方应用获取用户基本信息接口说明V1.3.pdf
  102. def getuserbycode(code: str):
  103. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getuserbycode"
  104. curtime = get_datetime_str(datetime.now())
  105. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  106. data = {
  107. "appid": settings.YZY_AGENTID,
  108. "curtime": curtime,
  109. "code": code,
  110. "signature": signature
  111. }
  112. return __post_url__(url, data)
  113. # 管理中心已授权用户信息批量更新接口
  114. def getauthorizedusersbyupdatetime(starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  115. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getauthorizedusersbyupdatetime"
  116. curtime = get_datetime_str(datetime.now())
  117. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  118. data = {
  119. "appid": settings.YZY_AGENTID,
  120. "curtime": curtime,
  121. "starttime": starttime,
  122. "endtime": endtime,
  123. "pageno": pageno,
  124. "pagesize": pagesize,
  125. "signature": signature
  126. }
  127. return __post_url__(url, data)
  128. # 4.3.3 查询应用开通的组织机构范围更新信息
  129. def getappunitsbyupdatetime(starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  130. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getappunitsbyupdatetime"
  131. curtime = get_datetime_str(datetime.now())
  132. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  133. data = {
  134. "appid": settings.YZY_AGENTID,
  135. "curtime": curtime,
  136. "starttime": starttime,
  137. "endtime": endtime,
  138. "pageno": pageno,
  139. "pagesize": pagesize,
  140. "signature": signature
  141. }
  142. return __post_url__(url, data)
  143. # 4.3.4 查询应用开通的组织机构的下级机构更新信息
  144. def getunitsbyuintidandupdatetime(unitid: str, starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  145. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getunitsbyuintidandupdatetime"
  146. curtime = get_datetime_str(datetime.now())
  147. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  148. data = {
  149. "appid": settings.YZY_AGENTID,
  150. "curtime": curtime,
  151. "starttime": starttime,
  152. "endtime": endtime,
  153. "pageno": pageno,
  154. "pagesize": pagesize,
  155. "unitid": unitid,
  156. "signature": signature
  157. }
  158. return __post_url__(url, data)
  159. # 4.3.5 查询业务应用开通的用户更新信息
  160. def getappusersbyupdatetime(starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  161. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getappusersbyupdatetime"
  162. curtime = get_datetime_str(datetime.now())
  163. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  164. data = {
  165. "appid": settings.YZY_AGENTID,
  166. "curtime": curtime,
  167. "starttime": starttime,
  168. "endtime": endtime,
  169. "pageno": pageno,
  170. "pagesize": pagesize,
  171. "signature": signature
  172. }
  173. return __post_url__(url, data)
  174. def send_text_message(users, content: str):
  175. access_token = get_cache_access_token()
  176. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  177. data = {
  178. "touser": "|".join(users),
  179. "msgtype" : "text",
  180. "agentid" : settings.YZY_AGENTID,
  181. "text": {
  182. "content": content
  183. }
  184. }
  185. return __post_url__(url, data)
  186. def send_textcard_message(users, title: str, description: str, detail_url):
  187. access_token = get_cache_access_token()
  188. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  189. touser = ""
  190. if isinstance(users, list) == True:
  191. touser = "|".join(users)
  192. else:
  193. touser = str(users)
  194. data = {
  195. "touser": touser,
  196. "msgtype" : "textcard",
  197. "agentid" : settings.YZY_AGENTID,
  198. "textcard": {
  199. "title": title,
  200. "description": description,
  201. "url": detail_url
  202. }
  203. }
  204. return __post_url__(url, data)
  205. def __post_url__(url, data):
  206. print('yzy url:', url)
  207. print('yzy data:', data)
  208. json_str = json.dumps(data, ensure_ascii=False)
  209. data = json_str.encode('utf-8')
  210. timestamp = str(int(time.time()))
  211. nonce = ranstr(20)
  212. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  213. headers = {
  214. 'Content-Type': 'application/json;charset=UTF-8',
  215. "x-tif-signature": signature,
  216. "x-tif-timestamp": timestamp,
  217. "x-tif-nonce": nonce,
  218. "x-tif-paasid": settings.YZY_PASSID
  219. }
  220. response = requests.post(url, data=data,headers=headers, timeout=5)
  221. print('yzy return:', response.text)
  222. if response.status_code == 200 :
  223. result = response.json()
  224. return result
  225. '''
  226. errcode = int(result['errcode'])
  227. if errcode == 0:
  228. return True
  229. else:
  230. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  231. '''
  232. def ranstr(num):
  233. salt = ''.join(random.sample(
  234. string.ascii_letters + string.digits, num))
  235. return salt
  236. #
  237. #
  238. # 生成校验码
  239. #
  240. #
  241. def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
  242. sign_data_sha256 = calcRequestSign(
  243. timestamp, token, nonce, uid, uinfo, ext)
  244. return sign_data_sha256 == signature.upper()
  245. #
  246. #
  247. # 计算校验码
  248. #
  249. #
  250. def calcResponseSign(timestamp, token, nonce):
  251. sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
  252. return hashlib.sha256(
  253. sign_data.encode("utf8")
  254. ).hexdigest().upper()
  255. #
  256. #
  257. # 计算校验码
  258. #
  259. #
  260. def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
  261. sign_data = "{}{}{},{},".format(
  262. timestamp, token, nonce, uid)
  263. if len(uinfo) == 0:
  264. sign_data = sign_data + ","
  265. else:
  266. sign_data = sign_data + uinfo + ","
  267. if len(ext) == 0:
  268. sign_data = sign_data
  269. else:
  270. sign_data = sign_data + ext
  271. sign_data = sign_data + timestamp
  272. return hashlib.sha256(
  273. sign_data.encode("utf8")
  274. ).hexdigest().upper()
  275. def format_redirect_url(url: str) -> str:
  276. return f"{settings.YZY_WEB_ROOT}/yjxp/#/?redirect_url={url}"
  277. def add_to_msg_queue(db: Session, data: dict) -> None:
  278. new_msg = YzyMsgQueue(**data, sent_status = 0, create_time = datetime.now())
  279. db.add(new_msg)
  280. db.commit()
  281. # 辅助类
  282. # 调用JAVA编写的密评接口
  283. def desDecryptValue(appSecret: str, value: str) -> any:
  284. data = {}
  285. data['appSecret'] = appSecret
  286. data['value'] = value
  287. headers = {'Content-Type': 'application/json;charset=UTF-8'}
  288. response = requests.post(url="http://127.0.0.1:8052/yzy" + "/DecryptValue", headers=headers, json=data, timeout=60)
  289. if response.status_code == 200:
  290. result = response.json()
  291. print(result)
  292. if result['errcode'] == 0:
  293. return result['data']
  294. return None
  295. def desEncryptValue(appSecret: str, value: str) -> any:
  296. data = {}
  297. data['appSecret'] = appSecret
  298. data['value'] = value
  299. headers = {'Content-Type': 'application/json;charset=UTF-8'}
  300. response = requests.post(url="http://127.0.0.1:8052/yzy" + "/EncryptValue", headers=headers, json=data, timeout=60)
  301. if response.status_code == 200:
  302. result = response.json()
  303. print(result)
  304. if result['errcode'] == 0:
  305. return result['data']
  306. return None