123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282 |
- # -*- coding: utf-8 -*-
- from fastapi import Request, Header
- from pydantic import BaseModel
- import random
- import string
- import hashlib
- import time
- import base64
- import json
- from starlette.responses import JSONResponse, PlainTextResponse
- from config import settings
- from exceptions import TokenException
- from extensions import logger
- from .aes_util import AES_ECB
- from datetime import datetime
- from .redis_util import *
-
- ################################################################
- ##
- ##
- ## 数广网关
- ##
- ##
- ################################################################
- # 随机字符串
- def ranstr(num):
- salt = ''.join(random.sample(string.ascii_letters + string.digits, num))
- return salt
- # 网关验证
- def authentication(timestamp, token, nonce, uid, uinfo, ext, signature):
- sign_data_sha256 = calcRequestSign(timestamp, token, nonce, uid, uinfo, ext)
- return sign_data_sha256 == signature.upper()
- # 验证反馈值
- def calcResponseSign(timestamp, token, nonce):
- sign_data = "{}{}{}{}".format(timestamp, token, nonce, timestamp)
- return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper()
- # 生成sign
- def calcRequestSign(timestamp, token, nonce, uid, uinfo, ext):
- sign_data = "{}{}{},{},".format(timestamp, token, nonce, uid)
- if len(uinfo) == 0:
- sign_data = sign_data + ","
- else:
- sign_data = sign_data + uinfo + ","
- if len(ext) == 0:
- sign_data = sign_data
- else:
- sign_data = sign_data + ext
- sign_data = sign_data + timestamp
- return hashlib.sha256(sign_data.encode("utf8")).hexdigest().upper()
- # 生成自定义token
- def token() -> str:
- d1 = datetime.today()
- weekday = d1.weekday()+1
- if weekday == 7:
- weekday = 0
- tokenStr = 'yss_bdc' + str(d1.month) + str(d1.day) + str(weekday) + str(d1.year)
- m = hashlib.md5()
- m.update(tokenStr.encode('ascii'))
- token_md5 = m.hexdigest()
- return token_md5[0:16]
- # 按网关要求生成数据
- def sg_response(
- obj: any
- ):
- logger.info("sp_response: {}", obj)
- response_nonce = ranstr(20)
- response_timestamp = str(int(time.time()))
- response_sign = calcResponseSign(response_timestamp, settings.GSPT_PASS_TOKEN, response_nonce)
-
- response_headers = {
- "x-tif-signature": response_sign,
- "x-tif-timestamp": response_timestamp,
- "x-tif-nonce": response_nonce
- }
- json_str = json.dumps(obj, ensure_ascii=False)
- key = token()
- aes = AES_ECB(key)
- data = aes.encrypt(json_str)
-
- content = {
- "errcode": 0,
- "errmsg": "",
- "data": data
- }
- return JSONResponse(content = content, headers = response_headers)
- def pt_sg_response(obj: any
- ):
- json_str = json.dumps(obj, ensure_ascii=False)
- key = token()
- aes = AES_ECB(key)
- data = aes.encrypt(json_str)
- content = {
- "errcode": 0,
- "errmsg": "",
- "data": data
- }
- return JSONResponse(content = content)
- # 读取网关参数
- async def sg_request_param(request: Request):
- data = await request.body()
- body = data.decode(encoding='utf-8')
-
- key = token()
- aes = AES_ECB(key)
-
- try :
- # print('aes key:', key)
- # print('aes data:', body)
- json_str = aes.decrypt(data)
- data = json.loads(json_str)
- logger.info('sg recv =============>>>>\n{}',json.dumps(data, ensure_ascii=False, sort_keys=True, indent=2))
-
- except Exception:
- print('err body:', body)
- raise TokenException()
- return data
- ######################################################################################
- class YstModel(BaseModel):
- data: str
- async def yst_request_param(
- request: Request,
- ystModel: YstModel
- ):
- logger.info('yst_request_param ==========>')
- #for n in request.headers:
- # logger.info('{}:{}', n, request.headers[n])
-
- body = ystModel.data
- data = body.encode('utf8')
- logger.info("yst data: {}", body)
-
- key = token()
- print('token:', key)
- aes = AES_ECB(key)
-
- try :
- json_str = aes.decrypt(data)
- data = json.loads(json_str)
- logger.info('yst recv =============>>>>\n{}',json.dumps(data, ensure_ascii=False, sort_keys=True, indent=2))
- except Exception:
- data = await request.body()
- body = data.decode(encoding='utf-8')
- print('body:', body)
- raise TokenException()
- return data
- def yst_pass_ext(
- request: Request,
- token: str = '',
- x_tif_signature: str = Header(None),
- x_tif_nonce: str = Header(None),
- x_tif_timestamp: str = Header(None),
- x_tif_uid: str = Header(None),
- x_tif_uinfo: str = Header(None),
- x_tif_ext: str = Header(None),
- ):
- '''
- if token == 'fb5d86fa-1cf3-11ef-9b67-00ff257b5fc6':
- return {
- "account": "DGD440782197706290318",
- "account_type": "human",
- "cid": "440782197706290318",
- "corp": {
- "account": "50b50b6d",
- "address": "",
- "cid": "91440705354627820H",
- "ctype": "49",
- "level": "L2",
- "link_person_name": "李步尚",
- "mobile": "13426789046",
- "name": "江门市新会区尚软网络科技有限公司",
- "role_type": "parent_linked",
- "social_credit_code": "91440705354627820H",
- "uid": "359a7db835994a7ebd147f09c5b9ae5d"
- },
- "ctype": "10",
- "level": "L2",
- "link_person_cid": "",
- "login_type": "SG-GASMHS",
- "mobile": "13426789046",
- "name": "李步尚",
- "origin": "SG",
- "sex": "1",
- "tokenid": "f36be617ec901c64d1234484eabd7431",
- "uid": "5b8f1a3e774d4ba8b7ccb2b2d51a38cd",
- "uversion": "1.0"
- }
- '''
- redis_key = "yst_token_" + token
- redis_val = redis_get_json(redis_key)
- if redis_val is None:
- raise TokenException()
- else:
- return redis_val
- def yst_response(obj: any):
- logger.info("sp_response: {}", obj)
- response_nonce = ranstr(20)
- response_timestamp = str(int(time.time()))
- response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce)
-
- response_headers = {
- "x-tif-signature": response_sign,
- "x-tif-timestamp": response_timestamp,
- "x-tif-nonce": response_nonce
- }
- json_str = json.dumps(obj, ensure_ascii=False)
- key = token()
- aes = AES_ECB(key)
- data = aes.encrypt(json_str)
-
- content = {
- "errcode": 0,
- "errmsg": "",
- "data": data
- }
- return JSONResponse(content = content, headers = response_headers)
- def yst_image_response(data: any):
- logger.info("yst_image_response: {}", data)
- response_nonce = ranstr(20)
- response_timestamp = str(int(time.time()))
- response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce)
-
- response_headers = {
- "x-tif-signature": response_sign,
- "x-tif-timestamp": response_timestamp,
- "x-tif-nonce": response_nonce
- }
- content = {
- "errcode": 0,
- "errmsg": "",
- "data": data
- }
- return JSONResponse(content = content, headers = response_headers)
- def yst_user_info_response(data: str):
- logger.info("yst_user_info_response: {}", data)
- response_nonce = ranstr(20)
- response_timestamp = str(int(time.time()))
- response_sign = calcResponseSign(response_timestamp, settings.YST_PASS_TOKEN, response_nonce)
-
- response_headers = {
- "x-tif-signature": response_sign,
- "x-tif-timestamp": response_timestamp,
- "x-tif-nonce": response_nonce
- }
- return PlainTextResponse(content = data, headers = response_headers)
|