sg_auth.py 7.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285
  1. # -*- coding: utf-8 -*-
  2. from fastapi import Request, Header
  3. from pydantic import BaseModel
  4. import random
  5. import string
  6. import hashlib
  7. import time
  8. import base64
  9. import json
  10. from starlette.responses import JSONResponse, PlainTextResponse
  11. from config import settings
  12. from exceptions import TokenException
  13. from extensions import logger
  14. from .aes_util import AES_ECB
  15. from datetime import datetime
  16. from .redis_util import *
  17. ################################################################
  18. ##
  19. ##
  20. ## 数广网关
  21. ##
  22. ##
  23. ################################################################
  24. # 随机字符串
  25. def ranstr(num):
  26. salt = ''.join(random.sample(string.ascii_letters + string.digits, num))
  27. return salt
  28. # 网关验证
  29. def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
  30. sign_data_sha256 = calcRequestSign(timestamp, token, nonce, uid, uinfo, ext)
  31. return sign_data_sha256 == signature.upper()
  32. # 验证反馈值
  33. def calcResponseSign(timestamp, token, nonce):
  34. sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
  35. return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper()
  36. # 生成sign
  37. def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
  38. sign_data = "{}{}{},{},".format(timestamp, token, nonce, uid)
  39. if len(uinfo) == 0:
  40. sign_data = sign_data + ","
  41. else:
  42. sign_data = sign_data + uinfo + ","
  43. if len(ext) == 0:
  44. sign_data = sign_data
  45. else:
  46. sign_data = sign_data + ext
  47. sign_data = sign_data + timestamp
  48. return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper()
  49. # 生成自定义token
  50. def token() -> str:
  51. d1 = datetime.today()
  52. weekday = d1.weekday()+1
  53. if weekday == 7:
  54. weekday = 0
  55. tokenStr = 'yss_bdc' + str(d1.month) + str(d1.day) + str(weekday) + str(d1.year)
  56. m = hashlib.md5()
  57. m.update(tokenStr.encode('ascii'))
  58. token_md5 = m.hexdigest()
  59. return token_md5[0:16]
  60. # 按网关要求生成数据
  61. def sg_response(
  62. obj: any
  63. ):
  64. logger.info("sp_response: {}", obj)
  65. response_nonce = ranstr(20)
  66. response_timestamp = str(int(time.time()))
  67. response_sign = calcResponseSign(response_timestamp, settings.GSPT_PASS_TOKEN, response_nonce)
  68. response_headers = {
  69. "x-tif-signature": response_sign,
  70. "x-tif-timestamp": response_timestamp,
  71. "x-tif-nonce": response_nonce
  72. }
  73. json_str = json.dumps(obj, ensure_ascii=False)
  74. key = token()
  75. aes = AES_ECB(key)
  76. data = aes.encrypt(json_str)
  77. content = {
  78. "errcode": 0,
  79. "errmsg": "",
  80. "data": data
  81. }
  82. return JSONResponse(content = content, headers = response_headers)
  83. def pt_sg_response(obj: any
  84. ):
  85. json_str = json.dumps(obj, ensure_ascii=False)
  86. key = token()
  87. aes = AES_ECB(key)
  88. data = aes.encrypt(json_str)
  89. content = {
  90. "errcode": 0,
  91. "errmsg": "",
  92. "data": data
  93. }
  94. return JSONResponse(content = content)
  95. # 读取网关参数
  96. async def sg_request_param(request: Request):
  97. data = await request.body()
  98. body = data.decode(encoding='utf-8')
  99. key = token()
  100. aes = AES_ECB(key)
  101. try :
  102. # print('aes key:', key)
  103. # print('aes data:', body)
  104. json_str = aes.decrypt(data)
  105. data = json.loads(json_str)
  106. logger.info('sg recv =============>>>>\n{}',json.dumps(data, ensure_ascii=False, sort_keys=True, indent=2))
  107. except Exception:
  108. print('err body:', body)
  109. raise TokenException()
  110. return data
  111. ######################################################################################
  112. class YstModel(BaseModel):
  113. data: str
  114. async def yst_request_param(
  115. request: Request,
  116. ystModel: YstModel
  117. ):
  118. logger.info('yst_request_param ==========>')
  119. #for n in request.headers:
  120. # logger.info('{}:{}', n, request.headers[n])
  121. body = ystModel.data
  122. data = body.encode('utf8')
  123. logger.info("yst data: {}", body)
  124. key = token()
  125. print('token:', key)
  126. aes = AES_ECB(key)
  127. try :
  128. json_str = aes.decrypt(data)
  129. data = json.loads(json_str)
  130. logger.info('yst recv =============>>>>\n{}',json.dumps(data, ensure_ascii=False, sort_keys=True, indent=2))
  131. except Exception:
  132. data = await request.body()
  133. body = data.decode(encoding='utf-8')
  134. print('body:', body)
  135. raise TokenException()
  136. return data
  137. def yst_pass_ext(
  138. request: Request,
  139. token: str = '',
  140. x_tif_signature: str = Header(None),
  141. x_tif_nonce: str = Header(None),
  142. x_tif_timestamp: str = Header(None),
  143. x_tif_uid: str = Header(None),
  144. x_tif_uinfo: str = Header(None),
  145. x_tif_ext: str = Header(None),
  146. ):
  147. '''
  148. if token == 'fb5d86fa-1cf3-11ef-9b67-00ff257b5fc6':
  149. return {
  150. "account": "DGD440782197706290318",
  151. "account_type": "human",
  152. "cid": "440782197706290318",
  153. "corp": {
  154. "account": "50b50b6d",
  155. "address": "",
  156. "cid": "91440705354627820H",
  157. "ctype": "49",
  158. "level": "L2",
  159. "link_person_name": "李步尚",
  160. "mobile": "13426789046",
  161. "name": "江门市新会区尚软网络科技有限公司",
  162. "role_type": "parent_linked",
  163. "social_credit_code": "91440705354627820H",
  164. "uid": "359a7db835994a7ebd147f09c5b9ae5d"
  165. },
  166. "ctype": "10",
  167. "level": "L2",
  168. "link_person_cid": "",
  169. "login_type": "SG-GASMHS",
  170. "mobile": "13426789046",
  171. "name": "李步尚",
  172. "origin": "SG",
  173. "sex": "1",
  174. "tokenid": "f36be617ec901c64d1234484eabd7431",
  175. "uid": "5b8f1a3e774d4ba8b7ccb2b2d51a38cd",
  176. "uversion": "1.0"
  177. }
  178. '''
  179. logger.info("token: {}", token)
  180. redis_key = "yst_token_" + token
  181. redis_val = redis_get_json(redis_key)
  182. if redis_val is None:
  183. logger.info("redis_key {} not exists", redis_key)
  184. raise TokenException()
  185. else:
  186. return redis_val
  187. def yst_response(obj: any):
  188. logger.info("sp_response: {}", obj)
  189. response_nonce = ranstr(20)
  190. response_timestamp = str(int(time.time()))
  191. response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce)
  192. response_headers = {
  193. "x-tif-signature": response_sign,
  194. "x-tif-timestamp": response_timestamp,
  195. "x-tif-nonce": response_nonce
  196. }
  197. json_str = json.dumps(obj, ensure_ascii=False)
  198. key = token()
  199. aes = AES_ECB(key)
  200. data = aes.encrypt(json_str)
  201. content = {
  202. "errcode": 0,
  203. "errmsg": "",
  204. "data": data
  205. }
  206. return JSONResponse(content = content, headers = response_headers)
  207. def yst_image_response(data: any):
  208. logger.info("yst_image_response: {}", data)
  209. response_nonce = ranstr(20)
  210. response_timestamp = str(int(time.time()))
  211. response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce)
  212. response_headers = {
  213. "x-tif-signature": response_sign,
  214. "x-tif-timestamp": response_timestamp,
  215. "x-tif-nonce": response_nonce
  216. }
  217. content = {
  218. "errcode": 0,
  219. "errmsg": "",
  220. "data": data
  221. }
  222. return JSONResponse(content = content, headers = response_headers)
  223. def yst_user_info_response(data: str):
  224. logger.info("yst_user_info_response: {}", data)
  225. response_nonce = ranstr(20)
  226. response_timestamp = str(int(time.time()))
  227. response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce)
  228. response_headers = {
  229. "x-tif-signature": response_sign,
  230. "x-tif-timestamp": response_timestamp,
  231. "x-tif-nonce": response_nonce
  232. }
  233. return PlainTextResponse(content = data, headers = response_headers)