# -*- coding: utf-8 -*- from fastapi import Request, Header from pydantic import BaseModel import random import string import hashlib import time import base64 import json from starlette.responses import JSONResponse from config import settings from exceptions import TokenException from extensions import logger from .aes_util import AES_ECB from datetime import datetime from .redis_util import * ################################################################ ## ## ## 数广网关 ## ## ################################################################ # 随机字符串 def ranstr(num): salt = ''.join(random.sample(string.ascii_letters + string.digits, num)) return salt # 网关验证 def authentication(timestamp, token, nonce, uid, uinfo, ext, signature): sign_data_sha256 = calcRequestSign(timestamp, token, nonce, uid, uinfo, ext) return sign_data_sha256 == signature.upper() # 验证反馈值 def calcResponseSign(timestamp, token, nonce): sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp) return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper() # 生成sign def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext): sign_data = "{}{}{},{},".format(timestamp, token, nonce, uid) if len(uinfo) == 0: sign_data = sign_data + "," else: sign_data = sign_data + uinfo + "," if len(ext) == 0: sign_data = sign_data else: sign_data = sign_data + ext sign_data = sign_data + timestamp return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper() # 生成自定义token def token() -> str: d1 = datetime.today() weekday = d1.weekday()+1 if weekday == 7: weekday = 0 tokenStr = 'yss_bdc' + str(d1.month) + str(d1.day) + str(weekday) + str(d1.year) m = hashlib.md5() m.update(tokenStr.encode('ascii')) token_md5 = m.hexdigest() return token_md5[0:16] # 按网关要求生成数据 def sg_response( obj: any ): logger.info("sp_response: {}", obj) response_nonce = ranstr(20) response_timestamp = str(int(time.time())) response_sign = calcResponseSign(response_timestamp, settings.GSPT_PASS_TOKEN, response_nonce) response_headers = { "x-tif-signature": response_sign, "x-tif-timestamp": response_timestamp, "x-tif-nonce": response_nonce } json_str = json.dumps(obj, ensure_ascii=False) key = token() aes = AES_ECB(key) data = aes.encrypt(json_str) content = { "errcode": 0, "errmsg": "", "data": data } return JSONResponse(content = content, headers = response_headers) def pt_sg_response(obj: any ): json_str = json.dumps(obj, ensure_ascii=False) key = token() aes = AES_ECB(key) data = aes.encrypt(json_str) content = { "errcode": 0, "errmsg": "", "data": data } return JSONResponse(content = content) # 读取网关参数 async def sg_request_param(request: Request): data = await request.body() body = data.decode(encoding='utf-8') key = token() aes = AES_ECB(key) try : # print('aes key:', key) # print('aes data:', body) json_str = aes.decrypt(data) data = json.loads(json_str) logger.info('sg recv =============>>>>\n{}',json.dumps(data, ensure_ascii=False, sort_keys=True, indent=2)) except Exception: print('err body:', body) raise TokenException() return data def sg_pass_sfzh(token: str = ''): if token == '': raise TokenException() elif token == 'fb5d86fa-1cf3-11ef-9b67-00ff257b5fc6': return '341181198809150011' from services.tifserv import sgtoken_service redis_key = "sg_token_" + token redis_val = redis_get_json(redis_key) if redis_val is None: redis_val = sgtoken_service.get_token_info(token) logger.info('========================>>>>>>>>>>>>>>>>>>>>>>> userinfo: {}', redis_val) redis_set_json(redis_key, redis_val) return redis_val['cid_num'] def sg_pass_extinfo(token: str): if token == 'fb5d86fa-1cf3-11ef-9b67-00ff257b5fc6': return { "name": '李步尚', "sfzh": '341181198809150011', "mobile": '13426789046', "appid": "wx82d43fee89cdc7df" } redis_key = "sg_token_" + token redis_val = redis_get_json(redis_key) if redis_val is None: sg_pass_sfzh(token) redis_val = redis_get_json(redis_key) if 'phone' not in redis_val: redis_val['phone'] = '13800138000' return { "name": redis_val['cid_name'], "sfzh": redis_val['cid_num'], "mobile": redis_val['phone'], "appid": "wx82d43fee89cdc7df" } ###################################################################################### class YstModel(BaseModel): data: str async def yst_request_param( request: Request, ystModel: YstModel ): logger.info('yst_request_param ==========>') #for n in request.headers: # logger.info('{}:{}', n, request.headers[n]) body = ystModel.data data = body.encode('utf8') logger.info("yst data: {}", body) key = token() print('token:', key) aes = AES_ECB(key) try : json_str = aes.decrypt(data) data = json.loads(json_str) logger.info('yst recv =============>>>>\n{}',json.dumps(data, ensure_ascii=False, sort_keys=True, indent=2)) except Exception: data = await request.body() body = data.decode(encoding='utf-8') print('body:', body) raise TokenException() return data def yst_pass_ext( request: Request, x_tif_signature: str = Header(None), x_tif_nonce: str = Header(None), x_tif_timestamp: str = Header(None), x_tif_uid: str = Header(None), x_tif_uinfo: str = Header(None), x_tif_ext: str = Header(None), ): return { "account": "DGD440782197706290318", "account_type": "human", "cid": "440782197706290318", "corp": { "account": "50b50b6d", "address": "", "cid": "91440705354627820H", "ctype": "49", "level": "L2", "link_person_name": "李步尚", "mobile": "13426789046", "name": "江门市新会区尚软网络科技有限公司", "role_type": "parent_linked", "social_credit_code": "91440705354627820H", "uid": "359a7db835994a7ebd147f09c5b9ae5d" }, "ctype": "10", "level": "L2", "link_person_cid": "", "login_type": "SG-GASMHS", "mobile": "13426789046", "name": "李步尚", "origin": "SG", "sex": "1", "tokenid": "f36be617ec901c64d1234484eabd7431", "uid": "5b8f1a3e774d4ba8b7ccb2b2d51a38cd", "uversion": "1.0" } pass_token = settings.YST_PASS_TOKEN token_exception = TokenException() if x_tif_signature is None or x_tif_nonce is None or x_tif_timestamp is None or x_tif_uid is None or x_tif_uinfo is None or x_tif_ext is None: logger.error('========================>>>>>>>>>>>>>>>>>>>>>>> yst authentication err: 1') raise token_exception if authentication(x_tif_timestamp, pass_token, x_tif_nonce, x_tif_uid, x_tif_uinfo, x_tif_ext, x_tif_signature) == False: logger.error('========================>>>>>>>>>>>>>>>>>>>>>>> yst authentication err: 2') raise token_exception json_str = base64.b64decode(x_tif_ext) json_str = json_str.decode(encoding='utf-8') x_tif_ext = json.loads(json_str) logger.info('========================>>>>>>>>>>>>>>>>>>>>>>> yst authentication ok: {}', x_tif_ext) logger.info('========================>>>>>>>>>>>>>>>>>>>>>>> yst account_type: {}', x_tif_ext['account_type']) return x_tif_ext def yst_response(obj: any): logger.info("sp_response: {}", obj) response_nonce = ranstr(20) response_timestamp = str(int(time.time())) response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce) response_headers = { "x-tif-signature": response_sign, "x-tif-timestamp": response_timestamp, "x-tif-nonce": response_nonce } json_str = json.dumps(obj, ensure_ascii=False) key = token() aes = AES_ECB(key) data = aes.encrypt(json_str) content = { "errcode": 0, "errmsg": "", "data": data } return JSONResponse(content = content, headers = response_headers) def yst_image_response(data: any): logger.info("yst_image_response: {}", data) response_nonce = ranstr(20) response_timestamp = str(int(time.time())) response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce) response_headers = { "x-tif-signature": response_sign, "x-tif-timestamp": response_timestamp, "x-tif-nonce": response_nonce } content = { "errcode": 0, "errmsg": "", "data": data } return JSONResponse(content = content, headers = response_headers)