security.py 3.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102
  1. # -*- coding: utf-8 -*-
  2. from fastapi import Header
  3. from datetime import datetime, timedelta
  4. import jwt
  5. from passlib.context import CryptContext
  6. from sqlalchemy.orm import Session
  7. from database import get_db,get_db_local
  8. import traceback
  9. from models import *
  10. from exceptions import TokenException,RoleException
  11. from config import settings
  12. from extensions import logger
  13. # https://fastapi.tiangolo.com/tutorial/request-forms/
  14. # https://fastapi.tiangolo.com/tutorial/security/oauth2-jwt/
  15. pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
  16. def valid_access_token(Authorization: str = Header(..., alias="Authorization")) -> int:
  17. try:
  18. access_token = Authorization.removeprefix("Bearer ")
  19. token_exception = TokenException()
  20. payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  21. print(payload,payload.get("sub"))
  22. user_id: str = payload.get("sub")
  23. logger.info('sub user_id: {}', user_id)
  24. except Exception:
  25. raise token_exception
  26. return int(user_id)
  27. def valid_access_token_role(Authorization: str = Header(..., alias="Authorization")) -> int:
  28. try:
  29. access_token = Authorization.removeprefix("Bearer ")
  30. token_exception = TokenException()
  31. payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  32. print(payload, payload.get("sub"))
  33. user_id: str = payload.get("sub")
  34. logger.info('sub user_id: {}', user_id)
  35. role_list = ["superadmin","super_ld","super_worker"]
  36. db= get_db_local()
  37. role_id_list = [info.role_id for info in db.query(SysRole).filter(SysRole.role_key.in_(role_list)).all()]
  38. if db.query(SysUserRole).filter(SysUserRole.role_id.in_(role_id_list),SysUserRole.user_id==user_id).first() is None:
  39. raise RoleException(errcode=4003, errmsg="权限不够")
  40. except RoleException:
  41. raise
  42. except Exception:
  43. # 处理异常
  44. traceback.print_exc()
  45. raise token_exception
  46. return int(user_id)
  47. def valid_websocket_token(Authorization: str ) -> int: #= Header(..., alias="sec-websocket-protocol")
  48. # 目前小屏测试还不能用登录功能,暂时先这样 2024/11/03
  49. # def valid_access_token(Authorization: str = Header("Bearer eyJ0eXAiOiJKV1QiLCJhbGciOiJIUzI1NiJ9.eyJzdWIiOiIxIiwiZXhwIjoyMDM5Njk2ODMzfQ.Rhd38oo_S1odjg0xnT4n31cCWCAAPXGb8y_V2XcgqzQ"))->int:
  50. try:
  51. access_token = Authorization.replace("Authorization: Bearer ","")
  52. # print(access_token)
  53. token_exception = TokenException()
  54. payload = jwt.decode(access_token, settings.SECRET_KEY, algorithms=[settings.ALGORITHM])
  55. # print(payload, payload.get("sub"))
  56. user_id: str = payload.get("sub")
  57. logger.info('sub user_id: {}', user_id)
  58. except Exception:
  59. return -1
  60. # raise token_exception
  61. return int(user_id)
  62. def verify_secret(plain_secret, hashed_secret):
  63. return pwd_context.verify(plain_secret, hashed_secret)
  64. def get_secret_hash(secret):
  65. return pwd_context.hash(secret)
  66. def create_access_token(*, data: dict, expires_delta: timedelta = None):
  67. to_encode = data.copy()
  68. if expires_delta:
  69. expire = datetime.utcnow() + expires_delta
  70. else:
  71. expire = datetime.utcnow() + timedelta(minutes=10)
  72. to_encode.update({"exp": expire})
  73. encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=settings.ALGORITHM)
  74. return encoded_jwt
  75. #####################################################
  76. # 账号密码类使用如下函数
  77. #####################################################
  78. # 加密密码
  79. def encrypt_password(password: str) -> str:
  80. return pwd_context.hash(password)
  81. # 验证密码
  82. def verify_password(password: str, hashed: str) -> bool:
  83. return pwd_context.verify(password, hashed)