38 lines
1.3 KiB
Python
38 lines
1.3 KiB
Python
from fastapi import Depends, HTTPException, status
|
|
from fastapi.security import HTTPBearer, HTTPAuthorizationCredentials
|
|
from sqlalchemy.ext.asyncio import AsyncSession
|
|
from sqlalchemy import select
|
|
|
|
from app.core.database import get_db
|
|
from app.core.security import decode_token
|
|
from app.models.user import User, UserRole
|
|
|
|
# Shared HTTPBearer security scheme instance; get_current_user depends on it
# to receive the request's HTTPAuthorizationCredentials.
bearer = HTTPBearer()
|
|
|
|
|
|
async def get_current_user(
    credentials: HTTPAuthorizationCredentials = Depends(bearer),
    db: AsyncSession = Depends(get_db),
) -> User:
    """Resolve the request's bearer token to an active ``User``.

    Args:
        credentials: Bearer credentials supplied by the ``bearer`` dependency.
        db: Async database session supplied by the ``get_db`` dependency.

    Returns:
        The ``User`` whose ``id`` equals the value decoded from the token.

    Raises:
        HTTPException: 401 with detail ``"Invalid token"`` when
            ``decode_token`` returns a falsy value; 401 with detail
            ``"User not found"`` when no matching user exists or the user's
            ``is_active`` flag is falsy.
    """
    # HTTP requires a 401 response to carry a WWW-Authenticate challenge so
    # that clients know which authentication scheme to retry with.
    auth_challenge = {"WWW-Authenticate": "Bearer"}

    user_id = decode_token(credentials.credentials)
    if not user_id:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Invalid token",
            headers=auth_challenge,
        )

    result = await db.execute(select(User).where(User.id == user_id))
    user = result.scalar_one_or_none()
    # Missing and inactive accounts receive the same response.
    if not user or not user.is_active:
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="User not found",
            headers=auth_challenge,
        )
    return user
|
|
|
|
|
|
async def get_approved_user(user: User = Depends(get_current_user)) -> User:
    """Return the authenticated user unless their role is still ``PENDING``.

    Args:
        user: The user supplied by the ``get_current_user`` dependency.

    Returns:
        The same ``user`` object, unchanged.

    Raises:
        HTTPException: 403 with detail ``"Account pending approval"`` when
            ``user.role`` equals ``UserRole.PENDING``.
    """
    awaiting_approval = user.role == UserRole.PENDING
    if awaiting_approval:
        raise HTTPException(
            status_code=status.HTTP_403_FORBIDDEN,
            detail="Account pending approval",
        )
    return user
|
|
|
|
|
|
async def get_admin_user(user: User = Depends(get_approved_user)) -> User:
    """Return the approved user only when their role is ``ADMIN``.

    Args:
        user: The user supplied by the ``get_approved_user`` dependency.

    Returns:
        The same ``user`` object, unchanged.

    Raises:
        HTTPException: 403 with detail ``"Admin access required"`` when
            ``user.role`` is anything other than ``UserRole.ADMIN``.
    """
    if user.role == UserRole.ADMIN:
        return user

    # Any non-admin role falls through to the rejection below.
    raise HTTPException(
        status_code=status.HTTP_403_FORBIDDEN,
        detail="Admin access required",
    )
|