
Email Processing Code Audit

Overview

This document outlines potential issues found in the email processing code path that could cause crashes in staging. The audit focuses on syntax errors, incorrect function calls, and potential runtime issues in both the interface and its implementations.

Issues Found

1. Missing Error Handling in EmailProcessingOrchestrator

orchestrator = await EmailProcessingOrchestrator.create(
    app_user=app_user,
    email_fetching_service=email_fetching_service,  # New parameter
    max_results=batch_size,
    load_from_disk=False,
    date_range=(start_date, end_date),
    page_token=page_token,
)
EmailProcessingOrchestrator.create is awaited without any error handling. If it raises during service creation or initialization, the exception propagates unhandled and can crash the entire sync process. Both steps need explicit error handling.
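A minimal sketch of the missing error handling, assuming an async `on_failure` callback that records the failure (for example, by updating sync progress). The names `create_or_fail`, `on_failure`, and `OrchestratorInitError` are hypothetical and are not part of the existing code.

```python
import logging
from collections.abc import Awaitable, Callable
from typing import TypeVar

logger = logging.getLogger(__name__)

T = TypeVar("T")


class OrchestratorInitError(RuntimeError):
    """Hypothetical error raised when the orchestrator cannot be created."""


async def create_or_fail(
    create: Callable[[], Awaitable[T]],
    on_failure: Callable[[Exception], Awaitable[None]],
) -> T:
    """Await create(); on failure, record it via on_failure and raise a typed error.

    Both create and on_failure are hypothetical hooks supplied by the caller.
    """
    try:
        return await create()
    except Exception as exc:  # asyncio.CancelledError is a BaseException and passes through
        logger.error("Orchestrator creation failed: %s", exc, exc_info=True)
        await on_failure(exc)
        raise OrchestratorInitError("could not create EmailProcessingOrchestrator") from exc
```

At the call site, the existing `EmailProcessingOrchestrator.create(...)` call would be passed as `lambda: EmailProcessingOrchestrator.create(...)`, so that any failure inside create is recorded first and then surfaces as one typed error.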

2. Incorrect Date Handling

end_date = datetime.now(tz=UTC)
start_date = end_date - timedelta(days=7)
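The dates are created as timezone-aware datetime objects but then converted to ISO format strings, yet the orchestrator receives them as a plain (start_date, end_date) tuple, so nothing checks which representation arrives. Depending on the path, an implementation may receive datetime objects where it expects strings, or the reverse. We should add type validation in the EmailFetchingService interface.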

3. Missing Repository Error Handling

sync_progress_repo = sync_progress_repo or FirestoreSyncProgressRepository()
provider_user_repo = provider_user_repo or FirestoreProviderUserRepository()
app_user_repo = app_user_repo or FirestoreAppUserRepository()
family_repo = family_repo or FirestoreFamilyRepository()
The repositories are instantiated without error handling. If any constructor raises (for example, because the Firestore client cannot be created), the exception propagates and crashes the caller. This is separate from the interface changes but should still be addressed.

4. Potential Race Condition

RUNNING_EMAIL_SYNC_TASKS[sync_id] = True
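RUNNING_EMAIL_SYNC_TASKS is a module-level dictionary that is modified without any synchronization. If syncs are started from multiple threads and code checks this dictionary before writing to it, the check and the write can interleave, so two runs may both treat the same sync_id as free. We should implement proper synchronization in the interface.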
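A sketch of one option, assuming a single process: replace the bare dictionary with a small registry whose check-and-set happens under a `threading.Lock`, so concurrent threads cannot both claim the same `sync_id`. `SyncTaskRegistry`, `try_start`, `finish`, and `is_running` are hypothetical names. A lock like this does not coordinate separate processes or service instances; that would need shared state, such as a transactional write in the datastore.

```python
import threading


class SyncTaskRegistry:
    """Hypothetical thread-safe replacement for RUNNING_EMAIL_SYNC_TASKS.

    The membership check and the write happen under one lock, so two threads
    cannot both claim the same sync_id. The lock is held only for a set
    operation and never across an await, so using it from coroutines blocks
    the event loop only briefly.
    """

    def __init__(self) -> None:
        self._running: set[str] = set()
        self._lock = threading.Lock()

    def try_start(self, sync_id: str) -> bool:
        """Mark sync_id as running; return False if it is already running."""
        with self._lock:
            if sync_id in self._running:
                return False
            self._running.add(sync_id)
            return True

    def finish(self, sync_id: str) -> None:
        """Mark sync_id as no longer running (no-op if it was not running)."""
        with self._lock:
            self._running.discard(sync_id)

    def is_running(self, sync_id: str) -> bool:
        """Report whether sync_id is currently marked as running."""
        with self._lock:
            return sync_id in self._running
```

Callers would start a sync only when `try_start(sync_id)` returns True and call `finish(sync_id)` when it ends.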

5. Missing Validation

if not provider_user:
    logger.error(f"Provider user {provider_user_id} not found")
    return
The function returns without updating the sync progress status, so the sync record can be left in its previous state (for example, still marked as running) with no indication that it failed. We should add validation in the EmailFetchingService interface.
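A sketch of the early exit with the status kept consistent, assuming a hypothetical async `mark_failed(message)` callback that writes a failed status to sync progress. `load_provider_user_or_fail` and `get_user` are also hypothetical names; the log message mirrors the original snippet.

```python
import logging
from collections.abc import Awaitable, Callable
from typing import Any

logger = logging.getLogger(__name__)


async def load_provider_user_or_fail(
    provider_user_id: str,
    get_user: Callable[[str], Awaitable[Any]],
    mark_failed: Callable[[str], Awaitable[None]],
) -> Any:
    """Return the provider user, or record a failed sync status and return None.

    get_user and mark_failed are hypothetical hooks: the first looks the user
    up, the second writes a failed status (with a message) to sync progress.
    """
    provider_user = await get_user(provider_user_id)
    if not provider_user:
        message = f"Provider user {provider_user_id} not found"
        logger.error(message)
        # Record the failure before exiting so the sync is not left "running".
        await mark_failed(message)
        return None
    return provider_user
```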

6. Incorrect Error Propagation

except Exception as e:
    logger.error(f"[process_email_batch] Exception: {e!s}", exc_info=True)
    # Update error status
    if sync_id:
        try:
            sync_progress_repo = sync_progress_repo or FirestoreSyncProgressRepository()
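The handler falls back to constructing a new FirestoreSyncProgressRepository whenever sync_progress_repo is falsy at that point, instead of reusing a single instance, which could lead to inconsistencies. If the original exception came from repository construction itself, the handler can fail the same way, and an exception raised inside the except block becomes the one that propagates, with the original attached only as its context. We should add proper error handling in the interface.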

7. Missing Cleanup

RUNNING_EMAIL_SYNC_TASKS[sync_id] = False
This reset happens only on some error paths. If an exception is raised elsewhere, the entry stays True and the task is never marked as complete. We should add proper cleanup in the interface.
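A minimal sketch of cleanup that runs on every exit path, reusing the `RUNNING_EMAIL_SYNC_TASKS` name from the audit; `run_tracked_sync` and the `work` callable are hypothetical. A `finally` block executes whether the work returns, raises, or is cancelled.

```python
from collections.abc import Awaitable, Callable

# Same name as the audited module-level dict; reproduced here for a self-contained sketch.
RUNNING_EMAIL_SYNC_TASKS: dict[str, bool] = {}


async def run_tracked_sync(
    sync_id: str, work: Callable[[], Awaitable[None]]
) -> None:
    """Run work() while sync_id is flagged as running, clearing the flag on exit."""
    RUNNING_EMAIL_SYNC_TASKS[sync_id] = True
    try:
        await work()
    finally:
        # Runs after a normal return, any exception, and asyncio.CancelledError.
        RUNNING_EMAIL_SYNC_TASKS[sync_id] = False
```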

8. Potential Unbounded Memory Use

emails = list(emails_generator)
The code materializes the entire generator into a list with no size limit, so memory use grows with the number of emails and a large mailbox could exhaust memory. We should add pagination in the EmailFetchingService interface.
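A sketch of bounded batching as an alternative to `list(emails_generator)`: the hypothetical `batched` helper pulls at most `size` items at a time with `itertools.islice`, so memory is bounded by the batch size rather than the total number of emails. On Python 3.12+, the standard library's `itertools.batched` behaves the same way but yields tuples.

```python
from collections.abc import Iterable, Iterator
from itertools import islice
from typing import TypeVar

T = TypeVar("T")


def batched(items: Iterable[T], size: int) -> Iterator[list[T]]:
    """Yield lists of at most `size` items, consuming the input lazily."""
    if size < 1:
        raise ValueError("size must be at least 1")
    iterator = iter(items)
    # islice pulls at most `size` items, so only one batch is held in memory.
    while batch := list(islice(iterator, size)):
        yield batch
```

The single `list(...)` call would become a loop such as `for batch in batched(emails_generator, batch_size): ...`, processing one batch before fetching the next.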

9. Incorrect Async Task Management

asyncio.create_task(run_email_sync())
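The returned task is neither awaited nor stored. The asyncio event loop keeps only weak references to tasks, so an unreferenced task can be garbage-collected before it finishes, and any exception it raises goes unobserved. We should add proper task management in the interface.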

10. Missing Timeout Handling

sync_progress = await sync_progress_repo.get_by_id(sync_id)
None of the awaited repository or service calls has a timeout, so a stalled network call can hang the process indefinitely. We should add timeout handling in the interface.
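A sketch of a timeout wrapper built on `asyncio.wait_for`, which cancels the wrapped operation when the deadline passes. `with_timeout` and `OperationTimeoutError` are hypothetical names, and the audit does not specify what timeout values real call sites should use. On Python 3.11+, the `asyncio.timeout()` context manager is an alternative.

```python
import asyncio
from collections.abc import Awaitable
from typing import TypeVar

T = TypeVar("T")


class OperationTimeoutError(TimeoutError):
    """Hypothetical error that names the operation which timed out."""


async def with_timeout(awaitable: Awaitable[T], seconds: float, operation: str) -> T:
    """Await `awaitable`, cancelling it and raising if it exceeds `seconds`."""
    try:
        return await asyncio.wait_for(awaitable, timeout=seconds)
    except asyncio.TimeoutError as exc:  # alias of the builtin TimeoutError on 3.11+
        raise OperationTimeoutError(f"{operation} timed out after {seconds}s") from exc
```

For example, `await with_timeout(sync_progress_repo.get_by_id(sync_id), 10.0, "get_by_id")`; the 10-second value is illustrative only.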

Interface Improvements Needed

1. Error Handling

class EmailFetchingService(ABC):
    @abstractmethod
    async def fetch(self) -> list[EmailMessage]:
        """Fetch emails with proper error handling."""
        pass
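One possible shape for interface-level error handling, sketched as a template method: the public `fetch` wraps a provider-specific `_fetch` and converts any failure into a single `EmailFetchError`, so callers such as the orchestrator need to handle only one exception type. `_fetch` and `EmailFetchError` are hypothetical, and `Any` stands in for the project's `EmailMessage` type.

```python
from abc import ABC, abstractmethod
from typing import Any


class EmailFetchError(Exception):
    """Hypothetical single error type raised by every EmailFetchingService."""


class EmailFetchingService(ABC):
    async def fetch(self) -> list[Any]:  # Any stands in for EmailMessage
        """Fetch emails, normalizing provider failures to EmailFetchError."""
        try:
            return await self._fetch()
        except EmailFetchError:
            raise  # already normalized by the subclass
        except Exception as exc:  # CancelledError is a BaseException and is not caught
            raise EmailFetchError(
                f"{type(self).__name__}.fetch failed: {exc}"
            ) from exc

    @abstractmethod
    async def _fetch(self) -> list[Any]:
        """Provider-specific fetch logic implemented by each subclass."""
```

Implementations then override only `_fetch`, and the error contract lives in one place in the base class.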

2. Type Validation

class EmailFetchingService(ABC):
    @abstractmethod
    async def fetch_email_ids(
        self,
        date_range: tuple[str, str] | None = None,
    ) -> list[str]:
        """Fetch email IDs with type validation."""
        pass

3. Pagination

class EmailFetchingService(ABC):
    @abstractmethod
    async def fetch(
        self,
        batch_size: int = 100,
        page_token: str | None = None,
    ) -> list[EmailMessage]:
        """Fetch emails with pagination."""
        pass

4. Timeout Handling

class EmailFetchingService(ABC):
    @abstractmethod
    async def download_emails(
        self,
        email_ids: list[str],
        timeout: float = 30.0,  # assumed to be seconds
    ) -> list[EmailMessage]:
        """Download emails, giving up after `timeout` seconds."""
        pass

Next Steps

Each of these issues should be addressed to improve the stability of the email processing system in staging. Priority should be given to:

1. Adding proper error handling in the interface and implementations
2. Implementing proper cleanup in all code paths
3. Adding timeouts to async operations
4. Fixing the date handling inconsistencies
5. Implementing proper task management for async operations
6. Adding pagination support in the interface
7. Implementing proper synchronization for shared resources
8. Adding type validation in the interface
9. Improving error propagation
10. Adding proper cleanup mechanisms