# modules/calendar/service.py
"""Service-layer operations for calendar events (create, query, update, delete)."""
import logging
from datetime import datetime

from sqlalchemy import or_
from sqlalchemy.orm import Session

from modules.calendar.models import CalendarEvent
from core.exceptions import not_found_exception
from modules.calendar.schemas import (
    CalendarEventCreate,
    CalendarEventUpdate,
)

# Import the celery app instance instead of the task functions directly
from core.celery_app import celery_app

# Keep task import: cancel_event_notifications is called directly and synchronously
from modules.calendar.tasks import cancel_event_notifications

logger = logging.getLogger(__name__)

# Celery task name, dispatched by string through ``celery_app.send_task``.
_SCHEDULE_NOTIFICATIONS_TASK = "modules.calendar.tasks.schedule_event_notifications"


def _schedule_notifications(event_id: int) -> None:
    """Enqueue the notification-scheduling task for the given event id."""
    celery_app.send_task(_SCHEDULE_NOTIFICATIONS_TASK, args=[event_id])


def create_calendar_event(
    db: Session, user_id: int, event_data: CalendarEventCreate
) -> CalendarEvent:
    """
    Persist a new calendar event for a user and enqueue its notifications.

    Args:
        db: The database session.
        user_id: The ID of the user who owns the new event.
        event_data: The payload describing the event to create.

    Returns:
        The newly created CalendarEvent, refreshed from the database.
    """
    # Store None rather than an empty list when no tags were supplied.
    tags_to_store = event_data.tags or None

    event = CalendarEvent(
        # Tags are excluded from the dump and set separately (normalised above).
        **event_data.model_dump(exclude={"tags"}),
        tags=tags_to_store,
        user_id=user_id,
    )
    db.add(event)
    db.commit()
    db.refresh(event)

    # Only enqueue after the commit so the event id exists.
    _schedule_notifications(event.id)

    return event


def get_calendar_events(
    db: Session, user_id: int, start: datetime | None, end: datetime | None
) -> list[CalendarEvent]:
    """
    Retrieves calendar events for a user, optionally filtered by a date range.

    Filtering rules:
        * ``start`` and ``end`` given: events with an end time whose interval
          satisfies ``event.start < end AND event.end > start``, plus point
          events (no end time) with ``start <= event.start < end``.
        * only ``start`` given: events with ``event.start >= start``.
        * only ``end`` given: events with an end time where
          ``event.end <= end``, plus point events with ``event.start < end``.
        * neither given: all of the user's events.

    Args:
        db: The database session.
        user_id: The ID of the user whose events are to be retrieved.
        start: The start datetime of the filter range (inclusive).
        end: The end datetime of the filter range (exclusive).

    Returns:
        A list of CalendarEvent objects matching the criteria, ordered by start time.
    """
    logger.debug(
        "Getting calendar events for user %s in range [%s, %s)", user_id, start, end
    )
    query = db.query(CalendarEvent).filter(CalendarEvent.user_id == user_id)

    # NULL checks must be built with the column operators ``is_`` / ``is_not``.
    # A Python-level ``CalendarEvent.end is None`` is evaluated immediately to a
    # plain bool and never becomes part of the SQL expression.
    has_end = CalendarEvent.end.is_not(None)
    is_point_event = CalendarEvent.end.is_(None)

    if start is not None and end is not None:
        query = query.filter(
            or_(
                # Case 1: event has a duration and overlaps [start, end).
                has_end & (CalendarEvent.start < end) & (CalendarEvent.end > start),
                # Case 2: point event falling inside [start, end).
                is_point_event
                & (CalendarEvent.start >= start)
                & (CalendarEvent.start < end),
            )
        )
    elif start is not None:
        # Both duration events and point events are matched on their start time.
        query = query.filter(CalendarEvent.start >= start)
    elif end is not None:
        query = query.filter(
            or_(
                # Event with a duration that ends no later than ``end``.
                has_end & (CalendarEvent.end <= end),
                # Point event occurring before ``end``.
                is_point_event & (CalendarEvent.start < end),
            )
        )
        # NOTE: an alternative reading of "before end" would be to include every
        # event that *starts* before ``end`` (CalendarEvent.start < end).

    return query.order_by(CalendarEvent.start).all()


def get_calendar_event_by_id(db: Session, user_id: int, event_id: int) -> CalendarEvent:
    """
    Fetch a single event by id, restricted to events owned by the given user.

    Args:
        db: The database session.
        user_id: The ID of the user who must own the event.
        event_id: The ID of the event to fetch.

    Returns:
        The matching CalendarEvent.

    Raises:
        The exception produced by ``not_found_exception()`` when no event with
        that id exists for this user.
    """
    event = (
        db.query(CalendarEvent)
        .filter(CalendarEvent.id == event_id, CalendarEvent.user_id == user_id)
        .first()
    )
    if not event:
        raise not_found_exception()
    return event


def update_calendar_event(
    db: Session, user_id: int, event_id: int, event_data: CalendarEventUpdate
) -> CalendarEvent:
    """
    Apply a partial update to a user's event and re-enqueue its notifications.

    Only fields explicitly set on ``event_data`` are written.

    Args:
        db: The database session.
        user_id: The ID of the user who must own the event.
        event_id: The ID of the event to update.
        event_data: The payload holding the fields to change.

    Returns:
        The updated CalendarEvent, refreshed from the database.

    Raises:
        The exception produced by ``not_found_exception()`` when the event does
        not exist for this user.
    """
    # Reuse the lookup so the ownership / existence check stays in one place.
    event = get_calendar_event_by_id(db, user_id, event_id)

    # exclude_unset=True: only touch fields the caller actually provided.
    update_data = event_data.model_dump(exclude_unset=True)
    for key, value in update_data.items():
        # An empty tags list is stored as None, mirroring create_calendar_event.
        if key == "tags" and isinstance(value, list) and not value:
            value = None
        setattr(event, key, value)

    db.commit()
    db.refresh(event)

    # Re-schedule notifications for the updated event.
    _schedule_notifications(event.id)

    return event


def delete_calendar_event(db: Session, user_id: int, event_id: int) -> None:
    """
    Delete a user's event after cancelling its scheduled notifications.

    Args:
        db: The database session.
        user_id: The ID of the user who must own the event.
        event_id: The ID of the event to delete.

    Raises:
        The exception produced by ``not_found_exception()`` when the event does
        not exist for this user.
    """
    # Reuse the lookup so the ownership / existence check stays in one place.
    event = get_calendar_event_by_id(db, user_id, event_id)

    # Cancel any scheduled notifications before deleting. This runs
    # synchronously here; cancel_event_notifications could alternatively be
    # turned into an async task.
    cancel_event_notifications(event_id)

    db.delete(event)
    db.commit()