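# -*- coding: utf-8 -*-
"""JWT validation dependencies and bcrypt password helpers for the FastAPI app."""
from datetime import datetime, timedelta, timezone
import traceback

from fastapi import Header, Depends
import jwt
from passlib.context import CryptContext
from sqlalchemy.orm import Session

from database import get_db
# NOTE(review): the wildcard import is kept because SysRole / SysUserRole come
# from it; explicit names would make it clear what this module depends on.
from models import *
from exceptions import TokenException,RoleException
from config import settings
from extensions import logger

# https://fastapi.tiangolo.com/tutorial/request-forms/
# https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/

pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")


def _decode_subject(access_token: str):
    """Verify *access_token* and return its raw ``sub`` claim (``None`` if absent).

    Raises whatever ``jwt.decode`` raises on a bad signature, an expired
    token, or a malformed token; callers translate that into their own error.
    """
    # The accepted algorithm is pinned to the configured one, so the token's
    # own header cannot choose how it gets verified.
    # NOTE(review): PyJWT checks `exp` only when the claim is present; a signed
    # token without `exp` never expires. Consider requiring the claim here.
    payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
    return payload.get("sub")


def valid_access_token(Authorization: str = Header(..., alias="Authorization")) -> int:
    """FastAPI dependency: return the user id carried by the bearer token.

    Raises:
        TokenException: the token fails verification, or its ``sub`` claim is
            missing or not an integer.
    """
    try:
        access_token = Authorization.removeprefix("Bearer ")
        # The int() conversion stays inside the try so that a validly signed
        # token with a missing or non-numeric `sub` is rejected as a bad token
        # instead of escaping as an unhandled TypeError/ValueError.
        return int(_decode_subject(access_token))
    except Exception:
        raise TokenException()


def valid_access_token_role(Authorization: str = Header(..., alias="Authorization"), db: Session = Depends(get_db)) -> int:
    """FastAPI dependency: like ``valid_access_token`` but also require a privileged role.

    The user must hold at least one of the roles whose ``role_key`` is
    ``superadmin``, ``super_ld`` or ``super_worker``.

    Raises:
        RoleException: the token is valid but the user holds none of those roles.
        TokenException: the token fails verification, its ``sub`` claim is
            missing or not an integer, or the role lookup itself fails.
    """
    try:
        access_token = Authorization.removeprefix("Bearer ")
        raw_user_id = _decode_subject(access_token)
        # Reject a missing / non-numeric subject before touching the database.
        user_id = int(raw_user_id)

        role_list = ["superadmin","super_ld","super_worker"]
        role_id_list = [
            info.role_id
            for info in db.query(SysRole).filter(SysRole.role_key.in_(role_list)).all()
        ]
        # The raw claim value is compared, exactly as before, so the bound
        # parameter type sent to the database does not change.
        membership = db.query(SysUserRole).filter(
            SysUserRole.role_id.in_(role_id_list),
            SysUserRole.user_id == raw_user_id,
        ).first()
        if membership is None:
            raise RoleException(errcode=4003, errmsg="权限不够")
    except RoleException:
        # Authorization failures must reach the caller unchanged.
        raise
    except Exception:
        # Anything else (bad token, bad subject, query error) is reported as a
        # token failure; the traceback is printed for diagnosis.
        traceback.print_exc()
        raise TokenException()
    return user_id


# The small-screen test client cannot use the login flow yet, so this is the
# temporary arrangement for now (2024/11/03).
# def valid_access_token(Authorization: str = Header("Bearer <redacted-token>")) -> int:
# NOTE(security): the commented-out default above used to embed a complete,
# long-lived bearer token for subject "1" (its `exp` claim lies years in the
# future). It is redacted here; if that token was signed with a deployed
# SECRET_KEY, rotate the key, because the token remains usable until it expires
# and is still present in version-control history.
def valid_websocket_token(Authorization: str) -> int:  # = Header(..., alias="sec-websocket-protocol")
    """Return the user id from a websocket token string, or ``-1`` if it is invalid.

    Unlike the HTTP dependencies this function does not raise: every failure
    (bad signature, expiry, missing or non-integer ``sub``) yields ``-1``, so
    callers must treat ``-1`` as "not authenticated" and refuse the connection.
    """
    try:
        access_token = Authorization.replace("Authorization: Bearer ","")
        raw_user_id = _decode_subject(access_token)
        logger.info('sub user_id: {}', raw_user_id)
        # Converted inside the try so a token without a usable `sub` also
        # produces the documented -1 instead of an uncaught exception.
        return int(raw_user_id)
    except Exception:
        return -1


def verify_secret(plain_secret, hashed_secret):
    """Return True if *plain_secret* matches the stored bcrypt hash *hashed_secret*."""
    return pwd_context.verify(plain_secret, hashed_secret)


def get_secret_hash(secret):
    """Return a salted bcrypt hash of *secret* suitable for storage."""
    return pwd_context.hash(secret)


def create_access_token(*, data: dict, expires_delta: timedelta = None):
    """Return a signed JWT carrying *data* plus an ``exp`` claim.

    Args:
        data: claims to embed; the dict is copied, not mutated.
        expires_delta: token lifetime. A falsy value (``None`` or a zero
            timedelta) falls back to 10 minutes.
    """
    lifetime = expires_delta if expires_delta else timedelta(minutes=10)
    to_encode = data.copy()
    # Timezone-aware "now" replaces the deprecated datetime.utcnow(); the
    # encoded `exp` timestamp is the same UTC instant.
    to_encode.update({"exp": datetime.now(timezone.utc) + lifetime})
    return jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)


#####################################################
# Account/password code uses the functions below
#####################################################
def encrypt_password(password: str) -> str:
    """Hash *password* with bcrypt for storage.

    Despite the name this is one-way hashing, not reversible encryption.
    bcrypt only considers the first 72 bytes of its input, so longer
    passwords are not distinguished beyond that point.
    """
    return pwd_context.hash(password)


def verify_password(password: str, hashed: str) -> bool:
    """Return True if *password* matches the stored bcrypt hash *hashed*."""
    return pwd_context.verify(password, hashed)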