# -*- coding: utf-8 -*-
"""Request/response helpers for the Shuguang gateway integration.

The module provides:

* signature helpers (``calcRequestSign`` / ``calcResponseSign`` /
  ``authentication``) built on SHA-256,
* a date-derived AES key (``token``),
* helpers that decrypt incoming request bodies and wrap outgoing payloads
  in the ``{"errcode", "errmsg", "data"}`` envelope with signed
  ``x-tif-*`` response headers.
"""
import base64
import hashlib
import hmac
import json
import random
import secrets
import string
import time
from datetime import datetime
from typing import Any

from fastapi import Request, Header
from pydantic import BaseModel
from starlette.responses import JSONResponse, PlainTextResponse

from config import settings
from exceptions import TokenException
from extensions import logger
from .aes_util import AES_ECB
# Kept as the last import, as in the original module, so that the names it
# exports keep the same precedence over the imports above.
from .redis_util import *

################################################################
##                                                            ##
##                     Shuguang gateway                       ##
##                                                            ##
################################################################

# Characters a nonce may be drawn from.
_NONCE_ALPHABET = string.ascii_letters + string.digits


# Random string
def ranstr(num):
    """Return a random string of ``num`` ASCII letters and digits.

    The characters are drawn with :mod:`secrets` because the result is used
    as a nonce in signed responses. Characters may repeat, so any
    non-negative length is supported.

    Args:
        num: Number of characters to generate.

    Returns:
        The generated string (empty when ``num`` is 0).

    Raises:
        ValueError: If ``num`` is negative.
    """
    if num < 0:
        raise ValueError("num must be non-negative")
    return ''.join(secrets.choice(_NONCE_ALPHABET) for _ in range(num))


# Gateway verification
def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
    """Check a request signature against the locally computed one.

    Args:
        timestamp: Value of the request timestamp header.
        token: Shared pass token used for signing.
        nonce: Value of the request nonce header.
        uid: Value of the request uid header.
        uinfo: Value of the request uinfo header; may be empty or ``None``.
        ext: Value of the request ext header; may be empty or ``None``.
        signature: Signature supplied by the caller (case-insensitive hex).

    Returns:
        ``True`` when the supplied signature matches, ``False`` otherwise
        (including when no signature was supplied).
    """
    if not signature:
        return False
    expected = calcRequestSign(timestamp, token, nonce, uid, uinfo, ext)
    # Constant-time comparison; both sides are encoded to bytes because
    # compare_digest rejects non-ASCII str arguments.
    return hmac.compare_digest(
        expected.encode("utf8"), signature.upper().encode("utf8")
    )


# Response verification value
def calcResponseSign(timestamp, token, nonce):
    """Return the upper-case SHA-256 hex signature for a response.

    The signed text is ``timestamp + token + nonce + timestamp``.
    """
    sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
    return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper()


# Generate the request sign
def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
    """Return the upper-case SHA-256 hex signature for a request.

    The signed text is
    ``timestamp + token + nonce + "," + uid + "," + uinfo + "," + ext + timestamp``.
    A missing (``None``) or empty ``uinfo`` / ``ext`` contributes an empty
    string, so the separating commas are always present.
    """
    sign_data = "{ts}{token}{nonce},{uid},{uinfo},{ext}{ts}".format(
        ts=timestamp,
        token=token,
        nonce=nonce,
        uid=uid,
        uinfo=uinfo or "",
        ext=ext or "",
    )
    return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper()


# Generate the custom token
def token() -> str:
    """Return the 16-character AES key derived from today's local date.

    The key is the first 16 hex characters of the MD5 of
    ``'yss_bdc' + month + day + weekday + year`` where month and day are not
    zero-padded and weekday runs 0 (Sunday) to 6 (Saturday).

    NOTE(review): the key is fully determined by the calendar date and a
    fixed prefix, and it is used with an ECB-mode cipher helper. This is
    presumably mandated by the peer system; confirm before relying on it
    for confidentiality.
    """
    today = datetime.today()
    weekday = today.isoweekday() % 7  # Monday=1 ... Saturday=6, Sunday=0
    seed = 'yss_bdc{}{}{}{}'.format(
        today.month, today.day, weekday, today.year
    )
    return hashlib.md5(seed.encode('ascii')).hexdigest()[:16]


def _signed_headers(pass_token):
    """Build the ``x-tif-*`` response headers signed with ``pass_token``."""
    nonce = ranstr(20)
    timestamp = str(int(time.time()))
    return {
        "x-tif-signature": calcResponseSign(timestamp, pass_token, nonce),
        "x-tif-timestamp": timestamp,
        "x-tif-nonce": nonce,
    }


def _encrypt_json(obj):
    """Serialize ``obj`` to JSON and encrypt it with today's key."""
    plaintext = json.dumps(obj, ensure_ascii=False)
    return AES_ECB(token()).encrypt(plaintext)


def _envelope(data):
    """Wrap ``data`` in the success envelope expected by the gateway."""
    return {"errcode": 0, "errmsg": "", "data": data}


# Build the response in the form the gateway requires
def sg_response(obj: Any):
    """Return an encrypted, signed JSON response for the gateway.

    Args:
        obj: JSON-serializable payload.

    Returns:
        A ``JSONResponse`` whose ``data`` field is the encrypted payload and
        whose headers are signed with ``settings.GSPT_PASS_TOKEN``.
    """
    logger.info("sp_response: {}", obj)
    headers = _signed_headers(settings.GSPT_PASS_TOKEN)
    return JSONResponse(content=_envelope(_encrypt_json(obj)), headers=headers)


def pt_sg_response(obj: Any):
    """Return an encrypted JSON response without signature headers.

    Args:
        obj: JSON-serializable payload.
    """
    return JSONResponse(content=_envelope(_encrypt_json(obj)))


# Read the gateway parameters
async def sg_request_param(request: Request):
    """Decrypt and parse the raw request body sent by the gateway.

    Args:
        request: Incoming request whose body is the encrypted payload.

    Returns:
        The decoded JSON payload.

    Raises:
        TokenException: If the body cannot be decrypted or parsed.
    """
    raw = await request.body()
    aes = AES_ECB(token())
    try:
        payload = json.loads(aes.decrypt(raw))
    except Exception as exc:
        # Decode leniently: the body is only logged for diagnosis and must
        # not raise a second error that would hide the TokenException.
        logger.error('err body: {}', raw.decode('utf-8', errors='replace'))
        raise TokenException() from exc
    logger.info(
        'sg recv =============>>>>\n{}',
        json.dumps(payload, ensure_ascii=False, sort_keys=True, indent=2),
    )
    return payload


######################################################################################

class YstModel(BaseModel):
    """Request body carrying a single encrypted ``data`` string."""

    data: str


async def yst_request_param(request: Request, ystModel: YstModel):
    """Decrypt and parse the ``data`` field of a YST request.

    Args:
        request: Incoming request; its raw body is logged on failure.
        ystModel: Parsed request model holding the encrypted payload.

    Returns:
        The decoded JSON payload.

    Raises:
        TokenException: If the payload cannot be decrypted or parsed.
    """
    logger.info('yst_request_param ==========>')
    encrypted = ystModel.data
    logger.info("yst data: {}", encrypted)
    # The AES key is intentionally not logged.
    aes = AES_ECB(token())
    try:
        payload = json.loads(aes.decrypt(encrypted.encode('utf8')))
    except Exception as exc:
        raw = await request.body()
        logger.error('body: {}', raw.decode('utf-8', errors='replace'))
        raise TokenException() from exc
    logger.info(
        'yst recv =============>>>>\n{}',
        json.dumps(payload, ensure_ascii=False, sort_keys=True, indent=2),
    )
    return payload


def yst_pass_ext(
    request: Request,
    token: str = '',
    x_tif_signature: str = Header(None),
    x_tif_nonce: str = Header(None),
    x_tif_timestamp: str = Header(None),
    x_tif_uid: str = Header(None),
    x_tif_uinfo: str = Header(None),
    x_tif_ext: str = Header(None),
):
    """Return the session data cached under ``yst_token_<token>``.

    The ``request`` and ``x_tif_*`` header parameters are accepted but not
    read by this function.

    Args:
        request: Incoming request.
        token: Session token used to build the cache key.

    Returns:
        The value returned by ``redis_get_json`` for the key.

    Raises:
        TokenException: If no value is stored for the token.
    """
    redis_val = redis_get_json("yst_token_" + token)
    if redis_val is None:
        raise TokenException()
    return redis_val


def yst_response(obj: Any):
    """Return an encrypted, signed JSON response for YST callers.

    Args:
        obj: JSON-serializable payload.

    Returns:
        A ``JSONResponse`` whose ``data`` field is the encrypted payload and
        whose headers are signed with ``settings.YST_PASS_TOKEN``.
    """
    logger.info("sp_response: {}", obj)
    headers = _signed_headers(settings.YST_PASS_TOKEN)
    return JSONResponse(content=_envelope(_encrypt_json(obj)), headers=headers)


def yst_image_response(data: Any):
    """Return a signed JSON response carrying ``data`` unencrypted.

    Args:
        data: JSON-serializable value placed in the envelope as-is.
    """
    logger.info("yst_image_response: {}", data)
    headers = _signed_headers(settings.YST_PASS_TOKEN)
    return JSONResponse(content=_envelope(data), headers=headers)


def yst_user_info_response(data: str):
    """Return ``data`` as a signed plain-text response.

    Args:
        data: Response body, sent without the JSON envelope.
    """
    logger.info("yst_user_info_response: {}", data)
    headers = _signed_headers(settings.YST_PASS_TOKEN)
    return PlainTextResponse(content=data, headers=headers)