from datetime import datetime, timedelta from typing import Any, Union, Optional import jwt from fastapi.security import OAuth2PasswordBearer from passlib.context import CryptContext from app.core.config import settings from cryptography.fernet import Fernet pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto") ALGORITHM = "HS256" # Token认证 reusable_oauth2 = OAuth2PasswordBearer( tokenUrl=f"{settings.API_V1_STR}/login/access-token" ) def create_access_token( subject: Union[str, Any], expires_delta: timedelta = None ) -> str: if expires_delta: expire = datetime.utcnow() + expires_delta else: expire = datetime.utcnow() + timedelta( minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES ) to_encode = {"exp": expire, "sub": str(subject)} encoded_jwt = jwt.encode(to_encode, settings.SECRET_KEY, algorithm=ALGORITHM) return encoded_jwt def verify_password(plain_password: str, hashed_password: str) -> bool: return pwd_context.verify(plain_password, hashed_password) def get_password_hash(password: str) -> str: return pwd_context.hash(password) def decrypt(data, key) -> Optional[bytes]: """ 解密二进制数据 """ fernet = Fernet(key) try: return fernet.decrypt(data) except Exception as e: print(str(e)) return None