# modules/auth/api.py
import logging
from datetime import timedelta
from typing import Annotated

from fastapi import APIRouter, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordRequestForm
from jose import JWTError
from sqlalchemy.orm import Session

from core.config import settings  # Assuming settings is defined in core.config
from core.database import get_db
from core.exceptions import unauthorized_exception
from modules.auth.models import User
from modules.auth.schemas import (
    UserCreate,
    UserResponse,
    Token,
    RefreshTokenRequest,
    LogoutRequest,
)
from modules.auth.security import (
    TokenType,
    get_current_user,
    oauth2_scheme,
    create_access_token,
    create_refresh_token,
    verify_token,
    authenticate_user,
    blacklist_tokens,
)
from modules.auth.services import create_user

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/auth", tags=["auth"])


def _issue_access_token(username):
    """Create an access token for *username* with the configured lifetime.

    Shared by the login and refresh endpoints so that both hand out access
    tokens with the same ``settings.ACCESS_TOKEN_EXPIRE_MINUTES`` lifetime.
    """
    return create_access_token(
        data={"sub": username},
        expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
    )


@router.post(
    "/register", response_model=UserResponse, status_code=status.HTTP_201_CREATED
)
def register(user: UserCreate, db: Annotated[Session, Depends(get_db)]):
    """Register a new user from the submitted username, password and name."""
    # All validation/persistence is delegated to create_user; its result is
    # serialized through UserResponse by FastAPI.
    return create_user(user.username, user.password, user.name, db)


@router.post("/login", response_model=Token)
def login(
    form_data: Annotated[OAuth2PasswordRequestForm, Depends()],
    db: Annotated[Session, Depends(get_db)],
):
    """
    Authenticate user and return JWT tokens in the response body.
    """
    user = authenticate_user(form_data.username, form_data.password, db)
    if not user:
        # A 401 response must carry a WWW-Authenticate challenge; "Bearer"
        # matches the token_type returned on success.
        raise HTTPException(
            status_code=status.HTTP_401_UNAUTHORIZED,
            detail="Incorrect username or password",
            headers={"WWW-Authenticate": "Bearer"},
        )

    access_token = _issue_access_token(user.username)
    refresh_token = create_refresh_token(data={"sub": user.username})
    return {
        "access_token": access_token,
        "refresh_token": refresh_token,
        "token_type": "bearer",
    }


@router.post("/refresh")
def refresh_token(
    payload: RefreshTokenRequest, db: Annotated[Session, Depends(get_db)]
):
    """Exchange a refresh token sent in the request body for a new access token."""
    logger.debug("Refreshing token...")

    refresh_token = payload.refresh_token
    if not refresh_token:
        raise unauthorized_exception("Refresh token missing in request body")

    # verify_token is asked to accept only refresh-type tokens; a falsy result
    # is treated as an invalid token.
    user_data = verify_token(
        refresh_token, expected_token_type=TokenType.REFRESH, db=db
    )
    if not user_data:
        raise unauthorized_exception("Invalid refresh token")

    new_access_token = _issue_access_token(user_data.username)
    return {"access_token": new_access_token, "token_type": "bearer"}


@router.post("/logout")
def logout(
    payload: LogoutRequest,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
    access_token: str = Depends(oauth2_scheme),
):
    """Log out by passing the caller's access and refresh tokens to the blacklist."""
    # current_user is not read here; the dependency is declared so that
    # get_current_user runs before this handler (presumably rejecting
    # unauthenticated callers -- confirm in modules.auth.security).
    refresh_token = payload.refresh_token
    if not refresh_token:
        raise unauthorized_exception("Refresh token not found in request body")

    # Only blacklist_tokens is guarded: it is the only call here that can
    # surface a JWTError.
    try:
        blacklist_tokens(access_token=access_token, refresh_token=refresh_token, db=db)
    except JWTError as exc:
        raise unauthorized_exception("Invalid token") from exc

    return {"message": "Logged out successfully"}