# modules/user/api.py
"""HTTP endpoints for reading, updating and deleting the caller's own user."""

import logging
from typing import Annotated

from fastapi import APIRouter, Depends
from sqlalchemy.orm import Session

from core.database import get_db
from core.exceptions import unauthorized_exception, not_found_exception, forbidden_exception
from modules.auth.schemas import UserPatch, UserResponse
from modules.auth.dependencies import get_current_user
from modules.auth.models import User

logger = logging.getLogger(__name__)

router = APIRouter(prefix="/user", tags=["user"])

# Fields a PATCH request is never allowed to change, even when they are
# present in the payload.
_NON_UPDATEABLE_FIELDS = frozenset({"uuid", "role", "username"})


@router.get("/me", response_model=UserResponse)
def me(
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Get the current user.

    The user is resolved by the ``get_current_user`` dependency, so the
    caller has to be logged in. ``db`` is accepted for signature parity with
    the other handlers but is not used here.

    Returns the user object.
    """
    return current_user


@router.get("/{username}", response_model=UserResponse)
def get_user(
    username: str,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Get a user by username.

    Only the caller's own profile can be requested.

    Raises the exception built by ``forbidden_exception`` when ``username``
    is not the caller's username, and the one built by
    ``not_found_exception`` when no matching row exists.

    Returns the user object.
    """
    if current_user.username != username:
        raise forbidden_exception("You can only view your own profile")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise not_found_exception("User not found")

    return user


@router.patch("/{username}", response_model=UserResponse)
def update_user(
    username: str,
    user_data: UserPatch,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Update a user by username.

    Only fields explicitly set in the request body are applied, and the
    fields listed in ``_NON_UPDATEABLE_FIELDS`` are skipped silently. Only
    the caller's own profile can be updated.

    Raises the exception built by ``forbidden_exception`` when ``username``
    is not the caller's username, and the one built by
    ``not_found_exception`` when no matching row exists.

    Returns the updated user object.
    """
    if current_user.username != username:
        raise forbidden_exception("You can only update your own profile")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise not_found_exception("User not found")

    # Dump once; exclude_unset keeps omitted fields from overwriting stored
    # values with schema defaults.
    changes = user_data.model_dump(exclude_unset=True)

    # Log field names only: the payload values may be sensitive and must not
    # end up in stdout or log files.
    logger.debug("Updating user %s, fields: %s", username, sorted(changes))

    for key, value in changes.items():
        if key not in _NON_UPDATEABLE_FIELDS:
            setattr(user, key, value)

    db.commit()
    # Reload so the response reflects what was actually persisted.
    db.refresh(user)

    return user


@router.delete("/{username}", response_model=UserResponse)
def delete_user(
    username: str,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Delete a user by username.

    Only the caller's own profile can be deleted.

    Raises the exception built by ``forbidden_exception`` when ``username``
    is not the caller's username, and the one built by
    ``not_found_exception`` when no matching row exists.

    Returns the deleted user object.
    """
    if current_user.username != username:
        raise forbidden_exception("You can only delete your own profile")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise not_found_exception("User not found")

    db.delete(user)
    db.commit()

    # NOTE(review): the instance is returned after its row has been deleted
    # and committed; confirm that serializing it still works with this
    # session's expire_on_commit configuration.
    return user