Files
MAIA/backend/modules/calendar/tasks.py
c-d-p 62d6b8bdfd [V1.0] Working application, added notifications.
Ready to upload to store.
2025-04-27 00:39:52 +02:00

234 lines
9.8 KiB
Python

# backend/modules/calendar/tasks.py
import logging
import asyncio
from datetime import datetime, timedelta, time, timezone
from celery import shared_task
from celery.exceptions import Ignore
from core.celery_app import celery_app
from core.database import get_db
from modules.calendar.models import CalendarEvent
from modules.notifications.service import send_push_notification
from modules.auth.models import User  # User model is provided by modules.auth.models
# Module-level logger named after this module.
logger = logging.getLogger(__name__)
# Prefix of the keys under which each event's scheduled reminder task IDs are
# stored via celery_app.backend; the event ID is appended to form the full key.
SCHEDULED_TASK_KEY_PREFIX = "calendar_event_tasks:"
def get_scheduled_task_key(event_id: int) -> str:
    """Return the backend key that holds the scheduled task IDs for *event_id*."""
    return "{}{}".format(SCHEDULED_TASK_KEY_PREFIX, event_id)
@shared_task(bind=True)
def schedule_event_notifications(self, event_id: int):
    """Schedule reminder notifications for a calendar event.

    Reminders previously recorded for the event are cancelled first. An
    all-day event gets a single reminder at 06:00 on its start date, in the
    timezone of the stored start (UTC when the stored start is naive). A timed
    event gets reminders 1 hour and 30 minutes before its start (a naive start
    is treated as UTC). Reminders whose time is already in the past are
    skipped. The IDs of the tasks that were scheduled are stored through
    ``celery_app.backend`` as a comma-separated string so that
    ``cancel_event_notifications`` can revoke them later.

    Args:
        event_id: Primary key of the ``CalendarEvent`` to schedule for.

    Raises:
        Ignore: If the event does not exist, or its user / the user's push
            token is missing. Any other exception is logged and swallowed.
    """
    db_gen = get_db()
    db = next(db_gen)
    try:
        event = db.query(CalendarEvent).filter(CalendarEvent.id == event_id).first()
        if not event:
            logger.warning(
                f"Calendar event {event_id} not found for scheduling notifications."
            )
            raise Ignore()  # Don't retry if event doesn't exist

        user = db.query(User).filter(User.id == event.user_id).first()
        if not user or not user.expo_push_token:
            logger.warning(
                f"User {event.user_id} or their push token not found for event {event_id}. Skipping notification scheduling."
            )
            # Cancel any potentially existing tasks for this event if user/token is now invalid
            cancel_event_notifications(event_id)
            raise Ignore()  # Don't retry if user/token missing

        # Cancel any existing notifications for this event first
        cancel_event_notifications(event_id)  # Run synchronously within this task

        scheduled_task_ids = []
        now_utc = datetime.now(timezone.utc)

        if event.all_day:
            # Schedule one notification at 6:00 AM in the event's original timezone.
            # A naive start must be pinned to UTC explicitly: astimezone() on a
            # naive datetime would otherwise interpret it as system-local time.
            start_tz = event.start.tzinfo
            if start_tz is None:
                start_tz = timezone.utc
            notification_time_local = datetime.combine(
                event.start.date(), time(6, 0), tzinfo=start_tz
            )
            # Convert scheduled time to UTC for Celery ETA
            notification_time_utc = notification_time_local.astimezone(timezone.utc)
            if notification_time_utc > now_utc:
                task = send_event_notification.apply_async(
                    args=[event.id, user.expo_push_token, "all_day"],
                    eta=notification_time_utc,
                )
                scheduled_task_ids.append(task.id)
                logger.info(
                    f"Scheduled all-day notification for event {event_id} at {notification_time_utc} (Task ID: {task.id})"
                )
            else:
                logger.info(
                    f"All-day notification time {notification_time_utc} for event {event_id} is in the past. Skipping."
                )
        else:
            # Ensure event start time is timezone-aware (assume UTC if naive)
            event_start_utc = event.start
            if event_start_utc.tzinfo is None:
                event_start_utc = event_start_utc.replace(tzinfo=timezone.utc)
            else:
                event_start_utc = event_start_utc.astimezone(timezone.utc)

            times_before = {
                "1_hour": timedelta(hours=1),
                "30_min": timedelta(minutes=30),
            }
            for label, delta in times_before.items():
                notification_time_utc = event_start_utc - delta
                if notification_time_utc > now_utc:
                    task = send_event_notification.apply_async(
                        args=[event.id, user.expo_push_token, label],
                        eta=notification_time_utc,
                    )
                    scheduled_task_ids.append(task.id)
                    logger.info(
                        f"Scheduled {label} notification for event {event_id} at {notification_time_utc} (Task ID: {task.id})"
                    )
                else:
                    logger.info(
                        f"{label} notification time {notification_time_utc} for event {event_id} is in the past. Skipping."
                    )

        # Store the new task IDs using the Celery backend
        if scheduled_task_ids:
            key = get_scheduled_task_key(event_id)
            # Store as a simple comma-separated string
            celery_app.backend.set(key, ",".join(scheduled_task_ids))
            logger.debug(f"Stored task IDs for event {event_id}: {scheduled_task_ids}")
    except Ignore:
        # Ignore is an Exception subclass; let it through so the deliberate
        # "skip this task" signal is not logged as an error and swallowed below.
        raise
    except Exception as e:
        logger.exception(f"Error scheduling notifications for event {event_id}: {e}")
        # Optional: Add retry logic if appropriate
        # self.retry(exc=e, countdown=60)
    finally:
        next(db_gen, None)  # Resume the get_db generator so it can close the session
# Note: This task drives the async push service to completion with asyncio.run(),
# which cannot be called while another event loop is already running in the same
# thread. Make sure the Celery worker pool in use is compatible with that.
@shared_task(bind=True)
def send_event_notification(
    self, event_id: int, user_push_token: str, notification_type: str
):
    """Send a single reminder notification for a calendar event.

    The event and its user are re-read at send time; the notification is
    skipped when the event is gone, the user is gone, or the user's current
    push token no longer matches the one captured when the reminder was
    scheduled.

    Args:
        event_id: Primary key of the ``CalendarEvent`` the reminder is for.
        user_push_token: Push token captured at scheduling time.
        notification_type: ``"all_day"``, ``"1_hour"`` or ``"30_min"``; any
            other value produces a generic fallback body.

    Raises:
        Ignore: If the event or user is missing, or the token has changed.
            Any other exception is logged and swallowed.
    """
    db_gen = get_db()
    db = next(db_gen)
    try:
        event = db.query(CalendarEvent).filter(CalendarEvent.id == event_id).first()
        if not event:
            logger.warning(
                f"Calendar event {event_id} not found for sending {notification_type} notification."
            )
            raise Ignore()  # Don't retry if event doesn't exist

        # Double-check user and token validity at the time of sending
        user = db.query(User).filter(User.id == event.user_id).first()
        if not user or user.expo_push_token != user_push_token:
            logger.warning(
                f"User {event.user_id} token mismatch or user not found for event {event_id} at notification time. Skipping."
            )
            raise Ignore()

        title = f"Upcoming: {event.title}"
        if notification_type == "all_day":
            body = f"Today: {event.title}"
            if event.description:
                body += f" - {event.description[:50]}"  # Add part of description
        elif notification_type in ("1_hour", "30_min"):
            # astimezone() with no argument converts to the worker's system
            # local timezone (a naive start is interpreted as system-local).
            # NOTE(review): this is the server's zone, not necessarily the
            # user's - confirm this is the intended display time.
            local_start_time = event.start.astimezone().strftime("%I:%M %p")
            lead = "in 1 hour" if notification_type == "1_hour" else "in 30 mins"
            body = f"Starts at {local_start_time} ({lead})"
        else:
            body = "Check your calendar for details."  # Fallback

        logger.info(
            f"Sending {notification_type} notification for event {event_id} to token {user_push_token[:10]}..."
        )
        try:
            # Run the async notification service to completion from this sync task
            success = asyncio.run(
                send_push_notification(
                    push_token=user_push_token,
                    title=title,
                    body=body,
                    data={"eventId": event.id, "type": "calendar_reminder"},
                )
            )
            if not success:
                logger.error(
                    f"Failed to send {notification_type} notification for event {event_id} via service."
                )
                # Optional: self.retry(countdown=60) # Retry sending if failed
            else:
                logger.info(
                    f"Successfully sent {notification_type} notification for event {event_id}."
                )
        except Exception as e:
            logger.exception(
                f"Error calling send_push_notification for event {event_id}: {e}"
            )
            # Optional: self.retry(exc=e, countdown=60)
    except Ignore:
        # Ignore is an Exception subclass; let it through so the deliberate
        # "skip this task" signal is not logged as an error and swallowed below.
        raise
    except Exception as e:
        logger.exception(
            f"General error sending {notification_type} notification for event {event_id}: {e}"
        )
        # Optional: self.retry(exc=e, countdown=60)
    finally:
        next(db_gen, None)  # Resume the get_db generator so it can close the session
# This is run synchronously when called, or can be called as a task itself
# @shared_task # Uncomment if you want to call this asynchronously e.g., .delay()
def cancel_event_notifications(event_id: int):
    """Cancel all scheduled reminder notifications for a calendar event.

    Reads the comma-separated task IDs stored for the event through
    ``celery_app.backend``, revokes each one, and then deletes the stored key.
    Failures are logged rather than raised, so callers can treat this as
    best-effort.

    Args:
        event_id: Primary key of the calendar event whose reminders to cancel.
    """
    key = get_scheduled_task_key(event_id)
    try:
        stored = celery_app.backend.get(key)
        if not stored:
            logger.debug(
                f"No scheduled tasks found in backend to cancel for event {event_id} (key: {key})."
            )
            return

        # The backend may hand the value back as bytes or as str depending on
        # its configuration; accept both instead of failing on .decode().
        if isinstance(stored, (bytes, bytearray)):
            task_ids_str = stored.decode("utf-8")
        else:
            task_ids_str = str(stored)

        # Strip before filtering so whitespace-only entries are dropped too
        task_ids = [tid.strip() for tid in task_ids_str.split(",") if tid.strip()]
        logger.info(f"Cancelling scheduled tasks for event {event_id}: {task_ids}")

        revoked_count = 0
        for task_id in task_ids:
            try:
                # Plain revoke is enough for ETA tasks that have not started.
                # terminate/SIGKILL is deliberately not used: it kills the
                # worker process, which may already be running another task.
                celery_app.control.revoke(task_id)
                revoked_count += 1
            except Exception as revoke_err:
                logger.error(
                    f"Error revoking task {task_id} for event {event_id}: {revoke_err}"
                )

        # Delete the key from the backend after attempting revocation
        celery_app.backend.delete(key)
        logger.debug(
            f"Revoked {revoked_count} tasks and removed task ID key {key} from backend for event {event_id}."
        )
    except Exception as e:
        logger.exception(f"Error cancelling notifications for event {event_id}: {e}")