# -*- coding: utf-8 -*-
"""JWT helpers: validate bearer tokens, hash/verify secrets, issue access tokens.

References:
    https://fastapi.tiangolo.com/tutorial/request-forms/
    https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
"""
from datetime import datetime, timedelta, timezone
from typing import Callable, Optional

from fastapi import Header
import jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session  # noqa: F401  (not used in this module; kept as in the original)
from models.base import AppInfo  # noqa: F401  (not used in this module; kept as in the original)
from exceptions import TokenException  # noqa: F401  (validators return -1 instead of raising it)
from config import settings
from extensions import logger

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")

# Value returned by the validators when a token cannot be accepted.
_INVALID_USER_ID = -1


def _resolve_user_id(header_value: str, extract_token: Callable[[str], str]) -> int:
    """Decode the JWT carried by *header_value* and return its ``sub`` as an int.

    Args:
        header_value: Raw header text that contains the token.
        extract_token: Callable that strips the scheme prefix from
            *header_value* and returns the bare JWT.

    Returns:
        The integer user id from the ``sub`` claim, or ``-1`` when anything
        goes wrong (non-string header, bad signature, expired token, missing
        or non-numeric ``sub``).
    """
    try:
        # Extraction happens inside the try so a non-string header (e.g. None)
        # yields -1 rather than an AttributeError.
        payload = jwt.decode(
            extract_token(header_value),
            settings.SECRET_KEY,
            algorithms=[settings.ALGORITHM],
        )
        subject = payload.get("sub")
        logger.info('sub user_id: {}', subject)
        # int() is inside the try: a token without "sub" (None) or with a
        # non-numeric "sub" must be rejected, not crash the request.
        return int(subject)
    except Exception as exc:
        # Deliberately broad: any failure means "not authenticated".
        logger.info('token rejected: {}', repr(exc))
        return _INVALID_USER_ID


def valid_access_token(Authorization: str = Header(..., alias="Authorization")) -> int:
    """Validate the ``Authorization: Bearer <jwt>`` request header.

    Args:
        Authorization: Value of the ``Authorization`` header; a leading
            ``"Bearer "`` is stripped before decoding.

    Returns:
        The user id from the token's ``sub`` claim, or ``-1`` if the token is
        invalid. No exception is raised for a bad token.
    """
    # NOTE (2024/11/03): while small-screen testing had no login flow, a
    # commented-out variant of this signature defaulted the header to a
    # hard-coded bearer token. It is intentionally not carried over, so that
    # no credential lives in the source.
    return _resolve_user_id(Authorization, lambda value: value.removeprefix("Bearer "))


def valid_websocket_token(Authorization: str) -> int:
    """Validate a token passed explicitly by a WebSocket handler.

    Args:
        Authorization: Text containing the JWT; every occurrence of
            ``"Authorization: Bearer "`` is removed before decoding. A
            commented-out default in the original suggests the value comes
            from the ``sec-websocket-protocol`` header -- confirm with callers.

    Returns:
        The user id from the token's ``sub`` claim, or ``-1`` if the token is
        invalid. No exception is raised for a bad token.
    """
    return _resolve_user_id(
        Authorization,
        lambda value: value.replace("Authorization: Bearer ", ""),
    )


def verify_secret(plain_secret, hashed_secret):
    """Return whether *plain_secret* matches *hashed_secret* per ``pwd_context``."""
    return pwd_context.verify(plain_secret, hashed_secret)


def get_secret_hash(secret):
    """Return the ``pwd_context`` hash of *secret*."""
    return pwd_context.hash(secret)


def create_access_token(*, data: dict, expires_delta: Optional[timedelta] = None):
    """Encode *data* plus an ``exp`` claim into a signed JWT.

    Args:
        data: Claims to embed; copied, so the caller's dict is not mutated.
        expires_delta: Token lifetime. When omitted (or falsy, e.g. a zero
            timedelta) the token expires after 10 minutes.

    Returns:
        The token produced by ``jwt.encode`` with ``settings.SECRET_KEY`` and
        ``settings.ALGORITHM``.
    """
    # Truthiness check (not ``is None``) is kept to match existing behavior.
    lifetime = expires_delta or timedelta(minutes=10)
    to_encode = data.copy()
    # Timezone-aware "now": datetime.utcnow() is deprecated and returns a
    # naive value; the resulting exp timestamp is the same.
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)