#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import os
import traceback
from datetime import datetime, timedelta

import jwt
from fastapi import APIRouter, Depends, Form, Header, HTTPException, Request
from passlib.context import CryptContext
from sqlalchemy import text, exists, and_, or_, not_
from sqlalchemy.orm import Session
from sqlalchemy.sql import func

# Wildcard imports keep their original relative order: a later one may
# shadow names exported by an earlier one.
from database import get_db
from utils.StripTagsHTMLParser import *
from models import *
from extensions import logger
from utils import *
from exceptions import TokenException
# Router that the token endpoints in this module are registered on.
router = APIRouter()

# Key used to sign and verify the JWTs issued by this module.
# The value can be supplied through the JWT_SECRET_KEY environment variable so
# that a deployment does not have to rely on a secret committed to source
# control. The literal is kept only as a backward-compatible fallback (it is
# also used when the variable is set but empty).
# NOTE(review): rotate this fallback and set JWT_SECRET_KEY in every real
# deployment; anyone who can read the repository can forge tokens with it.
SECRET_KEY = (
    os.environ.get("JWT_SECRET_KEY")
    or "09d25e094faa6ca2556c818166b7a9563b93f7099f6f0f4caa6cf63b88e8d3ff"
)

# JWT signing algorithm passed to jwt.encode / jwt.decode.
ALGORITHM = "HS256"

# passlib context used to hash and verify client secrets with bcrypt.
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
@router.post("/create/token")
async def login(
    request: Request,
    client_id: str = Form(..., description=''),
    client_secret: str = Form(..., description=''),
    db: Session = Depends(get_db)
):
    """Hash the submitted ``client_secret`` and return the hash.

    ``request``, ``client_id`` and ``db`` are part of the endpoint signature
    but are not read by the body; nothing is stored.

    Returns:
        A dict with ``code`` 1, an empty ``msg`` and, under ``data``, the
        result of ``pwd_context.hash(client_secret)``.
    """
    response = {"code": 1, "msg": ""}
    response["data"] = pwd_context.hash(client_secret)
    return response
@router.post('/token')
async def login(
    request: Request,
    client_id: str = Form(..., description=''),
    client_secret: str = Form(..., description=''),
    grant_type: str = Form(..., description=''),
    scope: str = Form(..., description=''),
    db: Session = Depends(get_db)
):
    """Issue an access token for a client that presents valid credentials.

    The credentials are checked with ``authenticate_app``. ``request``,
    ``grant_type`` and ``scope`` are required form fields but are not read by
    the body; the returned ``scope`` is always ``"all"``.

    Returns:
        On failed authentication, a dict with ``code`` 0 and an empty
        ``data`` dict. Otherwise a dict with ``code`` 1 whose ``data`` holds
        the token, its lifetime in seconds (7200), the token type and scope.
    """
    # Guard clause: any authentication failure yields the same response.
    if not authenticate_app(db, client_id, client_secret):
        return {"code": 0, "msg": "client_id not exists", "data": {}}

    ttl_seconds = 7200
    token = create_access_token(
        data={"sub": client_id},
        expires_delta=timedelta(seconds=ttl_seconds),
    )
    token_info = {
        "access_token": token,
        "expires_in": ttl_seconds,
        "token_type": "Bearer",
        "scope": "all",
    }
    return {"code": 1, "msg": "成功", "data": token_info}
def verify_secret(plain_secret, hashed_secret):
    """Check ``plain_secret`` against ``hashed_secret`` using ``pwd_context``.

    Returns:
        Whatever ``pwd_context.verify`` returns for the pair.
    """
    is_match = pwd_context.verify(plain_secret, hashed_secret)
    return is_match
def get_app(db: Session, client_id: str):
    """Look up the ``DangerAppInfo`` record for ``client_id``.

    Returns:
        The result of ``.first()`` on the query filtered by
        ``DangerAppInfo.client_id == client_id``.
    """
    matching = db.query(DangerAppInfo).filter(DangerAppInfo.client_id == client_id)
    return matching.first()
def authenticate_app(db: Session, client_id: str, client_secret: str):
    """Return the app for ``client_id`` if ``client_secret`` matches it.

    The app is fetched with ``get_app`` and the secret is compared against
    the app's stored ``client_secret`` with ``verify_secret``.

    Returns:
        The app object on success; ``False`` when no app is found or the
        secret does not verify.
    """
    candidate = get_app(db, client_id)
    if candidate and verify_secret(client_secret, candidate.client_secret):
        return candidate
    return False
def create_access_token(*, data: dict, expires_delta: timedelta = None):
    """Build a signed JWT carrying ``data`` plus an ``exp`` claim.

    Args:
        data: Claims to embed; the dict is copied, not modified.
        expires_delta: Token lifetime. When omitted (or falsy, e.g. a zero
            ``timedelta``) the lifetime defaults to 7200 seconds.

    Returns:
        The value returned by ``jwt.encode`` for the claims, signed with
        ``SECRET_KEY`` using ``ALGORITHM``.
    """
    # Imported locally so the module's wildcard imports cannot shadow it.
    from datetime import timezone

    lifetime = expires_delta or timedelta(seconds=7200)
    claims = data.copy()
    # The expiry must be a timezone-aware UTC datetime. PyJWT converts a
    # datetime "exp" with utctimetuple(), which treats a naive value as if it
    # were already UTC, so the previous naive local datetime.now() shifted
    # the expiry by the server's UTC offset.
    claims["exp"] = datetime.now(timezone.utc) + lifetime
    return jwt.encode(claims, SECRET_KEY, algorithm=ALGORITHM)
def valid_access_token(Authorization: str = Header(..., alias="Authorization"), db: Session = Depends(get_db)) -> str:
    """Validate the bearer token in ``Authorization`` and return its client id.

    The token is decoded with ``SECRET_KEY`` / ``ALGORITHM``, its ``sub``
    claim is taken as the client id, and the id must resolve to an app via
    ``get_app``.

    Args:
        Authorization: Raw ``Authorization`` header value. A leading
            ``Bearer`` scheme (matched case-insensitively) is stripped; a
            value without that scheme is treated as the bare token.
        db: Database session used for the app lookup.

    Returns:
        The client id stored in the token's ``sub`` claim.

    Raises:
        HTTPException: 401 when the token cannot be decoded, has no ``sub``
            claim, does not map to a known app, or the lookup fails.
    """
    # HTTP authentication scheme names are case-insensitive, so "bearer" and
    # "BEARER" are accepted as well as "Bearer".
    scheme, _, credentials = Authorization.partition(" ")
    access_token = credentials.strip() if scheme.lower() == "bearer" else Authorization

    try:
        payload = jwt.decode(access_token, SECRET_KEY, algorithms=[ALGORITHM])
        client_id: str = payload.get("sub")
        # A token without a subject cannot identify an app; skip the query
        # instead of looking up client_id None.
        app = get_app(db, client_id) if client_id else None
    except jwt.InvalidTokenError:
        # Expired, malformed or wrongly signed tokens are an expected client
        # error, so no traceback is printed for them.
        raise HTTPException(status_code=401, detail="access_token已失效") from None
    except Exception as exc:
        # Anything else (e.g. a failing lookup) is unexpected: keep the
        # traceback for diagnosis, but still answer 401 as before.
        traceback.print_exc()
        raise HTTPException(status_code=401, detail="access_token已失效") from exc

    # Raised outside the try block so the rejection is not caught and logged
    # as an unexpected error by the handlers above.
    if not app:
        raise HTTPException(status_code=401, detail="access_token已失效")
    return client_id
|