1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768 |
- # -*- coding: utf-8 -*-
- from fastapi import Header
- from datetime import datetime, timedelta
- import jwt
- from passlib.context import CryptContext
- from sqlalchemy.orm import Session
- from models.base import AppInfo
- from exceptions import TokenException
- from config import settings
- from extensions import logger
- # https://fastapi.tiangolo.com/tutorial/request-forms/
- # https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- #="Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIiwiZXhwIjoyMDM5Njk2ODMzfQ.Rhd38oo_S1odjg0xnT4n31cCWCAAPXGb8y_V2XcgqzQ")->int:#
- def valid_access_token(Authorization: str = Header(..., alias="Authorization")) -> int:
- # 目前小屏测试还不能用登录功能,暂时先这样 2024/11/03
- # def valid_access_token(Authorization: str = Header("Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIiwiZXhwIjoyMDM5Njk2ODMzfQ.Rhd38oo_S1odjg0xnT4n31cCWCAAPXGb8y_V2XcgqzQ"))->int:
- try:
- access_token = Authorization.removeprefix("Bearer ")
- token_exception = TokenException()
- payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
- print(payload,payload.get("sub"))
- user_id: str = payload.get("sub")
- logger.info('sub user_id: {}', user_id)
- except Exception:
- return -1
- #raise token_exception
-
- return int(user_id)
- def valid_websocket_token(Authorization: str ) -> int: #= Header(..., alias="sec-websocket-protocol")
- # 目前小屏测试还不能用登录功能,暂时先这样 2024/11/03
- # def valid_access_token(Authorization: str = Header("Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIiwiZXhwIjoyMDM5Njk2ODMzfQ.Rhd38oo_S1odjg0xnT4n31cCWCAAPXGb8y_V2XcgqzQ"))->int:
- try:
- access_token = Authorization.replace("Authorization: Bearer ","")
- # print(access_token)
- token_exception = TokenException()
- payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
- # print(payload, payload.get("sub"))
- user_id: str = payload.get("sub")
- logger.info('sub user_id: {}', user_id)
- except Exception:
- return -1
- # raise token_exception
- return int(user_id)
- def verify_secret(plain_secret, hashed_secret):
- return pwd_context.verify(plain_secret, hashed_secret)
- def get_secret_hash(secret):
- return pwd_context.hash(secret)
- def create_access_token(*, data: dict, expires_delta: timedelta = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=10)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
- return encoded_jwt
|