# security.py
  1. # -*- coding: utf-8 -*-
  2. from fastapi import Header
  3. from datetime import datetime, timedelta
  4. import jwt
  5. from passlib.context import CryptContext
  6. from sqlalchemy.orm import Session
  7. from models.base import AppInfo
  8. from exceptions import TokenException
  9. from config import settings
  10. # https://fastapi.tiangolo.com/tutorial/request-forms/
  11. # https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
# Module-wide passlib context configured with bcrypt as its only scheme;
# verify_secret() and get_secret_hash() below both go through it.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  13. def valid_access_token(Authorization: str = Header(..., alias="Authorization")):
  14. access_token = Authorization.removeprefix("Bearer ")
  15. token_exception = TokenException()
  16. try:
  17. payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  18. user_id: str = payload.get("sub")
  19. except Exception:
  20. raise token_exception
  21. return user_id
  22. def verify_secret(plain_secret, hashed_secret):
  23. return pwd_context.verify(plain_secret, hashed_secret)
  24. def get_secret_hash(secret):
  25. return pwd_context.hash(secret)
  26. def create_access_token(*, data: dict, expires_delta: timedelta = None):
  27. to_encode = data.copy()
  28. if expires_delta:
  29. expire = datetime.utcnow() + expires_delta
  30. else:
  31. expire = datetime.utcnow() + timedelta(minutes=10)
  32. to_encode.update({"exp": expire})
  33. encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  34. return encoded_jwt