123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354 |
- #!/usr/bin/env python3
- # -*- coding: utf-8 -*-
- from fastapi import APIRouter, Request, Depends, Form
- from database import get_db
- from utils.StripTagsHTMLParser import *
- from sqlalchemy.orm import Session
- from datetime import datetime, timedelta
- import jwt
- from passlib.context import CryptContext
- from models import *
- from sqlalchemy import text, exists, and_, or_, not_
- from sqlalchemy.sql import func
- from models import *
- from extensions import logger
- from utils import *
- import traceback
- from exceptions import TokenException
- from . import topinfo
# NOTE: the APIRouter instance that used to be created here was dead code: the
# module-level ``router`` is (re)created immediately before its sub-routers
# are mounted further down, and nothing in between referenced the earlier one.

# Key and algorithm passed to ``jwt.decode`` when verifying access tokens.
# NOTE(review): the signing secret is hard-coded in source control; consider
# loading it from configuration / the environment and rotating this value.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3ff"
ALGORITHM = "HS256"

# bcrypt-based passlib context. Not referenced by the code visible in this
# module; kept because other modules may import it from here.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
def valid_access_token(Authorization: str = Header(..., alias="Authorization"), db: Session = Depends(get_db)) -> str:
    """Validate the JWT carried in the ``Authorization`` header.

    An optional ``"Bearer "`` prefix is stripped, the token is decoded with
    ``SECRET_KEY`` / ``ALGORITHM``, and its ``sub`` claim is looked up through
    ``get_app``.

    Args:
        Authorization: Raw value of the ``Authorization`` request header.
        db: Database session supplied by the ``get_db`` dependency.

    Returns:
        The client id taken from the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, carries no
            ``sub`` claim, the lookup fails, or no matching app is found.
    """
    try:
        access_token = Authorization.removeprefix("Bearer ")
        payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
        client_id = payload.get("sub")
        # A token without a subject must never reach the query: comparing the
        # column with None is rendered as "IS NULL" and could match a row
        # whose client_id is NULL, authenticating an anonymous token.
        app = get_app(db, client_id) if client_id is not None else None
    except Exception:
        # Unexpected failure while decoding or looking up the client: keep the
        # traceback for diagnosis but answer with the same 401 as any other
        # invalid token.
        traceback.print_exc()
        raise HTTPException(status_code=401, detail="access_token已失效")

    # Raised outside the try block so that an ordinary "unknown client"
    # rejection is not caught above and logged as if it were a crash.
    if not app:
        raise HTTPException(status_code=401, detail="access_token已失效")
    return client_id
def get_app(db: Session, client_id: str):
    """Return the first ``DangerAppInfo`` row whose ``client_id`` matches.

    Args:
        db: Session used to run the query.
        client_id: Value compared against ``DangerAppInfo.client_id``.

    Returns:
        Whatever ``Query.first()`` yields for the filtered query (the caller
        treats a falsy result as "no such app").
    """
    matching_apps = db.query(DangerAppInfo).filter(
        DangerAppInfo.client_id == client_id
    )
    return matching_apps.first()
# Router exported by this package. Every route of the ``topinfo`` sub-router
# is mounted under ``/topinfo`` and guarded by ``valid_access_token``.
router = APIRouter()
router.include_router(
    topinfo.router,
    prefix="/topinfo",
    dependencies=[Depends(valid_access_token)],
)
|