Files

38 lines
1.3 KiB
Python

from fastapi import Depends, HTTPException, status
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy import select
from app.core.database import get_db
from app.core.security import decode_token
from app.models.user import User, UserRole
bearer = HTTPBearer()
async def get_current_user(
credentials: HTTPAuthorizationCredentials = Depends(bearer),
db: AsyncSession = Depends(get_db),
) -> User:
user_id = decode_token(credentials.credentials)
if not user_id:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="Invalid token")
result = await db.execute(select(User).where(User.id == user_id))
user = result.scalar_one_or_none()
if not user or not user.is_active:
raise HTTPException(status_code=status.HTTP_401_UNAUTHORIZED, detail="User not found")
return user
async def get_approved_user(user: User = Depends(get_current_user)) -> User:
if user.role == UserRole.pending:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Account pending approval")
return user
async def get_admin_user(user: User = Depends(get_approved_user)) -> User:
if user.role != UserRole.admin:
raise HTTPException(status_code=status.HTTP_403_FORBIDDEN, detail="Admin access required")
return user