working auth + users systems

This commit is contained in:
c-d-p
2025-04-16 21:32:57 +02:00
parent 516adc606d
commit 18ddb2f332
56 changed files with 943 additions and 0 deletions

View File

Binary file not shown.

58
backend/tests/conftest.py Normal file
View File

@@ -0,0 +1,58 @@
# conftest.py
from typing import Generator, Callable, Any
import pytest
from testcontainers.postgres import PostgresContainer
from fastapi.testclient import TestClient
from core.config import settings
from faker import Faker
from sqlalchemy.orm import Session
from core.database import get_db, get_sessionmaker
fake = Faker()
@pytest.fixture(scope="session")
def postgres_container() -> Generator[PostgresContainer, None, None]:
"""Fixture to create a PostgreSQL container for testing."""
print("Starting Postgres container...")
with PostgresContainer("postgres:latest") as postgres:
settings.DB_URL = postgres.get_connection_url()
print(f"Postgres container started at {settings.DB_URL}")
yield postgres
print("Postgres container stopped.")
@pytest.fixture(scope="function")
def db(postgres_container) -> Generator[Session, None, None]:
"""Function-scoped database session with rollback"""
SessionLocal = get_sessionmaker()
session = SessionLocal()
session.begin_nested() # Enable nested transaction
try:
yield session
finally:
session.rollback()
session.close()
@pytest.fixture(scope="function")
def client(db: Session) -> Generator[TestClient, None, None]:
"""Function-scoped test client with dependency override"""
from main import app
# Override the database dependency
def override_get_db():
try:
yield db
finally:
pass # Don't close session here
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as test_client:
yield test_client
app.dependency_overrides.clear()
def override_dependency(dependency: Callable[..., Any], mocked_response: Any) -> None:
from main import app
app.dependency_overrides[dependency] = lambda: mocked_response

View File

@@ -0,0 +1,42 @@
from datetime import timedelta
import uuid as uuid_pkg
from sqlalchemy.orm import Session
from core.config import settings
from modules.auth.models import User
from modules.auth.security import authenticate_user, create_access_token, create_refresh_token, hash_password
from modules.auth.schemas import UserRole
from tests.conftest import fake
def create_user(db: Session, is_admin: bool = False) -> User:
unhashed_password = fake.password()
_user = User(
name=fake.name(),
username=fake.user_name(),
hashed_password=hash_password(unhashed_password),
uuid=uuid_pkg.uuid4(),
role=UserRole.ADMIN if is_admin else UserRole.USER,
)
db.add(_user)
db.commit()
db.refresh(_user)
return _user, unhashed_password # return for testing
def login(db: Session, username: str, password: str) -> str:
user = authenticate_user(username, password, db)
if not user:
raise Exception("Incorrect username or password")
access_token = create_access_token(data={"sub": user.username}, expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
refresh_token = create_refresh_token(data={"sub": user.username})
max_age = settings.REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
return {
"access_token": access_token,
"refresh_token": refresh_token,
"max_age": max_age,
}

180
backend/tests/test_auth.py Normal file
View File

@@ -0,0 +1,180 @@
# Main test file for the authentication process.
# uses conftest -> db_session as an in-memory db.
# Goes through the whole authentication process:
# 1. Register a user
# 2. Login the user
# 3. Refresh the token
# 4. Logout the user
# 5. Verify that the user is logged out
# 6. Verify that the user cannot refresh the token
# 7. Verify that the user cannot login again
# 8. Verify that the user cannot register again
# 9. Verify that the user cannot access protected routes (/admin)
import time
from fastapi import status
from fastapi.testclient import TestClient
from sqlalchemy.orm import Session
from modules.auth.models import TokenBlacklist, User
from tests.conftest import fake
from .helpers import generators
def test_register(client: TestClient) -> None:
response = client.post(
"/api/auth/register",
json={
"username": fake.user_name(),
"password": fake.password(),
"name": fake.name(),
},
)
assert response.status_code == status.HTTP_201_CREATED
def test_login(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
response = client.post(
"/api/auth/login",
data={
"username": user.username,
"password": unhashed_password,
},
)
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert "access_token" in response_data
assert "token_type" in response_data
assert response_data["token_type"] == "bearer"
def test_refresh_token(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
rsp = generators.login(db, user.username, unhashed_password)
access_token = rsp["access_token"]
refresh_token = rsp["refresh_token"]
time.sleep(1) # Sleep to ensure tokens won't be identical
response = client.post(
"/api/auth/refresh",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert "access_token" in response_data
assert "token_type" in response_data
assert response_data["token_type"] == "bearer"
assert response_data["access_token"] != access_token # Ensure the token is refreshed
def test_logout(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
rsp = generators.login(db, user.username, unhashed_password)
access_token = rsp["access_token"]
refresh_token = rsp["refresh_token"]
response = client.post(
"/api/auth/logout",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK
# Verify that the token is blacklisted
blacklisted_token = db.query(TokenBlacklist).filter(TokenBlacklist.token == access_token).first()
assert blacklisted_token is not None
# Verify that we can't still actually do anything
response = client.get(
"/api/user/me",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
response = client.post(
"/api/auth/refresh",
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_get_me(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
response = client.get(
"/api/user/me",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert response_data["uuid"] == user.uuid
assert response_data["username"] == user.username
def test_get_me_unauthorized(client: TestClient) -> None:
### This test should fail (unauthorized) because the user isn't logged in
response = client.get("/api/user/me")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_get_user(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
response = client.get(
f"/api/user/{user.username}",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert response_data["uuid"] == user.uuid
assert response_data["username"] == user.username
def test_get_user_unauthorized(db: Session, client: TestClient) -> None:
### This test should fail (unauthorized) because the user isn't us
user, unhashed_password = generators.create_user(db)
user2, _ = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
response = client.get(
f"/api/user/{user2.username}",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_update_user(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
new_name = fake.name()
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
response = client.patch(
f"/api/user/{user.username}",
headers={"Authorization": f"Bearer {access_token}"},
json={"name": new_name},
)
assert response.status_code == status.HTTP_200_OK
response_data = response.json()
assert response_data["name"] == new_name
def test_delete_user(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
response = client.delete(
f"/api/user/{user.username}",
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
# Verify that the user is deleted
deleted_user = db.query(User).filter(User.username == user.username).first()
assert deleted_user is None