from typing import Annotated, Optional

from fastapi import APIRouter, Depends, HTTPException, status
from pydantic import BaseModel
from sqlalchemy.orm import Session

from core.database import get_db
from core.exceptions import not_found_exception, forbidden_exception
from modules.auth.dependencies import get_current_user
from modules.auth.models import User
from modules.auth.schemas import UserPatch, UserResponse

router = APIRouter(prefix="/user", tags=["user"])


# --- Pydantic Schema for Push Token --- #
class PushTokenData(BaseModel):
    """Request body for registering a device push token."""

    # The push token string to store on the user.
    token: str
    # Optional device label; accepted in the payload but not persisted by
    # save_push_token below.
    device_name: Optional[str] = None
    # Expecting 'expo'; any other value is rejected by save_push_token.
    token_type: str


@router.post("/push-token", status_code=status.HTTP_200_OK)
def save_push_token(
    token_data: PushTokenData,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
):
    """
    Save the Expo push token for the current user.

    Requires user to be logged in.

    Raises:
        HTTPException: 400 when ``token_data.token_type`` is not ``"expo"``.

    Returns a confirmation message dict.
    """
    if token_data.token_type != "expo":
        raise HTTPException(
            status_code=status.HTTP_400_BAD_REQUEST,
            detail="Invalid token_type. Only 'expo' is supported.",
        )

    # Update the user's push token.
    # device_name is currently ignored; it could be stored in a separate
    # table if per-device tracking is needed later.
    current_user.expo_push_token = token_data.token

    db.add(current_user)
    db.commit()
    db.refresh(current_user)

    return {"message": "Push token saved successfully"}


# NOTE: "/me" must stay declared before "/{username}"; routes are matched in
# declaration order, so otherwise "me" would be captured as a username.
@router.get("/me", response_model=UserResponse)
def me(
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Get the current user.

    Requires user to be logged in. The ``db`` dependency is not used by the
    handler body itself.

    Returns the user object.
    """
    return current_user


@router.get("/{username}", response_model=UserResponse)
def get_user(
    username: str,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Get a user by username.

    Only the authenticated user's own username is accepted.

    Raises:
        The exception built by ``forbidden_exception`` when ``username`` is
        not the caller's own, or by ``not_found_exception`` when no matching
        row exists.

    Returns the user object.
    """
    if current_user.username != username:
        raise forbidden_exception("You can only view your own profile")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise not_found_exception("User not found")

    return user


@router.patch("/{username}", response_model=UserResponse)
def update_user(
    username: str,
    user_data: UserPatch,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Update a user by username.

    Only fields explicitly present in the request body are applied, and
    ``uuid``, ``role`` and ``username`` are never overwritten.

    Raises:
        The exception built by ``forbidden_exception`` when ``username`` is
        not the caller's own, or by ``not_found_exception`` when no matching
        row exists.

    Returns the updated user object.
    """
    if current_user.username != username:
        raise forbidden_exception("You can only update your own profile")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise not_found_exception("User not found")

    # Define fields that should not be updated
    non_updateable_fields = {"uuid", "role", "username"}

    # Dump the payload once. exclude_unset keeps omitted fields from
    # overwriting stored values with schema defaults. The payload is
    # deliberately not printed: it may contain sensitive profile data.
    changes = user_data.model_dump(exclude_unset=True)

    # Update only allowed fields
    for key, value in changes.items():
        if key not in non_updateable_fields:
            setattr(user, key, value)

    db.commit()
    db.refresh(user)
    return user


@router.delete("/{username}", response_model=UserResponse)
def delete_user(
    username: str,
    db: Annotated[Session, Depends(get_db)],
    current_user: Annotated[User, Depends(get_current_user)],
) -> UserResponse:
    """
    Delete a user by username.

    Raises:
        The exception built by ``forbidden_exception`` when ``username`` is
        not the caller's own, or by ``not_found_exception`` when no matching
        row exists.

    Returns the deleted user object.
    """
    if current_user.username != username:
        raise forbidden_exception("You can only delete your own profile")

    user = db.query(User).filter(User.username == username).first()
    if not user:
        raise not_found_exception("User not found")

    db.delete(user)
    db.commit()
    # NOTE(review): the instance is returned after the delete is committed,
    # so the response presumably relies on attributes already loaded on it;
    # confirm UserResponse does not touch lazy-loaded relationships.
    return user