47 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
			
		
		
	
	
			47 lines
		
	
	
		
			1.4 KiB
		
	
	
	
		
			Python
		
	
	
	
	
	
| import jwt
 | |
| from fastapi import Depends, HTTPException, status
 | |
| from sqlalchemy.orm import Session
 | |
| 
 | |
| from app import schemas
 | |
| from app.core import settings, security
 | |
| from app.core.security import reusable_oauth2
 | |
| from app.db import get_db
 | |
| from app.db.models.user import User
 | |
| 
 | |
| 
 | |
def get_current_user(
    db: Session = Depends(get_db), token: str = Depends(reusable_oauth2)
) -> User:
    """Resolve the request's bearer token to the matching ``User``.

    The token is decoded with ``settings.SECRET_KEY`` and
    ``security.ALGORITHM``, its claims are loaded into
    ``schemas.TokenPayload``, and the user whose ``rid`` equals the
    payload's ``sub`` is looked up through ``User.get``.

    Args:
        db: Database session supplied by the ``get_db`` dependency.
        token: Encoded JWT supplied by the ``reusable_oauth2`` dependency.

    Returns:
        The ``User`` returned by ``User.get`` for the token's subject.

    Raises:
        HTTPException: 403 when the token cannot be decoded or its payload
            cannot be loaded into ``TokenPayload``; 404 when ``User.get``
            returns a falsy value.
    """
    try:
        payload = jwt.decode(
            token, settings.SECRET_KEY, algorithms=[security.ALGORITHM]
        )
        token_data = schemas.TokenPayload(**payload)
    # jwt.InvalidTokenError is the base class of DecodeError and
    # ImmatureSignatureError, so it covers every JWT failure the original
    # tuple listed. ValueError is caught so that a well-signed token whose
    # claims fail schema validation is rejected like any other bad token
    # instead of surfacing as an unhandled error.
    # NOTE(review): assumes TokenPayload is a pydantic model (whose
    # ValidationError subclasses ValueError) -- confirm in app.schemas.
    except (jwt.InvalidTokenError, ValueError) as exc:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="token校验不通过",
        ) from exc
    user = User.get(db, rid=token_data.sub)
    if not user:
        raise HTTPException(status_code=404, detail="用户不存在")
    return user
 | |
| 
 | |
| 
 | |
def get_current_active_user(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user only if the account is active.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``is_active`` attribute is truthy.

    Raises:
        HTTPException: 400 when ``current_user.is_active`` is falsy.
    """
    if current_user.is_active:
        return current_user
    raise HTTPException(status_code=400, detail="用户未激活")
 | |
| 
 | |
| 
 | |
def get_current_active_superuser(
    current_user: User = Depends(get_current_user),
) -> User:
    """Return the authenticated user only if it is a superuser.

    Args:
        current_user: User resolved by the ``get_current_user`` dependency.

    Returns:
        ``current_user`` unchanged when its ``is_superuser`` attribute is
        truthy.

    Raises:
        HTTPException: 400 when ``current_user.is_superuser`` is falsy.
    """
    # NOTE(review): despite "active" in the name, this depends on
    # get_current_user, so is_active is not checked on this path -- confirm
    # that this is intended.
    if current_user.is_superuser:
        return current_user
    raise HTTPException(status_code=400, detail="用户权限不足")
 |