security.py 1.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445
  1. # -*- coding: utf-8 -*-
  2. from fastapi import Header
  3. from datetime import datetime, timedelta
  4. import jwt
  5. from passlib.context import CryptContext
  6. from sqlalchemy.orm import Session
  7. from models.base import AppInfo
  8. from exceptions import TokenException
  9. from config import settings
  10. from extensions import logger
  11. # https://fastapi.tiangolo.com/tutorial/request-forms/
  12. # https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
# Module-wide passlib context used by verify_secret() and get_secret_hash();
# bcrypt is the only configured hashing scheme.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  14. def valid_access_token(Authorization: str = Header(..., alias="Authorization")) -> int:
  15. access_token = Authorization.removeprefix("Bearer ")
  16. token_exception = TokenException()
  17. try:
  18. payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  19. user_id: str = payload.get("sub")
  20. logger.info('sub user_id: {}', user_id)
  21. except Exception:
  22. raise token_exception
  23. return int(user_id)
  24. def verify_secret(plain_secret, hashed_secret):
  25. return pwd_context.verify(plain_secret, hashed_secret)
  26. def get_secret_hash(secret):
  27. return pwd_context.hash(secret)
  28. def create_access_token(*, data: dict, expires_delta: timedelta = None):
  29. to_encode = data.copy()
  30. if expires_delta:
  31. expire = datetime.utcnow() + expires_delta
  32. else:
  33. expire = datetime.utcnow() + timedelta(minutes=10)
  34. to_encode.update({"exp": expire})
  35. encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  36. return encoded_jwt