__init__.py 1.6 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354
  1. #!/usr/bin/env python3
  2. # -*- coding: utf-8 -*-
  3. from fastapi import APIRouter, Request, Depends, Form
  4. from database import get_db
  5. from utils.StripTagsHTMLParser import *
  6. from sqlalchemy.orm import Session
  7. from datetime import datetime, timedelta
  8. import jwt
  9. from passlib.context import CryptContext
  10. from models import *
  11. from sqlalchemy import text, exists, and_, or_, not_
  12. from sqlalchemy.sql import func
  13. from models import *
  14. from extensions import logger
  15. from utils import *
  16. import traceback
  17. from exceptions import TokenException
  18. from . import topinfo
  19. router = APIRouter()
  20. SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3ff"
  21. ALGORITHM = "HS256"
  22. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  23. def valid_access_token(Authorization: str = Header(..., alias="Authorization"), db: Session = Depends(get_db)) -> str:
  24. try:
  25. access_token = Authorization.removeprefix("Bearer ")
  26. payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
  27. client_id: str = payload.get("sub")
  28. app = get_app(db, client_id)
  29. if not app:
  30. raise HTTPException(status_code=401, detail="access_token已失效")
  31. except Exception:
  32. # 处理异常
  33. traceback.print_exc()
  34. raise HTTPException(status_code=401, detail="access_token已失效")
  35. return client_id
  36. def get_app(db: Session, client_id: str):
  37. app = db.query(DangerAppInfo).filter(DangerAppInfo.client_id == client_id).first()
  38. return app
  39. router = APIRouter()
  40. router.include_router(topinfo.router, prefix="/topinfo", dependencies=[Depends(valid_access_token)])