231 lines
8.4 KiB
Python
231 lines
8.4 KiB
Python
# Main test file for the authentication process.
|
|
# uses conftest -> db_session as an in-memory db.
|
|
|
|
# Goes through the whole authentication process:
|
|
# 1. Register a user
|
|
# 2. Login the user
|
|
# 3. Refresh the token
|
|
# 4. Logout the user
|
|
# 5. Verify that the user is logged out
|
|
# 6. Verify that the user cannot refresh the token
|
|
# 7. Verify that the user cannot login again
|
|
# 8. Verify that the user cannot register again
|
|
# 9. Verify that the user cannot access protected routes (/admin)
|
|
|
|
import time
|
|
from fastapi import status
|
|
from fastapi.testclient import TestClient
|
|
from sqlalchemy.orm import Session
|
|
|
|
from modules.auth.models import TokenBlacklist, User
|
|
from tests.conftest import fake
|
|
|
|
from .helpers import generators
|
|
|
|
|
|
def test_register(client: TestClient) -> None:
|
|
response = client.post(
|
|
"/api/auth/register",
|
|
json={
|
|
"username": fake.user_name(),
|
|
"password": fake.password(),
|
|
"name": fake.name(),
|
|
},
|
|
)
|
|
assert response.status_code == status.HTTP_201_CREATED
|
|
|
|
def test_login(db: Session, client: TestClient) -> None:
|
|
user, unhashed_password = generators.create_user(db)
|
|
|
|
response = client.post(
|
|
"/api/auth/login",
|
|
data={
|
|
"username": user.username,
|
|
"password": unhashed_password,
|
|
},
|
|
)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
response_data = response.json()
|
|
assert "access_token" in response_data
|
|
assert "token_type" in response_data
|
|
assert response_data["token_type"] == "bearer"
|
|
|
|
def test_refresh_token(db: Session, client: TestClient) -> None:
|
|
user, unhashed_password = generators.create_user(db)
|
|
rsp = generators.login(db, user.username, unhashed_password)
|
|
access_token = rsp["access_token"]
|
|
refresh_token = rsp["refresh_token"]
|
|
|
|
time.sleep(1) # Sleep to ensure tokens won't be identical
|
|
|
|
response = client.post(
|
|
"/api/auth/refresh",
|
|
headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
|
|
json={"refresh_token": refresh_token},
|
|
)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
response_data = response.json()
|
|
assert "access_token" in response_data
|
|
assert "token_type" in response_data
|
|
assert response_data["token_type"] == "bearer"
|
|
assert response_data["access_token"] != access_token # Ensure the token is refreshed
|
|
|
|
def test_logout(db: Session, client: TestClient) -> None:
|
|
user, unhashed_password = generators.create_user(db)
|
|
rsp = generators.login(db, user.username, unhashed_password)
|
|
access_token = rsp["access_token"]
|
|
refresh_token = rsp["refresh_token"]
|
|
|
|
response = client.post(
|
|
"/api/auth/logout",
|
|
headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
|
|
json={"refresh_token": refresh_token},
|
|
)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
# Verify that the token is blacklisted
|
|
blacklisted_token = db.query(TokenBlacklist).filter(TokenBlacklist.token == access_token).first()
|
|
assert blacklisted_token is not None
|
|
|
|
# Verify that we can't still actually do anything
|
|
response = client.get(
|
|
"/api/user/me",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
response = client.post(
|
|
"/api/auth/refresh",
|
|
headers={"Authorization": f"Bearer {access_token}", "Content-Type": "application/json"},
|
|
json={"refresh_token": refresh_token},
|
|
)
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
|
|
def test_get_me(db: Session, client: TestClient) -> None:
|
|
user, unhashed_password = generators.create_user(db)
|
|
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
|
|
|
|
response = client.get(
|
|
"/api/user/me",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
response_data = response.json()
|
|
|
|
assert response_data["uuid"] == user.uuid
|
|
assert response_data["username"] == user.username
|
|
|
|
def test_get_me_unauthorized(client: TestClient) -> None:
|
|
### This test should fail (unauthorized) because the user isn't logged in
|
|
response = client.get("/api/user/me")
|
|
assert response.status_code == status.HTTP_401_UNAUTHORIZED
|
|
|
|
def test_get_user(db: Session, client: TestClient) -> None:
|
|
user, unhashed_password = generators.create_user(db)
|
|
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
|
|
|
|
response = client.get(
|
|
f"/api/user/{user.username}",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
response_data = response.json()
|
|
|
|
assert response_data["uuid"] == user.uuid
|
|
assert response_data["username"] == user.username
|
|
|
|
def test_get_user_unauthorized(db: Session, client: TestClient) -> None:
|
|
### This test should fail (unauthorized) because the user isn't us
|
|
user, unhashed_password = generators.create_user(db)
|
|
user2, _ = generators.create_user(db)
|
|
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
|
|
|
|
response = client.get(
|
|
f"/api/user/{user2.username}",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
def test_update_user(db: Session, client: TestClient) -> None:
|
|
user, unhashed_password = generators.create_user(db)
|
|
new_name = fake.name()
|
|
|
|
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
|
|
response = client.patch(
|
|
f"/api/user/{user.username}",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
json={"name": new_name},
|
|
)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
response_data = response.json()
|
|
assert response_data["name"] == new_name
|
|
|
|
|
|
def test_delete_user(db: Session, client: TestClient) -> None:
|
|
user, unhashed_password = generators.create_user(db)
|
|
access_token = generators.login(db, user.username, unhashed_password)["access_token"]
|
|
response = client.delete(
|
|
f"/api/user/{user.username}",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_200_OK
|
|
|
|
# Verify that the user is deleted
|
|
deleted_user = db.query(User).filter(User.username == user.username).first()
|
|
assert deleted_user is None
|
|
|
|
def test_get_user_forbidden(db: Session, client: TestClient) -> None:
|
|
"""Test getting another user's profile (should be forbidden)."""
|
|
user1, password_user1 = generators.create_user(db, username="user1_get_forbidden")
|
|
user2, _ = generators.create_user(db, username="user2_get_forbidden")
|
|
|
|
# Log in as user1
|
|
login_rsp = generators.login(db, user1.username, password_user1)
|
|
access_token = login_rsp["access_token"]
|
|
|
|
# Try to get user2's profile
|
|
response = client.get(
|
|
f"/api/user/{user2.username}",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
def test_update_user_forbidden(db: Session, client: TestClient) -> None:
|
|
"""Test updating another user's profile (should be forbidden)."""
|
|
user1, password_user1 = generators.create_user(db, username="user1_update_forbidden")
|
|
user2, _ = generators.create_user(db, username="user2_update_forbidden")
|
|
new_name = fake.name()
|
|
|
|
# Log in as user1
|
|
login_rsp = generators.login(db, user1.username, password_user1)
|
|
access_token = login_rsp["access_token"]
|
|
|
|
# Try to update user2's profile
|
|
response = client.patch(
|
|
f"/api/user/{user2.username}",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
json={"name": new_name},
|
|
)
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|
|
|
|
def test_delete_user_forbidden(db: Session, client: TestClient) -> None:
|
|
"""Test deleting another user's profile (should be forbidden)."""
|
|
user1, password_user1 = generators.create_user(db, username="user1_delete_forbidden")
|
|
user2, _ = generators.create_user(db, username="user2_delete_forbidden")
|
|
|
|
# Log in as user1
|
|
login_rsp = generators.login(db, user1.username, password_user1)
|
|
access_token = login_rsp["access_token"]
|
|
|
|
# Try to delete user2's profile
|
|
response = client.delete(
|
|
f"/api/user/{user2.username}",
|
|
headers={"Authorization": f"Bearer {access_token}"},
|
|
)
|
|
assert response.status_code == status.HTTP_403_FORBIDDEN
|