# Main test file for the authentication process. # uses conftest -> db_session as an in-memory db. # Goes through the whole authentication process: # 1. Register a user # 2. Login the user # 3. Refresh the token # 4. Logout the user # 5. Verify that the user is logged out # 6. Verify that the user cannot refresh the token # 7. Verify that the user cannot login again # 8. Verify that the user cannot register again # 9. Verify that the user cannot access protected routes (/admin) import time from fastapi import status from fastapi.testclient import TestClient from sqlalchemy.orm import Session from modules.auth.models import TokenBlacklist, User from tests.conftest import fake from .helpers import generators def test_register(client: TestClient) -> None: response = client.post( "/api/auth/register", json={ "username": fake.user_name(), "password": fake.password(), "name": fake.name(), }, ) assert response.status_code == status.HTTP_201_CREATED def test_login(db: Session, client: TestClient) -> None: user, unhashed_password = generators.create_user(db) response = client.post( "/api/auth/login", data={ "username": user.username, "password": unhashed_password, }, ) assert response.status_code == status.HTTP_200_OK response_data = response.json() assert "access_token" in response_data assert "token_type" in response_data assert response_data["token_type"] == "bearer" def test_refresh_token(db: Session, client: TestClient) -> None: user, unhashed_password = generators.create_user(db) rsp = generators.login(db, user.username, unhashed_password) access_token = rsp["access_token"] refresh_token = rsp["refresh_token"] time.sleep(1) # Sleep to ensure tokens won't be identical response = client.post( "/api/auth/refresh", headers={ "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", }, json={"refresh_token": refresh_token}, ) assert response.status_code == status.HTTP_200_OK response_data = response.json() assert "access_token" in response_data assert "token_type" in response_data assert response_data["token_type"] == "bearer" assert ( response_data["access_token"] != access_token ) # Ensure the token is refreshed def test_logout(db: Session, client: TestClient) -> None: user, unhashed_password = generators.create_user(db) rsp = generators.login(db, user.username, unhashed_password) access_token = rsp["access_token"] refresh_token = rsp["refresh_token"] response = client.post( "/api/auth/logout", headers={ "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", }, json={"refresh_token": refresh_token}, ) assert response.status_code == status.HTTP_200_OK # Verify that the token is blacklisted blacklisted_token = ( db.query(TokenBlacklist).filter(TokenBlacklist.token == access_token).first() ) assert blacklisted_token is not None # Verify that we can't still actually do anything response = client.get( "/api/user/me", headers={"Authorization": f"Bearer {access_token}"}, ) assert response.status_code == status.HTTP_401_UNAUTHORIZED response = client.post( "/api/auth/refresh", headers={ "Authorization": f"Bearer {access_token}", "Content-Type": "application/json", }, json={"refresh_token": refresh_token}, ) assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_get_me(db: Session, client: TestClient) -> None: user, unhashed_password = generators.create_user(db) access_token = generators.login(db, user.username, unhashed_password)[ "access_token" ] response = client.get( "/api/user/me", headers={"Authorization": f"Bearer {access_token}"}, ) assert response.status_code == status.HTTP_200_OK response_data = response.json() assert response_data["uuid"] == user.uuid assert response_data["username"] == user.username def test_get_me_unauthorized(client: TestClient) -> None: ### This test should fail (unauthorized) because the user isn't logged in response = client.get("/api/user/me") assert response.status_code == status.HTTP_401_UNAUTHORIZED def test_get_user(db: Session, client: TestClient) -> None: user, unhashed_password = generators.create_user(db) access_token = generators.login(db, user.username, unhashed_password)[ "access_token" ] response = client.get( f"/api/user/{user.username}", headers={"Authorization": f"Bearer {access_token}"}, ) assert response.status_code == status.HTTP_200_OK response_data = response.json() assert response_data["uuid"] == user.uuid assert response_data["username"] == user.username def test_get_user_unauthorized(db: Session, client: TestClient) -> None: ### This test should fail (unauthorized) because the user isn't us user, unhashed_password = generators.create_user(db) user2, _ = generators.create_user(db) access_token = generators.login(db, user.username, unhashed_password)[ "access_token" ] response = client.get( f"/api/user/{user2.username}", headers={"Authorization": f"Bearer {access_token}"}, ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_update_user(db: Session, client: TestClient) -> None: user, unhashed_password = generators.create_user(db) new_name = fake.name() access_token = generators.login(db, user.username, unhashed_password)[ "access_token" ] response = client.patch( f"/api/user/{user.username}", headers={"Authorization": f"Bearer {access_token}"}, json={"name": new_name}, ) assert response.status_code == status.HTTP_200_OK response_data = response.json() assert response_data["name"] == new_name def test_delete_user(db: Session, client: TestClient) -> None: user, unhashed_password = generators.create_user(db) access_token = generators.login(db, user.username, unhashed_password)[ "access_token" ] response = client.delete( f"/api/user/{user.username}", headers={"Authorization": f"Bearer {access_token}"}, ) assert response.status_code == status.HTTP_200_OK # Verify that the user is deleted deleted_user = db.query(User).filter(User.username == user.username).first() assert deleted_user is None def test_get_user_forbidden(db: Session, client: TestClient) -> None: """Test getting another user's profile (should be forbidden).""" user1, password_user1 = generators.create_user(db, username="user1_get_forbidden") user2, _ = generators.create_user(db, username="user2_get_forbidden") # Log in as user1 login_rsp = generators.login(db, user1.username, password_user1) access_token = login_rsp["access_token"] # Try to get user2's profile response = client.get( f"/api/user/{user2.username}", headers={"Authorization": f"Bearer {access_token}"}, ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_update_user_forbidden(db: Session, client: TestClient) -> None: """Test updating another user's profile (should be forbidden).""" user1, password_user1 = generators.create_user( db, username="user1_update_forbidden" ) user2, _ = generators.create_user(db, username="user2_update_forbidden") new_name = fake.name() # Log in as user1 login_rsp = generators.login(db, user1.username, password_user1) access_token = login_rsp["access_token"] # Try to update user2's profile response = client.patch( f"/api/user/{user2.username}", headers={"Authorization": f"Bearer {access_token}"}, json={"name": new_name}, ) assert response.status_code == status.HTTP_403_FORBIDDEN def test_delete_user_forbidden(db: Session, client: TestClient) -> None: """Test deleting another user's profile (should be forbidden).""" user1, password_user1 = generators.create_user( db, username="user1_delete_forbidden" ) user2, _ = generators.create_user(db, username="user2_delete_forbidden") # Log in as user1 login_rsp = generators.login(db, user1.username, password_user1) access_token = login_rsp["access_token"] # Try to delete user2's profile response = client.delete( f"/api/user/{user2.username}", headers={"Authorization": f"Bearer {access_token}"}, ) assert response.status_code == status.HTTP_403_FORBIDDEN