[REFORMAT] Ran black reformat

This commit is contained in:
c-d-p
2025-04-23 01:00:56 +02:00
parent d5d0a24403
commit 1553004efc
38 changed files with 1005 additions and 384 deletions

View File

@@ -12,6 +12,7 @@ from core.database import get_db, get_sessionmaker
fake = Faker()
@pytest.fixture(scope="session")
def postgres_container() -> Generator[PostgresContainer, None, None]:
"""Fixture to create a PostgreSQL container for testing."""
@@ -21,7 +22,8 @@ def postgres_container() -> Generator[PostgresContainer, None, None]:
print(f"Postgres container started at {settings.DB_URL}")
yield postgres
print("Postgres container stopped.")
@pytest.fixture(scope="function")
def db(postgres_container) -> Generator[Session, None, None]:
"""Function-scoped database session with rollback"""
@@ -34,25 +36,28 @@ def db(postgres_container) -> Generator[Session, None, None]:
session.rollback()
session.close()
@pytest.fixture(scope="function")
def client(db: Session) -> Generator[TestClient, None, None]:
"""Function-scoped test client with dependency override"""
from main import app
# Override the database dependency
def override_get_db():
try:
yield db
finally:
pass # Don't close session here
app.dependency_overrides[get_db] = override_get_db
with TestClient(app) as test_client:
yield test_client
app.dependency_overrides.clear()
def override_dependency(dependency: Callable[..., Any], mocked_response: Any) -> None:
from main import app
app.dependency_overrides[dependency] = lambda: mocked_response
app.dependency_overrides[dependency] = lambda: mocked_response

View File

@@ -5,17 +5,24 @@ from sqlalchemy.orm import Session
from core.config import settings
from modules.auth.models import User
from modules.auth.security import authenticate_user, create_access_token, create_refresh_token, hash_password
from modules.auth.security import (
authenticate_user,
create_access_token,
create_refresh_token,
hash_password,
)
from modules.auth.schemas import UserRole
from tests.conftest import fake
from typing import Optional # Import Optional
from typing import Optional # Import Optional
def create_user(db: Session, is_admin: bool = False, username: Optional[str] = None) -> User:
def create_user(
db: Session, is_admin: bool = False, username: Optional[str] = None
) -> User:
unhashed_password = fake.password()
_user = User(
name=fake.name(),
username=username or fake.user_name(), # Use provided username or generate one
username=username or fake.user_name(), # Use provided username or generate one
hashed_password=hash_password(unhashed_password),
uuid=uuid_pkg.uuid4(),
role=UserRole.ADMIN if is_admin else UserRole.USER,
@@ -24,14 +31,18 @@ def create_user(db: Session, is_admin: bool = False, username: Optional[str] = N
db.add(_user)
db.commit()
db.refresh(_user)
return _user, unhashed_password # return for testing
return _user, unhashed_password # return for testing
def login(db: Session, username: str, password: str) -> str:
user = authenticate_user(username, password, db)
if not user:
raise Exception("Incorrect username or password")
access_token = create_access_token(data={"sub": user.username}, expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES))
access_token = create_access_token(
data={"sub": user.username},
expires_delta=timedelta(minutes=settings.ACCESS_TOKEN_EXPIRE_MINUTES),
)
refresh_token = create_refresh_token(data={"sub": user.username})
max_age = settings.REFRESH_TOKEN_EXPIRE_DAYS * 24 * 60 * 60
@@ -40,4 +51,4 @@ def login(db: Session, username: str, password: str) -> str:
"access_token": access_token,
"refresh_token": refresh_token,
"max_age": max_age,
}
}

View File

@@ -7,71 +7,93 @@ from tests.helpers import generators
# Test admin routes require admin privileges
def test_read_admin_unauthorized(client: TestClient) -> None:
"""Test accessing admin route without authentication."""
response = client.get("/api/admin/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_read_admin_forbidden(db: Session, client: TestClient) -> None:
"""Test accessing admin route as a non-admin user."""
user, password = generators.create_user(db, is_admin=False) # Use is_admin=False
user, password = generators.create_user(db, is_admin=False) # Use is_admin=False
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
response = client.get("/api/admin/", headers={"Authorization": f"Bearer {access_token}"})
response = client.get(
"/api/admin/", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_read_admin_success(db: Session, client: TestClient) -> None:
"""Test accessing admin route as an admin user."""
admin_user, password = generators.create_user(db, is_admin=True) # Use is_admin=True
admin_user, password = generators.create_user(
db, is_admin=True
) # Use is_admin=True
login_rsp = generators.login(db, admin_user.username, password)
access_token = login_rsp["access_token"]
response = client.get("/api/admin/", headers={"Authorization": f"Bearer {access_token}"})
response = client.get(
"/api/admin/", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {"message": "Admin route"}
@patch("modules.admin.api.cleardb.delay") # Mock the celery task
@patch("modules.admin.api.cleardb.delay") # Mock the celery task
def test_clear_db_soft(mock_cleardb_delay, db: Session, client: TestClient) -> None:
"""Test soft clearing the database as admin."""
admin_user, password = generators.create_user(db, is_admin=True) # Use is_admin=True
admin_user, password = generators.create_user(
db, is_admin=True
) # Use is_admin=True
login_rsp = generators.login(db, admin_user.username, password)
access_token = login_rsp["access_token"]
response = client.post(
"/api/admin/cleardb",
headers={"Authorization": f"Bearer {access_token}"},
json={"hard": False}
json={"hard": False},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {"message": "Clearing database in the background", "hard": False}
assert response.json() == {
"message": "Clearing database in the background",
"hard": False,
}
mock_cleardb_delay.assert_called_once_with(False)
@patch("modules.admin.api.cleardb.delay") # Mock the celery task
@patch("modules.admin.api.cleardb.delay") # Mock the celery task
def test_clear_db_hard(mock_cleardb_delay, db: Session, client: TestClient) -> None:
"""Test hard clearing the database as admin."""
admin_user, password = generators.create_user(db, is_admin=True) # Use is_admin=True
admin_user, password = generators.create_user(
db, is_admin=True
) # Use is_admin=True
login_rsp = generators.login(db, admin_user.username, password)
access_token = login_rsp["access_token"]
response = client.post(
"/api/admin/cleardb",
headers={"Authorization": f"Bearer {access_token}"},
json={"hard": True}
json={"hard": True},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == {"message": "Clearing database in the background", "hard": True}
assert response.json() == {
"message": "Clearing database in the background",
"hard": True,
}
mock_cleardb_delay.assert_called_once_with(True)
def test_clear_db_forbidden(db: Session, client: TestClient) -> None:
"""Test clearing the database as a non-admin user."""
user, password = generators.create_user(db, is_admin=False) # Use is_admin=False
user, password = generators.create_user(db, is_admin=False) # Use is_admin=False
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
response = client.post(
"/api/admin/cleardb",
headers={"Authorization": f"Bearer {access_token}"},
json={"hard": False}
json={"hard": False},
)
assert response.status_code == status.HTTP_403_FORBIDDEN

View File

@@ -34,6 +34,7 @@ def test_register(client: TestClient) -> None:
)
assert response.status_code == status.HTTP_201_CREATED
def test_login(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
@@ -51,17 +52,21 @@ def test_login(db: Session, client: TestClient) -> None:
assert "token_type" in response_data
assert response_data["token_type"] == "bearer"
def test_refresh_token(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
rsp = generators.login(db, user.username, unhashed_password)
access_token = rsp["access_token"]
refresh_token = rsp["refresh_token"]
time.sleep(1) # Sleep to ensure tokens won't be identical
time.sleep(1) # Sleep to ensure tokens won't be identical
response = client.post(
"/api/auth/refresh",
headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
json={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK
@@ -70,7 +75,10 @@ def test_refresh_token(db: Session, client: TestClient) -> None:
assert "access_token" in response_data
assert "token_type" in response_data
assert response_data["token_type"] == "bearer"
assert response_data["access_token"] != access_token # Ensure the token is refreshed
assert (
response_data["access_token"] != access_token
) # Ensure the token is refreshed
def test_logout(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
@@ -80,15 +88,20 @@ def test_logout(db: Session, client: TestClient) -> None:
response = client.post(
"/api/auth/logout",
headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
json={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK
# Verify that the token is blacklisted
blacklisted_token = db.query(TokenBlacklist).filter(TokenBlacklist.token == access_token).first()
blacklisted_token = (
db.query(TokenBlacklist).filter(TokenBlacklist.token == access_token).first()
)
assert blacklisted_token is not None
# Verify that we can't still actually do anything
response = client.get(
"/api/user/me",
@@ -98,7 +111,10 @@ def test_logout(db: Session, client: TestClient) -> None:
response = client.post(
"/api/auth/refresh",
headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
headers={
"Authorization": f"Bearer {access_token}",
"Content-Type": "application/json",
},
json={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
@@ -106,7 +122,9 @@ def test_logout(db: Session, client: TestClient) -> None:
def test_get_me(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
access_token = generators.login(db, user.username, unhashed_password)[
"access_token"
]
response = client.get(
"/api/user/me",
@@ -119,14 +137,18 @@ def test_get_me(db: Session, client: TestClient) -> None:
assert response_data["uuid"] == user.uuid
assert response_data["username"] == user.username
def test_get_me_unauthorized(client: TestClient) -> None:
### This test should fail (unauthorized) because the user isn't logged in
response = client.get("/api/user/me")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_get_user(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
access_token = generators.login(db, user.username, unhashed_password)[
"access_token"
]
response = client.get(
f"/api/user/{user.username}",
@@ -139,11 +161,14 @@ def test_get_user(db: Session, client: TestClient) -> None:
assert response_data["uuid"] == user.uuid
assert response_data["username"] == user.username
def test_get_user_unauthorized(db: Session, client: TestClient) -> None:
### This test should fail (unauthorized) because the user isn't us
user, unhashed_password = generators.create_user(db)
user2, _ = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
access_token = generators.login(db, user.username, unhashed_password)[
"access_token"
]
response = client.get(
f"/api/user/{user2.username}",
@@ -151,11 +176,14 @@ def test_get_user_unauthorized(db: Session, client: TestClient) -> None:
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_update_user(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
new_name = fake.name()
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
access_token = generators.login(db, user.username, unhashed_password)[
"access_token"
]
response = client.patch(
f"/api/user/{user.username}",
headers={"Authorization": f"Bearer {access_token}"},
@@ -168,7 +196,9 @@ def test_update_user(db: Session, client: TestClient) -> None:
def test_delete_user(db: Session, client: TestClient) -> None:
user, unhashed_password = generators.create_user(db)
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
access_token = generators.login(db, user.username, unhashed_password)[
"access_token"
]
response = client.delete(
f"/api/user/{user.username}",
headers={"Authorization": f"Bearer {access_token}"},
@@ -179,6 +209,7 @@ def test_delete_user(db: Session, client: TestClient) -> None:
deleted_user = db.query(User).filter(User.username == user.username).first()
assert deleted_user is None
def test_get_user_forbidden(db: Session, client: TestClient) -> None:
"""Test getting another user's profile (should be forbidden)."""
user1, password_user1 = generators.create_user(db, username="user1_get_forbidden")
@@ -195,9 +226,12 @@ def test_get_user_forbidden(db: Session, client: TestClient) -> None:
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_update_user_forbidden(db: Session, client: TestClient) -> None:
"""Test updating another user's profile (should be forbidden)."""
user1, password_user1 = generators.create_user(db, username="user1_update_forbidden")
user1, password_user1 = generators.create_user(
db, username="user1_update_forbidden"
)
user2, _ = generators.create_user(db, username="user2_update_forbidden")
new_name = fake.name()
@@ -213,9 +247,12 @@ def test_update_user_forbidden(db: Session, client: TestClient) -> None:
)
assert response.status_code == status.HTTP_403_FORBIDDEN
def test_delete_user_forbidden(db: Session, client: TestClient) -> None:
"""Test deleting another user's profile (should be forbidden)."""
user1, password_user1 = generators.create_user(db, username="user1_delete_forbidden")
user1, password_user1 = generators.create_user(
db, username="user1_delete_forbidden"
)
user2, _ = generators.create_user(db, username="user2_delete_forbidden")
# Log in as user1

View File

@@ -4,9 +4,10 @@ from sqlalchemy.orm import Session
from datetime import datetime, timedelta
from tests.helpers import generators
from modules.calendar.models import CalendarEvent # Assuming model exists
from modules.calendar.models import CalendarEvent # Assuming model exists
from tests.conftest import fake
# Helper function to create an event payload
def create_event_payload(start_offset_days=0, end_offset_days=1):
start_time = datetime.utcnow() + timedelta(days=start_offset_days)
@@ -14,19 +15,22 @@ def create_event_payload(start_offset_days=0, end_offset_days=1):
return {
"title": fake.sentence(nb_words=3),
"description": fake.text(),
"start": start_time.isoformat(), # Rename start_time to start
"end": end_time.isoformat(), # Rename end_time to end
"start": start_time.isoformat(), # Rename start_time to start
"end": end_time.isoformat(), # Rename end_time to end
"all_day": fake.boolean(),
}
# --- Test Create Event ---
def test_create_event_unauthorized(client: TestClient) -> None:
"""Test creating an event without authentication."""
payload = create_event_payload()
response = client.post("/api/calendar/events", json=payload)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_create_event_success(db: Session, client: TestClient) -> None:
"""Test creating a calendar event successfully."""
user, password = generators.create_user(db)
@@ -37,9 +41,11 @@ def test_create_event_success(db: Session, client: TestClient) -> None:
response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload
json=payload,
)
assert response.status_code == status.HTTP_201_CREATED # Change expected status to 201
assert (
response.status_code == status.HTTP_201_CREATED
) # Change expected status to 201
data = response.json()
assert data["title"] == payload["title"]
assert data["description"] == payload["description"]
@@ -56,13 +62,16 @@ def test_create_event_success(db: Session, client: TestClient) -> None:
assert event_in_db.user_id == user.id
assert event_in_db.title == payload["title"]
# --- Test Get Events ---
def test_get_events_unauthorized(client: TestClient) -> None:
"""Test getting events without authentication."""
response = client.get("/api/calendar/events")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_get_events_success(db: Session, client: TestClient) -> None:
"""Test getting all calendar events for a user."""
user, password = generators.create_user(db)
@@ -71,21 +80,31 @@ def test_get_events_success(db: Session, client: TestClient) -> None:
# Create a couple of events for the user
payload1 = create_event_payload(0, 1)
client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload1)
client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload1,
)
payload2 = create_event_payload(2, 3)
client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload2)
client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload2,
)
# Create an event for another user (should not be returned)
other_user, other_password = generators.create_user(db)
other_login_rsp = generators.login(db, other_user.username, other_password)
other_access_token = other_login_rsp["access_token"]
other_payload = create_event_payload(4, 5)
client.post("/api/calendar/events", headers={"Authorization": f"Bearer {other_access_token}"}, json=other_payload)
client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {other_access_token}"},
json=other_payload,
)
response = client.get(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"}
"/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
@@ -103,12 +122,24 @@ def test_get_events_filtered(db: Session, client: TestClient) -> None:
access_token = login_rsp["access_token"]
# Create events
payload1 = create_event_payload(0, 1) # Today -> Tomorrow
client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload1)
payload2 = create_event_payload(5, 6) # In 5 days -> In 6 days
client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload2)
payload3 = create_event_payload(10, 11) # In 10 days -> In 11 days
client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload3)
payload1 = create_event_payload(0, 1) # Today -> Tomorrow
client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload1,
)
payload2 = create_event_payload(5, 6) # In 5 days -> In 6 days
client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload2,
)
payload3 = create_event_payload(10, 11) # In 10 days -> In 11 days
client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload3,
)
# Filter for events starting within the next week
start_filter = datetime.utcnow().isoformat()
@@ -117,11 +148,11 @@ def test_get_events_filtered(db: Session, client: TestClient) -> None:
response = client.get(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
params={"start": start_filter, "end": end_filter}
params={"start": start_filter, "end": end_filter},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data) == 2 # Should get event 1 and 2
assert len(data) == 2 # Should get event 1 and 2
assert data[0]["title"] == payload1["title"]
assert data[1]["title"] == payload2["title"]
@@ -130,40 +161,50 @@ def test_get_events_filtered(db: Session, client: TestClient) -> None:
response = client.get(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
params={"start": start_filter_late}
params={"start": start_filter_late},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data) == 1 # Should get event 3
assert len(data) == 1 # Should get event 3
assert data[0]["title"] == payload3["title"]
# --- Test Get Event By ID ---
def test_get_event_by_id_unauthorized(db: Session, client: TestClient) -> None:
"""Test getting a specific event without authentication."""
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload)
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload,
)
event_id = create_response.json()["id"]
response = client.get(f"/api/calendar/events/{event_id}")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_get_event_by_id_success(db: Session, client: TestClient) -> None:
"""Test getting a specific event successfully."""
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload)
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload,
)
event_id = create_response.json()["id"]
response = client.get(
f"/api/calendar/events/{event_id}",
headers={"Authorization": f"Bearer {access_token}"}
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
@@ -171,6 +212,7 @@ def test_get_event_by_id_success(db: Session, client: TestClient) -> None:
assert data["title"] == payload["title"]
assert data["user_id"] == user.id
def test_get_event_by_id_not_found(db: Session, client: TestClient) -> None:
"""Test getting a non-existent event."""
user, password = generators.create_user(db)
@@ -180,10 +222,11 @@ def test_get_event_by_id_not_found(db: Session, client: TestClient) -> None:
response = client.get(
f"/api/calendar/events/{non_existent_id}",
headers={"Authorization": f"Bearer {access_token}"}
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_get_event_by_id_forbidden(db: Session, client: TestClient) -> None:
"""Test getting another user's event."""
user1, password_user1 = generators.create_user(db)
@@ -193,7 +236,11 @@ def test_get_event_by_id_forbidden(db: Session, client: TestClient) -> None:
login_rsp1 = generators.login(db, user1.username, password_user1)
access_token1 = login_rsp1["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token1}"}, json=payload)
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token1}"},
json=payload,
)
event_id = create_response.json()["id"]
# Log in as user2 and try to get user1's event
@@ -202,45 +249,60 @@ def test_get_event_by_id_forbidden(db: Session, client: TestClient) -> None:
response = client.get(
f"/api/calendar/events/{event_id}",
headers={"Authorization": f"Bearer {access_token2}"}
headers={"Authorization": f"Bearer {access_token2}"},
)
assert response.status_code == status.HTTP_404_NOT_FOUND # Service layer returns 404 if user_id doesn't match
assert (
response.status_code == status.HTTP_404_NOT_FOUND
) # Service layer returns 404 if user_id doesn't match
# --- Test Update Event ---
def test_update_event_unauthorized(db: Session, client: TestClient) -> None:
"""Test updating an event without authentication."""
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload)
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload,
)
event_id = create_response.json()["id"]
update_payload = {"title": "Updated Title"}
response = client.patch(f"/api/calendar/events/{event_id}", json=update_payload)
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_update_event_success(db: Session, client: TestClient) -> None:
"""Test updating an event successfully."""
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload)
assert create_response.status_code == status.HTTP_201_CREATED # Ensure creation check uses 201
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload,
)
assert (
create_response.status_code == status.HTTP_201_CREATED
) # Ensure creation check uses 201
event_id = create_response.json()["id"]
update_payload = {
"title": "Updated Title",
"description": "Updated description.",
"all_day": not payload["all_day"] # Toggle all_day
"all_day": not payload["all_day"], # Toggle all_day
}
response = client.patch(
f"/api/calendar/events/{event_id}",
headers={"Authorization": f"Bearer {access_token}"},
json=update_payload
json=update_payload,
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
@@ -248,7 +310,7 @@ def test_update_event_success(db: Session, client: TestClient) -> None:
assert data["title"] == update_payload["title"]
assert data["description"] == update_payload["description"]
assert data["all_day"] == update_payload["all_day"]
assert data["start"] == payload["start"] # Check correct field name 'start'
assert data["start"] == payload["start"] # Check correct field name 'start'
assert data["user_id"] == user.id
# Verify in DB
@@ -258,6 +320,7 @@ def test_update_event_success(db: Session, client: TestClient) -> None:
assert event_in_db.description == update_payload["description"]
assert event_in_db.all_day == update_payload["all_day"]
def test_update_event_not_found(db: Session, client: TestClient) -> None:
"""Test updating a non-existent event."""
user, password = generators.create_user(db)
@@ -269,10 +332,11 @@ def test_update_event_not_found(db: Session, client: TestClient) -> None:
response = client.patch(
f"/api/calendar/events/{non_existent_id}",
headers={"Authorization": f"Bearer {access_token}"},
json=update_payload
json=update_payload,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_update_event_forbidden(db: Session, client: TestClient) -> None:
"""Test updating another user's event."""
user1, password_user1 = generators.create_user(db)
@@ -282,7 +346,11 @@ def test_update_event_forbidden(db: Session, client: TestClient) -> None:
login_rsp1 = generators.login(db, user1.username, password_user1)
access_token1 = login_rsp1["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token1}"}, json=payload)
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token1}"},
json=payload,
)
event_id = create_response.json()["id"]
# Log in as user2 and try to update user1's event
@@ -293,32 +361,47 @@ def test_update_event_forbidden(db: Session, client: TestClient) -> None:
response = client.patch(
f"/api/calendar/events/{event_id}",
headers={"Authorization": f"Bearer {access_token2}"},
json=update_payload
json=update_payload,
)
assert response.status_code == status.HTTP_404_NOT_FOUND # Service layer returns 404 if user_id doesn't match
assert (
response.status_code == status.HTTP_404_NOT_FOUND
) # Service layer returns 404 if user_id doesn't match
# --- Test Delete Event ---
def test_delete_event_unauthorized(db: Session, client: TestClient) -> None:
"""Test deleting an event without authentication."""
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload)
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload,
)
event_id = create_response.json()["id"]
response = client.delete(f"/api/calendar/events/{event_id}")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_delete_event_success(db: Session, client: TestClient) -> None:
"""Test deleting an event successfully."""
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
access_token = login_rsp["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token}"}, json=payload)
assert create_response.status_code == status.HTTP_201_CREATED # Ensure creation check uses 201
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token}"},
json=payload,
)
assert (
create_response.status_code == status.HTTP_201_CREATED
) # Ensure creation check uses 201
event_id = create_response.json()["id"]
# Verify event exists before delete
@@ -327,7 +410,7 @@ def test_delete_event_success(db: Session, client: TestClient) -> None:
response = client.delete(
f"/api/calendar/events/{event_id}",
headers={"Authorization": f"Bearer {access_token}"}
headers={"Authorization": f"Bearer {access_token}"},
)
assert response.status_code == status.HTTP_204_NO_CONTENT
@@ -338,7 +421,7 @@ def test_delete_event_success(db: Session, client: TestClient) -> None:
# Try getting the deleted event (should be 404)
get_response = client.get(
f"/api/calendar/events/{event_id}",
headers={"Authorization": f"Bearer {access_token}"}
headers={"Authorization": f"Bearer {access_token}"},
)
assert get_response.status_code == status.HTTP_404_NOT_FOUND
@@ -352,7 +435,7 @@ def test_delete_event_not_found(db: Session, client: TestClient) -> None:
response = client.delete(
f"/api/calendar/events/{non_existent_id}",
headers={"Authorization": f"Bearer {access_token}"}
headers={"Authorization": f"Bearer {access_token}"},
)
# The service layer raises NotFound, which should result in 404
assert response.status_code == status.HTTP_404_NOT_FOUND
@@ -367,7 +450,11 @@ def test_delete_event_forbidden(db: Session, client: TestClient) -> None:
login_rsp1 = generators.login(db, user1.username, password_user1)
access_token1 = login_rsp1["access_token"]
payload = create_event_payload()
create_response = client.post("/api/calendar/events", headers={"Authorization": f"Bearer {access_token1}"}, json=payload)
create_response = client.post(
"/api/calendar/events",
headers={"Authorization": f"Bearer {access_token1}"},
json=payload,
)
event_id = create_response.json()["id"]
# Log in as user2 and try to delete user1's event
@@ -376,7 +463,7 @@ def test_delete_event_forbidden(db: Session, client: TestClient) -> None:
response = client.delete(
f"/api/calendar/events/{event_id}",
headers={"Authorization": f"Bearer {access_token2}"}
headers={"Authorization": f"Bearer {access_token2}"},
)
# The service layer raises NotFound if user_id doesn't match, resulting in 404
assert response.status_code == status.HTTP_404_NOT_FOUND
@@ -385,4 +472,3 @@ def test_delete_event_forbidden(db: Session, client: TestClient) -> None:
event_in_db = db.query(CalendarEvent).filter(CalendarEvent.id == event_id).first()
assert event_in_db is not None
assert event_in_db.user_id == user1.id

View File

@@ -2,6 +2,7 @@ from fastapi.testclient import TestClient
# No database needed for this simple test
def test_health_check(client: TestClient):
"""Test the health check endpoint."""
response = client.get("/api/health")

View File

@@ -7,24 +7,37 @@ from datetime import datetime
from tests.helpers import generators
from modules.nlp.schemas import ProcessCommandResponse
from modules.nlp.models import MessageSender, ChatMessage # Import necessary models/enums
from modules.nlp.models import (
MessageSender,
ChatMessage,
) # Import necessary models/enums
# --- Mocks ---
# Mock the external AI call and internal service functions
@pytest.fixture(autouse=True)
def mock_nlp_services():
with patch("modules.nlp.api.process_request") as mock_process, \
patch("modules.nlp.api.ask_ai") as mock_ask, \
patch("modules.nlp.api.save_chat_message") as mock_save, \
patch("modules.nlp.api.get_chat_history") as mock_get_history, \
patch("modules.nlp.api.create_calendar_event") as mock_create_event, \
patch("modules.nlp.api.get_calendar_events") as mock_get_events, \
patch("modules.nlp.api.update_calendar_event") as mock_update_event, \
patch("modules.nlp.api.delete_calendar_event") as mock_delete_event, \
patch("modules.nlp.api.todo_service.create_todo") as mock_create_todo, \
patch("modules.nlp.api.todo_service.get_todos") as mock_get_todos, \
patch("modules.nlp.api.todo_service.update_todo") as mock_update_todo, \
patch("modules.nlp.api.todo_service.delete_todo") as mock_delete_todo:
with patch("modules.nlp.api.process_request") as mock_process, patch(
"modules.nlp.api.ask_ai"
) as mock_ask, patch("modules.nlp.api.save_chat_message") as mock_save, patch(
"modules.nlp.api.get_chat_history"
) as mock_get_history, patch(
"modules.nlp.api.create_calendar_event"
) as mock_create_event, patch(
"modules.nlp.api.get_calendar_events"
) as mock_get_events, patch(
"modules.nlp.api.update_calendar_event"
) as mock_update_event, patch(
"modules.nlp.api.delete_calendar_event"
) as mock_delete_event, patch(
"modules.nlp.api.todo_service.create_todo"
) as mock_create_todo, patch(
"modules.nlp.api.todo_service.get_todos"
) as mock_get_todos, patch(
"modules.nlp.api.todo_service.update_todo"
) as mock_update_todo, patch(
"modules.nlp.api.todo_service.delete_todo"
) as mock_delete_todo:
mocks = {
"process_request": mock_process,
"ask_ai": mock_ask,
@@ -41,21 +54,24 @@ def mock_nlp_services():
}
yield mocks
# --- Helper Function ---
def _login_user(db: Session, client: TestClient):
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
return user, login_rsp["access_token"], login_rsp["refresh_token"]
# --- Tests for /process-command ---
def test_process_command_ask_ai(client: TestClient, db: Session, mock_nlp_services):
user, access_token, refresh_token = _login_user(db, client)
user_input = "What is the capital of France?"
mock_nlp_services["process_request"].return_value = {
"intent": "ask_ai",
"params": {"request": user_input},
"response_text": "Let me check that for you."
"response_text": "Let me check that for you.",
}
mock_nlp_services["ask_ai"].return_value = "The capital of France is Paris."
@@ -63,25 +79,45 @@ def test_process_command_ask_ai(client: TestClient, db: Session, mock_nlp_servic
"/api/nlp/process-command",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"user_input": user_input}
json={"user_input": user_input},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == ProcessCommandResponse(responses=["Let me check that for you.", "The capital of France is Paris."]).model_dump()
assert (
response.json()
== ProcessCommandResponse(
responses=["Let me check that for you.", "The capital of France is Paris."]
).model_dump()
)
# Verify save calls: user message, initial AI response, final AI answer
assert mock_nlp_services["save_chat_message"].call_count == 3
mock_nlp_services["save_chat_message"].assert_any_call(db, user_id=user.id, sender=MessageSender.USER, text=user_input)
mock_nlp_services["save_chat_message"].assert_any_call(db, user_id=user.id, sender=MessageSender.AI, text="Let me check that for you.")
mock_nlp_services["save_chat_message"].assert_any_call(db, user_id=user.id, sender=MessageSender.AI, text="The capital of France is Paris.")
mock_nlp_services["save_chat_message"].assert_any_call(
db, user_id=user.id, sender=MessageSender.USER, text=user_input
)
mock_nlp_services["save_chat_message"].assert_any_call(
db, user_id=user.id, sender=MessageSender.AI, text="Let me check that for you."
)
mock_nlp_services["save_chat_message"].assert_any_call(
db,
user_id=user.id,
sender=MessageSender.AI,
text="The capital of France is Paris.",
)
mock_nlp_services["ask_ai"].assert_called_once_with(request=user_input)
def test_process_command_get_calendar(client: TestClient, db: Session, mock_nlp_services):
def test_process_command_get_calendar(
client: TestClient, db: Session, mock_nlp_services
):
user, access_token, refresh_token = _login_user(db, client)
user_input = "What are my events today?"
mock_nlp_services["process_request"].return_value = {
"intent": "get_calendar_events",
"params": {"start": "2024-01-01T00:00:00Z", "end": "2024-01-01T23:59:59Z"}, # Example params
"response_text": "Okay, fetching your events."
"params": {
"start": "2024-01-01T00:00:00Z",
"end": "2024-01-01T23:59:59Z",
}, # Example params
"response_text": "Okay, fetching your events.",
}
# Mock the actual event model returned by the service
mock_event = MagicMock()
@@ -94,26 +130,32 @@ def test_process_command_get_calendar(client: TestClient, db: Session, mock_nlp_
"/api/nlp/process-command",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"user_input": user_input}
json={"user_input": user_input},
)
assert response.status_code == status.HTTP_200_OK
expected_responses = [
"Okay, fetching your events.",
"Here are the events:",
"- Team Meeting (2024-01-01 10:00 - 11:00)"
"- Team Meeting (2024-01-01 10:00 - 11:00)",
]
assert response.json() == ProcessCommandResponse(responses=expected_responses).model_dump()
assert mock_nlp_services["save_chat_message"].call_count == 4 # User, Initial AI, Header, Event
assert (
response.json()
== ProcessCommandResponse(responses=expected_responses).model_dump()
)
assert (
mock_nlp_services["save_chat_message"].call_count == 4
) # User, Initial AI, Header, Event
mock_nlp_services["get_calendar_events"].assert_called_once()
def test_process_command_add_todo(client: TestClient, db: Session, mock_nlp_services):
user, access_token, refresh_token = _login_user(db, client)
user_input = "Add buy milk to my list"
mock_nlp_services["process_request"].return_value = {
"intent": "add_todo",
"params": {"task": "buy milk"},
"response_text": "Adding it now."
"response_text": "Adding it now.",
}
# Mock the actual Todo model returned by the service
mock_todo = MagicMock()
@@ -125,81 +167,119 @@ def test_process_command_add_todo(client: TestClient, db: Session, mock_nlp_serv
"/api/nlp/process-command",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"user_input": user_input}
json={"user_input": user_input},
)
assert response.status_code == status.HTTP_200_OK
expected_responses = ["Adding it now.", "Added TODO: 'buy milk' (ID: 1)."]
assert response.json() == ProcessCommandResponse(responses=expected_responses).model_dump()
assert mock_nlp_services["save_chat_message"].call_count == 3 # User, Initial AI, Confirmation AI
assert (
response.json()
== ProcessCommandResponse(responses=expected_responses).model_dump()
)
assert (
mock_nlp_services["save_chat_message"].call_count == 3
) # User, Initial AI, Confirmation AI
mock_nlp_services["create_todo"].assert_called_once()
def test_process_command_clarification(client: TestClient, db: Session, mock_nlp_services):
def test_process_command_clarification(
client: TestClient, db: Session, mock_nlp_services
):
user, access_token, refresh_token = _login_user(db, client)
user_input = "Delete the event"
clarification_text = "Which event do you mean? Please provide the ID."
mock_nlp_services["process_request"].return_value = {
"intent": "clarification_needed",
"params": {"request": user_input},
"response_text": clarification_text
"response_text": clarification_text,
}
response = client.post(
"/api/nlp/process-command",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"user_input": user_input}
json={"user_input": user_input},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == ProcessCommandResponse(responses=[clarification_text]).model_dump()
assert (
response.json()
== ProcessCommandResponse(responses=[clarification_text]).model_dump()
)
# Verify save calls: user message, clarification AI response
assert mock_nlp_services["save_chat_message"].call_count == 2
mock_nlp_services["save_chat_message"].assert_any_call(db, user_id=user.id, sender=MessageSender.USER, text=user_input)
mock_nlp_services["save_chat_message"].assert_any_call(db, user_id=user.id, sender=MessageSender.AI, text=clarification_text)
mock_nlp_services["save_chat_message"].assert_any_call(
db, user_id=user.id, sender=MessageSender.USER, text=user_input
)
mock_nlp_services["save_chat_message"].assert_any_call(
db, user_id=user.id, sender=MessageSender.AI, text=clarification_text
)
# Ensure no action services were called
mock_nlp_services["delete_calendar_event"].assert_not_called()
def test_process_command_error_intent(client: TestClient, db: Session, mock_nlp_services):
def test_process_command_error_intent(
client: TestClient, db: Session, mock_nlp_services
):
user, access_token, refresh_token = _login_user(db, client)
user_input = "Gibberish request"
error_text = "Sorry, I didn't understand that."
mock_nlp_services["process_request"].return_value = {
"intent": "error",
"params": {},
"response_text": error_text
"response_text": error_text,
}
response = client.post(
"/api/nlp/process-command",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"user_input": user_input}
json={"user_input": user_input},
)
assert response.status_code == status.HTTP_200_OK
assert response.json() == ProcessCommandResponse(responses=[error_text]).model_dump()
assert (
response.json() == ProcessCommandResponse(responses=[error_text]).model_dump()
)
# Verify save calls: user message, error AI response
assert mock_nlp_services["save_chat_message"].call_count == 2
mock_nlp_services["save_chat_message"].assert_any_call(db, user_id=user.id, sender=MessageSender.USER, text=user_input)
mock_nlp_services["save_chat_message"].assert_any_call(db, user_id=user.id, sender=MessageSender.AI, text=error_text)
mock_nlp_services["save_chat_message"].assert_any_call(
db, user_id=user.id, sender=MessageSender.USER, text=user_input
)
mock_nlp_services["save_chat_message"].assert_any_call(
db, user_id=user.id, sender=MessageSender.AI, text=error_text
)
# --- Tests for /history ---
def test_get_history(client: TestClient, db: Session, mock_nlp_services):
user, access_token, refresh_token = _login_user(db, client)
# Mock the history data returned by the service
mock_history = [
ChatMessage(id=1, user_id=user.id, sender=MessageSender.USER, text="Hello", timestamp=datetime.now()),
ChatMessage(id=2, user_id=user.id, sender=MessageSender.AI, text="Hi there!", timestamp=datetime.now())
ChatMessage(
id=1,
user_id=user.id,
sender=MessageSender.USER,
text="Hello",
timestamp=datetime.now(),
),
ChatMessage(
id=2,
user_id=user.id,
sender=MessageSender.AI,
text="Hi there!",
timestamp=datetime.now(),
),
]
mock_nlp_services["get_chat_history"].return_value = mock_history
response = client.get(
"/api/nlp/history",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK
@@ -208,11 +288,15 @@ def test_get_history(client: TestClient, db: Session, mock_nlp_services):
assert len(response_data) == 2
assert response_data[0]["text"] == "Hello"
assert response_data[1]["text"] == "Hi there!"
mock_nlp_services["get_chat_history"].assert_called_once_with(db, user_id=user.id, limit=50)
mock_nlp_services["get_chat_history"].assert_called_once_with(
db, user_id=user.id, limit=50
)
def test_get_history_unauthorized(client: TestClient):
response = client.get("/api/nlp/history")
assert response.status_code == status.HTTP_401_UNAUTHORIZED
# Add more tests for other intents (update/delete calendar/todo, unknown intent, etc.)
# Add tests for error handling within the API endpoint (e.g., missing IDs for update/delete)

View File

@@ -5,14 +5,17 @@ from datetime import date
from tests.helpers import generators
# Helper Function
def _login_user(db: Session, client: TestClient):
user, password = generators.create_user(db)
login_rsp = generators.login(db, user.username, password)
return user, login_rsp["access_token"], login_rsp["refresh_token"]
# --- Test CRUD Operations ---
def test_create_todo(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
today_date = date.today()
@@ -20,14 +23,14 @@ def test_create_todo(client: TestClient, db: Session):
todo_data = {
"task": "Test TODO",
"date": f"{today_date.isoformat()}T00:00:00",
"remind": True
"remind": True,
}
response = client.post(
"/api/todos/",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json=todo_data
json=todo_data,
)
assert response.status_code == status.HTTP_201_CREATED
@@ -35,50 +38,66 @@ def test_create_todo(client: TestClient, db: Session):
assert data["task"] == todo_data["task"]
assert data["date"] == todo_data["date"]
assert data["remind"] == todo_data["remind"]
assert data["complete"] is False # Default
assert data["complete"] is False # Default
assert "id" in data
assert data["owner_id"] == user.id
def test_read_todos(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
# Create some todos for the user
client.post("/api/todos/", headers={"Authorization": f"Bearer {access_token}"}, cookies={"refresh_token": refresh_token}, json={"task": "Todo 1"})
client.post("/api/todos/", headers={"Authorization": f"Bearer {access_token}"}, cookies={"refresh_token": refresh_token}, json={"task": "Todo 2"})
client.post(
"/api/todos/",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"task": "Todo 1"},
)
client.post(
"/api/todos/",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"task": "Todo 2"},
)
# Create a todo for another user
other_user, other_password = generators.create_user(db)
other_login_rsp = generators.login(db, other_user.username, other_password)
other_access_token = other_login_rsp["access_token"]
other_refresh_token = other_login_rsp["refresh_token"]
client.post("/api/todos/", headers={"Authorization": f"Bearer {other_access_token}"}, cookies={"refresh_token": other_refresh_token}, json={"task": "Other User Todo"})
client.post(
"/api/todos/",
headers={"Authorization": f"Bearer {other_access_token}"},
cookies={"refresh_token": other_refresh_token},
json={"task": "Other User Todo"},
)
response = client.get(
"/api/todos/",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK
data = response.json()
assert len(data) == 2 # Should only get todos for the logged-in user
assert len(data) == 2 # Should only get todos for the logged-in user
assert data[0]["task"] == "Todo 1"
assert data[1]["task"] == "Todo 2"
def test_read_single_todo(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
create_response = client.post(
"/api/todos/",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"task": "Specific Todo"}
json={"task": "Specific Todo"},
)
todo_id = create_response.json()["id"]
response = client.get(
f"/api/todos/{todo_id}",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK
@@ -87,15 +106,17 @@ def test_read_single_todo(client: TestClient, db: Session):
assert data["task"] == "Specific Todo"
assert data["owner_id"] == user.id
def test_read_single_todo_not_found(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
response = client.get(
"/api/todos/9999", # Non-existent ID
"/api/todos/9999", # Non-existent ID
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_read_single_todo_forbidden(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
@@ -104,16 +125,26 @@ def test_read_single_todo_forbidden(client: TestClient, db: Session):
other_login_rsp = generators.login(db, other_user.username, other_password)
other_access_token = other_login_rsp["access_token"]
other_refresh_token = other_login_rsp["refresh_token"]
other_create_response = client.post("/api/todos/", headers={"Authorization": f"Bearer {other_access_token}"}, cookies={"refresh_token": other_refresh_token}, json={"task": "Other User Todo"})
other_create_response = client.post(
"/api/todos/",
headers={"Authorization": f"Bearer {other_access_token}"},
cookies={"refresh_token": other_refresh_token},
json={"task": "Other User Todo"},
)
other_todo_id = other_create_response.json()["id"]
# Try to access the other user's todo
response = client.get(
f"/api/todos/{other_todo_id}",
headers={"Authorization": f"Bearer {access_token}"}, # Using the first user's token
cookies={"refresh_token": refresh_token}
headers={
"Authorization": f"Bearer {access_token}"
}, # Using the first user's token
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_404_NOT_FOUND # Service raises 404 if not found for *this* user
assert (
response.status_code == status.HTTP_404_NOT_FOUND
) # Service raises 404 if not found for *this* user
def test_update_todo(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
@@ -121,7 +152,7 @@ def test_update_todo(client: TestClient, db: Session):
"/api/todos/",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"task": "Update Me"}
json={"task": "Update Me"},
)
todo_id = create_response.json()["id"]
@@ -130,7 +161,7 @@ def test_update_todo(client: TestClient, db: Session):
f"/api/todos/{todo_id}",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json=update_data
json=update_data,
)
assert response.status_code == status.HTTP_200_OK
@@ -144,7 +175,7 @@ def test_update_todo(client: TestClient, db: Session):
get_response = client.get(
f"/api/todos/{todo_id}",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert get_response.json()["task"] == update_data["task"]
assert get_response.json()["complete"] == update_data["complete"]
@@ -154,55 +185,60 @@ def test_update_todo_not_found(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
update_data = {"task": "Updated Task", "complete": True}
response = client.put(
"/api/todos/9999", # Non-existent ID
"/api/todos/9999", # Non-existent ID
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json=update_data
json=update_data,
)
assert response.status_code == status.HTTP_404_NOT_FOUND
def test_delete_todo(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
create_response = client.post(
"/api/todos/",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token},
json={"task": "Delete Me"}
json={"task": "Delete Me"},
)
todo_id = create_response.json()["id"]
response = client.delete(
f"/api/todos/{todo_id}",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_200_OK # Delete returns the deleted item
assert response.status_code == status.HTTP_200_OK # Delete returns the deleted item
assert response.json()["id"] == todo_id
# Verify deletion by trying to read
get_response = client.get(
f"/api/todos/{todo_id}",
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert get_response.status_code == status.HTTP_404_NOT_FOUND
def test_delete_todo_not_found(client: TestClient, db: Session):
user, access_token, refresh_token = _login_user(db, client)
response = client.delete(
"/api/todos/9999", # Non-existent ID
"/api/todos/9999", # Non-existent ID
headers={"Authorization": f"Bearer {access_token}"},
cookies={"refresh_token": refresh_token}
cookies={"refresh_token": refresh_token},
)
assert response.status_code == status.HTTP_404_NOT_FOUND
# --- Test Authentication/Authorization ---
def test_create_todo_unauthorized(client: TestClient):
response = client.post("/api/todos/", json={"task": "No Auth"})
assert response.status_code == status.HTTP_401_UNAUTHORIZED
def test_read_todos_unauthorized(client: TestClient):
response = client.get("/api/todos/")
assert response.status_code == status.HTTP_401_UNAUTHORIZED