#!/usr/bin/env python3
# -*- coding: utf-8 -*-
"""Token endpoints and helpers for client_id / client_secret authentication.

Exposes two POST routes on ``router`` (``/create/token`` and ``/token``) plus
``valid_access_token``, a FastAPI dependency that validates a Bearer JWT taken
from the ``Authorization`` header.
"""

# NOTE: the import order below is intentionally left exactly as it was. Several
# of these are star imports, so the order decides which binding wins when two
# modules export the same name.
# Header and HTTPException are used by valid_access_token; they are imported
# explicitly here (before the star imports, so any binding those provide still
# takes precedence as before) instead of relying on a star import to supply them.
from fastapi import APIRouter, Request, Depends, Form, Header, HTTPException
from database import get_db
from utils.StripTagsHTMLParser import *
from sqlalchemy.orm import Session
from datetime import datetime, timedelta, timezone
import jwt
from passlib.context import CryptContext
from models import *
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.sql import func
from models import *
from extensions import logger
from utils import *
import traceback
from exceptions import TokenException

router = APIRouter()

# NOTE(review): the JWT signing key is hard-coded in source. Anyone with read
# access to the repository can forge tokens; consider loading it from
# configuration or a secret store.
SECRET_KEY = "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3ff"
ALGORITHM = "HS256"

# Lifetime, in seconds, of an issued access token (also reported to the client
# as "expires_in").
ACCESS_TOKEN_EXPIRE_SECONDS = 7200

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


# POST /create/token
# Returns the bcrypt hash of the submitted client_secret in "data". The
# request, client_id and db parameters are accepted but not used.
# NOTE(review): this route performs no authentication, and it shares the name
# `login` with the handler below (the later definition shadows this one at
# module level; both routes stay registered because the decorator runs first).
# Documented with comments rather than a docstring because FastAPI publishes
# handler docstrings as the OpenAPI operation description.
@router.post("/create/token")
async def login(
    request: Request,
    client_id: str = Form(..., description=''),
    client_secret: str = Form(..., description=''),
    db: Session = Depends(get_db)
):
    hash_passwd = pwd_context.hash(client_secret)
    return {
        "code": 1,
        "msg": "",
        "data": hash_passwd
    }


# POST /token
# Authenticates the client_id / client_secret pair and, on success, returns a
# signed access token valid for ACCESS_TOKEN_EXPIRE_SECONDS. On failure a
# code-0 payload is returned (with HTTP 200). grant_type and scope are required
# form fields but are not inspected; the reported scope is always "all".
@router.post('/token')
async def login(
    request: Request,
    client_id: str = Form(..., description=''),
    client_secret: str = Form(..., description=''),
    grant_type: str = Form(..., description=''),
    scope: str = Form(..., description=''),
    db: Session = Depends(get_db)
):
    app = authenticate_app(db, client_id, client_secret)
    if not app:
        # Same message for an unknown client_id and for a wrong secret.
        return {"code": 0, "msg": "client_id not exists", "data": {}}

    expires_in = ACCESS_TOKEN_EXPIRE_SECONDS
    access_token_expires = timedelta(seconds=expires_in)
    access_token = create_access_token(
        data={"sub": client_id},
        expires_delta=access_token_expires
    )
    return {
        "code": 1,
        "msg": "成功",
        "data": {
            "access_token": access_token,
            "expires_in": expires_in,
            "token_type": "Bearer",
            "scope": "all"
        }
    }


def verify_secret(plain_secret, hashed_secret):
    """Return whether ``plain_secret`` matches ``hashed_secret`` per ``pwd_context``."""
    return pwd_context.verify(plain_secret, hashed_secret)


def get_app(db: Session, client_id: str):
    """Return the first ``DangerAppInfo`` row with this ``client_id``, or ``None``."""
    app = db.query(DangerAppInfo).filter(DangerAppInfo.client_id == client_id).first()
    return app


def authenticate_app(db: Session, client_id: str, client_secret: str):
    """Look up the app and check its secret.

    Returns the app row when ``client_id`` exists and ``client_secret``
    verifies against the stored ``app.client_secret``; otherwise ``False``.
    """
    app = get_app(db, client_id)
    if not app:
        return False
    if not verify_secret(client_secret, app.client_secret):
        return False
    return app


def create_access_token(*, data: dict, expires_delta: timedelta = None):
    """Return a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; copied, so the caller's dict is not modified.
        expires_delta: Lifetime of the token. When ``None`` the default of
            ``ACCESS_TOKEN_EXPIRE_SECONDS`` is used.

    Returns:
        The encoded token produced by ``jwt.encode`` with ``SECRET_KEY`` and
        ``ALGORITHM``.
    """
    to_encode = data.copy()
    # Compare against None so an explicit zero lifetime is honoured rather
    # than being silently replaced by the default.
    if expires_delta is None:
        expires_delta = timedelta(seconds=ACCESS_TOKEN_EXPIRE_SECONDS)
    # Use an aware UTC timestamp: a naive local datetime is converted to the
    # numeric "exp" claim as if it were UTC, which shifts the expiry by the
    # server's UTC offset.
    expire = datetime.now(timezone.utc) + expires_delta
    to_encode.update({"exp": expire})
    encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
    return encoded_jwt


def valid_access_token(Authorization: str = Header(..., alias="Authorization"), db: Session = Depends(get_db)) -> str:
    """FastAPI dependency: validate the Bearer token and return its client_id.

    Strips a leading ``"Bearer "`` from the ``Authorization`` header, decodes
    the JWT with ``SECRET_KEY`` / ``ALGORITHM``, and checks that the ``sub``
    claim names an app that ``get_app`` can find.

    Raises:
        HTTPException: status 401 when decoding fails for any reason or the
            app is not found.
    """
    try:
        access_token = Authorization.removeprefix("Bearer ")
        payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
        client_id: str = payload.get("sub")
        app = get_app(db, client_id)
        if not app:
            raise HTTPException(status_code=401, detail="access_token已失效")
    except Exception:
        # Handle the exception: every failure (including the 401 raised just
        # above) is reported to the client as the same 401, after printing the
        # traceback for diagnosis.
        traceback.print_exc()
        raise HTTPException(status_code=401, detail="access_token已失效")
    return client_id