YzyApi.py 10 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323
  1. #!/usr/bin/python3
  2. # -*- coding: utf-8 -*-
  3. import random
  4. import string
  5. import hashlib
  6. import time
  7. import json
  8. from utils.redis_util import *
  9. import requests
  10. from urllib.parse import quote
  11. import base64
  12. from utils import *
  13. from exceptions import YzyException
  14. from extensions import logger
  15. from config import settings
  16. from sqlalchemy.orm import Session
  17. # 应用名称:茂名市智慧应急平台
  18. # https://open.weixin.qq.com/connect/Oauth2/authorize?appid=wld341060039&redirect_uri=https://www.baidu.com/#/&response_type=code&scope=snsapi_base&agentid=1004302&state=xxxxxx#wechat_redirect
  19. YZY_ACCESS_TOKEN_REDIS_KEY = "YZY_ACCESS_TOKEN_REDIS_KEY"
  20. '''
  21. YZY_API_ROOT = "http://19.15.0.128:8080"
  22. YZY_AGENTID = 1004302
  23. YZY_CORPID = "wld341060039"
  24. YZY_CORPSECRET = "5_8aOBBjioNbP7KDwjyBKmwnJ05-y1WbaJlt4irM1eA"
  25. YZY_PASSID = "yzy_demo"
  26. YZY_PASSTOKEN = "WjKat55cv6PrJtpCHld0trrHsv1mbCqL"
  27. '''
  28. def __get_access_token():
  29. url = "{}/ebus/yzyapi/cgi-bin/gettoken?corpid={}&corpsecret={}".format(settings.YZY_API_ROOT, settings.YZY_CORPID, settings.YZY_CORPSECRET)
  30. print('yzy url:', url)
  31. timestamp = str(int(time.time()))
  32. nonce = ranstr(20)
  33. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  34. headers = {
  35. 'Content-Type': 'application/json;charset=UTF-8',
  36. "x-tif-signature": signature,
  37. "x-tif-timestamp": timestamp,
  38. "x-tif-nonce": nonce,
  39. "x-tif-paasid": settings.YZY_PASSID
  40. }
  41. response = requests.get(url, headers=headers, timeout=15)
  42. print('yzy return:', response.text)
  43. if response.status_code == 200 :
  44. result = response.json()
  45. errcode = int(result['errcode'])
  46. if errcode == 0:
  47. return (result['access_token'], result['expires_in'])
  48. else:
  49. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  50. def get_cache_access_token():
  51. access_token = redis_get(YZY_ACCESS_TOKEN_REDIS_KEY)
  52. if access_token is None:
  53. access_token, expires_in = __get_access_token()
  54. redis_set_with_time(YZY_ACCESS_TOKEN_REDIS_KEY, access_token, expires_in - 600)
  55. return access_token
  56. def get_user_info(code: str):
  57. access_token = get_cache_access_token()
  58. url = "{}/ebus/yzyapi/cgi-bin/user/getuserinfo?access_token={}&code={}".format(settings.YZY_API_ROOT, access_token, code)
  59. return __post_url__(url, {})
  60. def generate_deskey(secret):
  61. # 使用SHA256哈希然后取前8字节
  62. return hashlib.sha256(secret.encode()).digest()[:8]
  63. def generate_signature(appid, secret, curtime):
  64. # 拼接字符串
  65. sign_str = appid + secret + str(curtime)
  66. # 计算SHA256哈希值
  67. sha256_hash = hashlib.sha256(sign_str.encode('utf-8')).hexdigest()
  68. # 转换为大写
  69. signature = sha256_hash.upper()
  70. return signature
  71. # 2.开通用户工作台可见及第三方应用获取用户基本信息接口说明V1.3.pdf
  72. def getuserbycode(code: str):
  73. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getuserbycode"
  74. curtime = get_datetime_str(datetime.now())
  75. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  76. data = {
  77. "appid": settings.YZY_AGENTID,
  78. "curtime": curtime,
  79. "code": code,
  80. "signature": signature
  81. }
  82. return __post_url__(url, data)
  83. # 管理中心已授权用户信息批量更新接口
  84. def getauthorizedusersbyupdatetime(starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  85. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getauthorizedusersbyupdatetime"
  86. curtime = get_datetime_str(datetime.now())
  87. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  88. data = {
  89. "appid": settings.YZY_AGENTID,
  90. "curtime": curtime,
  91. "starttime": starttime,
  92. "endtime": endtime,
  93. "pageno": pageno,
  94. "pagesize": pagesize,
  95. "signature": signature
  96. }
  97. return __post_url__(url, data)
  98. # 4.3.3 查询应用开通的组织机构范围更新信息
  99. def getappunitsbyupdatetime(starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  100. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getappunitsbyupdatetime"
  101. curtime = get_datetime_str(datetime.now())
  102. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  103. data = {
  104. "appid": settings.YZY_AGENTID,
  105. "curtime": curtime,
  106. "starttime": starttime,
  107. "endtime": endtime,
  108. "pageno": pageno,
  109. "pagesize": pagesize,
  110. "signature": signature
  111. }
  112. return __post_url__(url, data)
  113. # 4.3.4 查询应用开通的组织机构的下级机构更新信息
  114. def getunitsbyuintidandupdatetime(unitid: str, starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  115. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getunitsbyuintidandupdatetime"
  116. curtime = get_datetime_str(datetime.now())
  117. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  118. data = {
  119. "appid": settings.YZY_AGENTID,
  120. "curtime": curtime,
  121. "starttime": starttime,
  122. "endtime": endtime,
  123. "pageno": pageno,
  124. "pagesize": pagesize,
  125. "unitid": unitid,
  126. "signature": signature
  127. }
  128. return __post_url__(url, data)
  129. # 4.3.5 查询业务应用开通的用户更新信息
  130. def getappusersbyupdatetime(starttime: str, endtime: str, pageno: int = 1, pagesize: int = 500):
  131. url = f"{settings.YZY_API_ROOT}/ebus/applicationsyn/getappusersbyupdatetime"
  132. curtime = get_datetime_str(datetime.now())
  133. signature = generate_signature(settings.YZY_AGENTID, settings.YZY_CORPSECRET, curtime)
  134. data = {
  135. "appid": settings.YZY_AGENTID,
  136. "curtime": curtime,
  137. "starttime": starttime,
  138. "endtime": endtime,
  139. "pageno": pageno,
  140. "pagesize": pagesize,
  141. "signature": signature
  142. }
  143. return __post_url__(url, data)
  144. def send_text_message(users, content: str):
  145. access_token = get_cache_access_token()
  146. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  147. data = {
  148. "touser": "|".join(users),
  149. "msgtype" : "text",
  150. "agentid" : settings.YZY_AGENTID,
  151. "text": {
  152. "content": content
  153. }
  154. }
  155. return __post_url__(url, data)
  156. def send_textcard_message(users, title: str, description: str, detail_url):
  157. access_token = get_cache_access_token()
  158. url = "{}/ebus/yzyapi/cgi-bin/message/send?access_token={}".format(settings.YZY_API_ROOT, access_token)
  159. touser = ""
  160. if isinstance(users, list) == True:
  161. touser = "|".join(users)
  162. else:
  163. touser = str(users)
  164. data = {
  165. "touser": touser,
  166. "msgtype" : "textcard",
  167. "agentid" : settings.YZY_AGENTID,
  168. "textcard": {
  169. "title": title,
  170. "description": description,
  171. "url": detail_url
  172. }
  173. }
  174. return __post_url__(url, data)
  175. def __post_url__(url, data):
  176. print('yzy url:', url)
  177. print('yzy data:', data)
  178. json_str = json.dumps(data, ensure_ascii=False)
  179. data = json_str.encode('utf-8')
  180. timestamp = str(int(time.time()))
  181. nonce = ranstr(20)
  182. signature = calcResponseSign(timestamp, settings.YZY_PASSTOKEN, nonce)
  183. headers = {
  184. 'Content-Type': 'application/json;charset=UTF-8',
  185. "x-tif-signature": signature,
  186. "x-tif-timestamp": timestamp,
  187. "x-tif-nonce": nonce,
  188. "x-tif-paasid": settings.YZY_PASSID
  189. }
  190. response = requests.post(url, data=data,headers=headers, timeout=5)
  191. print('yzy return:', response.text)
  192. if response.status_code == 200 :
  193. result = response.json()
  194. return result
  195. '''
  196. errcode = int(result['errcode'])
  197. if errcode == 0:
  198. return True
  199. else:
  200. raise YzyException(errcode=errcode, errmsg=result['errmsg'])
  201. '''
  202. def ranstr(num):
  203. salt = ''.join(random.sample(
  204. string.ascii_letters + string.digits, num))
  205. return salt
  206. #
  207. #
  208. # 生成校验码
  209. #
  210. #
  211. def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
  212. sign_data_sha256 = calcRequestSign(
  213. timestamp, token, nonce, uid, uinfo, ext)
  214. return sign_data_sha256 == signature.upper()
  215. #
  216. #
  217. # 计算校验码
  218. #
  219. #
  220. def calcResponseSign(timestamp, token, nonce):
  221. sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
  222. return hashlib.sha256(
  223. sign_data.encode("utf8")
  224. ).hexdigest().upper()
  225. #
  226. #
  227. # 计算校验码
  228. #
  229. #
  230. def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
  231. sign_data = "{}{}{},{},".format(
  232. timestamp, token, nonce, uid)
  233. if len(uinfo) == 0:
  234. sign_data = sign_data + ","
  235. else:
  236. sign_data = sign_data + uinfo + ","
  237. if len(ext) == 0:
  238. sign_data = sign_data
  239. else:
  240. sign_data = sign_data + ext
  241. sign_data = sign_data + timestamp
  242. return hashlib.sha256(
  243. sign_data.encode("utf8")
  244. ).hexdigest().upper()
  245. def format_redirect_url(url: str) -> str:
  246. return f"{settings.YZY_WEB_ROOT}/yjxp/#/?redirect_url={url}"
  247. def add_to_msg_queue(db: Session, data: dict) -> None:
  248. new_msg = YzyMsgQueue(**data, sent_status = 0, create_time = datetime.now())
  249. db.add(new_msg)
  250. db.commit()
  251. # 辅助类
  252. # 调用JAVA编写的密评接口
  253. def desDecryptValue(appSecret: str, value: str) -> any:
  254. data = {}
  255. data['appSecret'] = appSecret
  256. data['value'] = value
  257. headers = {'Content-Type': 'application/json;charset=UTF-8'}
  258. response = requests.post(url="http://127.0.0.1:8052/yzy" + "/DecryptValue", headers=headers, json=data, timeout=60)
  259. if response.status_code == 200:
  260. result = response.json()
  261. print(result)
  262. if result['errcode'] == 0:
  263. return result['data']
  264. return None
  265. def desEncryptValue(appSecret: str, value: str) -> any:
  266. data = {}
  267. data['appSecret'] = appSecret
  268. data['value'] = value
  269. headers = {'Content-Type': 'application/json;charset=UTF-8'}
  270. response = requests.post(url="http://127.0.0.1:8052/yzy" + "/EncryptValue", headers=headers, json=data, timeout=60)
  271. if response.status_code == 200:
  272. result = response.json()
  273. print(result)
  274. if result['errcode'] == 0:
  275. return result['data']
  276. return None