libushang 1 月之前
父節點
當前提交
e52fdb2473
共有 2 個文件被更改,包括 168 次插入1 次删除
  1. 75 0
      common/TassApi.py
  2. 93 1
      routers/prod_api/auth.py

+ 75 - 0
common/TassApi.py

@@ -0,0 +1,75 @@
+# -*- coding: utf-8 -*-
+import requests
+
+# 辅助类
+# 调用JAVA编写的密评接口
+API_ROOT = "http://127.0.0.1:8052/tass"
+
+# 隐私信息加密
+def CipherEncrypt(data: str) -> str:
+    resp = __post_data(API_ROOT + "/CipherEncrypt", data)
+    print("隐私信息加密 >>>", data, resp)
+    return resp
+
+# 隐私信息解密
+def CipherDecrypt(data: str) -> str:
+    resp = __post_data(API_ROOT + "/CipherDecrypt", data)
+    print("隐私信息解密 >>>", data, resp)
+    return resp
+
+# 敏感信息数据加密
+def TransparentEnc(data: str) -> str:
+    resp = __post_data(API_ROOT + "/TransparentEnc", data)
+    print("敏感信息数据加密 >>>", data, resp)
+    return resp
+
+# 敏感信息数据解密
+def TransparentDec(data: str) -> str:
+    resp = __post_data(API_ROOT + "/TransparentDec", data)
+    print("敏感信息数据解密 >>>", data, resp)
+    return resp
+
+# 计算HMAC
+def Hmac(data: str) -> str:
+    return __post_data(API_ROOT + "/Hmac", data)
+
+# 验证HMAC
+def HmacVerify(sign_data: str, sign_mac: str) -> str:
+    data = {}
+    data['sign_data'] = sign_data
+    data['sign_mac'] = sign_mac
+    headers = {'Content-Type': 'application/json;charset=UTF-8'}
+
+    response = requests.post(url=API_ROOT + "/HmacVerify", headers=headers, json=data, timeout=600)
+    if response.status_code == 200:
+        result = response.json()
+        print(result)
+        if result['errcode'] == 0:
+            return result['data'] == "success"
+    return False
+
+# 完成P7的签名验证
+def verifyP7Sign(p7SignData: str, p7SignValue: str) -> any:
+    data = {}
+    data['p7SignData'] = p7SignData
+    data['p7SignValue'] = p7SignValue
+    headers = {'Content-Type': 'application/json;charset=UTF-8'}
+
+    response = requests.post(url=API_ROOT + "/verifyP7Sign", headers=headers, json=data, timeout=600)
+    if response.status_code == 200:
+        result = response.json()
+        print(result)
+        if result['errcode'] == 0:
+            return result['data']
+    return None
+
+# 公用POST方法
+def __post_data(api_url: str, data: str):
+    headers = {'content-type': 'charset=utf8'}
+    response = requests.post(url=api_url, headers=headers, data=data.encode('UTF-8'), timeout=600)
+    if response.status_code == 200:
+        result = response.json()
+        print(result)
+        if result['errcode'] == 0:
+            return result['data']
+    return ""

+ 93 - 1
routers/prod_api/auth.py

@@ -17,7 +17,7 @@ from datetime import timedelta
 from common.security import verify_password
 from utils import ase_utils
 from common.auth_user import *
-from common import YzyApi
+from common import YzyApi, TassApi
 from models import *
 from urllib.parse import quote
 import requests
@@ -370,3 +370,95 @@ async def login(
             "openid": ""
         }
     }
+
+
+# USBKEY登录
+@router.post("/login_with_usbkey")
+def login_with_usbkey(
+    request: Request,
+    username: str = Body(...),
+    keyID: str = Body(...),
+    p7SignData: str = Body(...),
+    p7SignValue: str = Body(...),
+    db: Session = Depends(get_db)    
+):
+    '''
+    result = TassApi.verifyP7Sign(p7SignData, p7SignValue)
+    if result is None:
+        return {
+            "code": 500, 
+            "msg": "证书验签失败",
+        }
+    logger.info('keyID: {}', keyID)    
+    logger.info('verifyP7Sign: {}', result)
+    
+    username = TassApi.TransparentEnc(username)
+    '''    
+    
+    redis_login_key = "login_user_" + username
+    login_error_times = redis_get(redis_login_key)
+    if login_error_times is None:
+        login_error_times = 0
+    else:
+        login_error_times = int(login_error_times)
+
+    if login_error_times >= 5:
+        return {
+            "code": 500, 
+            "msg": "登录错误多,请5分钟后再尝试!",
+        }
+
+    row = db.query(SysUser).filter(SysUser.user_name == username).first()
+    if row is None:
+        login_error_times = login_error_times + 1
+        redis_set_with_time(redis_login_key, str(login_error_times), 300)
+
+        if row is None:
+            return {
+                "code": 500, 
+                "msg": "账号或者密码错误",
+            }
+
+    user_id = str(row.user_id)
+
+    auth = {
+        "user_id": user_id,
+        "user_name": row.user_name,  
+        "nick_name": row.nick_name,
+        "is_yzy_user": "0"
+    }
+
+    logger.info('auth {}', auth)
+
+    request.session['user_auth'] = auth
+    request.session['user_auth_sign'] = data_auth_sign(auth)
+    request.session['username'] = username
+
+    # db_czrz_serv.log_username(db, row.uid, row.username, "登录", "后台管理账号+密码登录成功", request.client.host)
+    row.login_date = datetime.now()
+    row.login_ip = request.client.host
+    db.commit()
+
+    access_token_expires = timedelta(days = 5)
+    access_token = security.create_access_token(
+        data={"sub": user_id}, expires_delta = access_token_expires
+    )
+
+    refresh_token_expires = timedelta(days = 5)
+    refresh_token = security.create_access_token(
+        data={"sub": user_id}, expires_delta = refresh_token_expires
+    )
+
+    return {
+        "code": 200,
+        "msg": "操作成功",
+        "data": {
+            "access_token": access_token,
+            "refresh_token": refresh_token,
+            "expire_in": 7200,
+            "refresh_expire_in": 7200,
+            "client_id": "e5cd7e4891bf95d1d19206ce24a7b32e",
+            "scope": "",
+            "openid": ""
+        }
+    }