123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102 |
- # -*- coding: utf-8 -*-
- from fastapi import Header
- from datetime import datetime, timedelta
- import jwt
- from passlib.context import CryptContext
- from sqlalchemy.orm import Session
- from database import get_db,get_db_local
- import traceback
- from models import *
- from exceptions import TokenException,RoleException
- from config import settings
- from extensions import logger
- # https://fastapi.tiangolo.com/tutorial/request-forms/
- # https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
- pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
- def valid_access_token(Authorization: str = Header(..., alias="Authorization")) -> int:
- try:
- access_token = Authorization.removeprefix("Bearer ")
- token_exception = TokenException()
- payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
- print(payload,payload.get("sub"))
- user_id: str = payload.get("sub")
- logger.info('sub user_id: {}', user_id)
- except Exception:
- raise token_exception
-
- return int(user_id)
- def valid_access_token_role(Authorization: str = Header(..., alias="Authorization")) -> int:
- try:
- access_token = Authorization.removeprefix("Bearer ")
- token_exception = TokenException()
- payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
- print(payload, payload.get("sub"))
- user_id: str = payload.get("sub")
- logger.info('sub user_id: {}', user_id)
- role_list = ["superadmin","super_ld","super_worker"]
- db= get_db_local()
- role_id_list = [info.role_id for info in db.query(SysRole).filter(SysRole.role_key.in_(role_list)).all()]
- if db.query(SysUserRole).filter(SysUserRole.role_id.in_(role_id_list),SysUserRole.user_id==user_id).first() is None:
- raise RoleException(errcode=4003, errmsg="权限不够")
- except RoleException:
- raise
- except Exception:
- # 处理异常
- traceback.print_exc()
- raise token_exception
- return int(user_id)
- def valid_websocket_token(Authorization: str ) -> int: #= Header(..., alias="sec-websocket-protocol")
- # 目前小屏测试还不能用登录功能,暂时先这样 2024/11/03
- # def valid_access_token(Authorization: str = Header("Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIiwiZXhwIjoyMDM5Njk2ODMzfQ.Rhd38oo_S1odjg0xnT4n31cCWCAAPXGb8y_V2XcgqzQ"))->int:
- try:
- access_token = Authorization.replace("Authorization: Bearer ","")
- # print(access_token)
- token_exception = TokenException()
- payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
- # print(payload, payload.get("sub"))
- user_id: str = payload.get("sub")
- logger.info('sub user_id: {}', user_id)
- except Exception:
- return -1
- # raise token_exception
- return int(user_id)
- def verify_secret(plain_secret, hashed_secret):
- return pwd_context.verify(plain_secret, hashed_secret)
- def get_secret_hash(secret):
- return pwd_context.hash(secret)
- def create_access_token(*, data: dict, expires_delta: timedelta = None):
- to_encode = data.copy()
- if expires_delta:
- expire = datetime.utcnow() + expires_delta
- else:
- expire = datetime.utcnow() + timedelta(minutes=10)
- to_encode.update({"exp": expire})
- encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
- return encoded_jwt
- #####################################################
- # 账号密码类使用如下函数
- #####################################################
- # 加密密码
- def encrypt_password(password: str) -> str:
- return pwd_context.hash(password)
- # 验证密码
- def verify_password(password: str, hashed: str) -> bool:
- return pwd_context.verify(password, hashed)
|