Skip to content

Email Integration Flow

Overview

The email integration system provides a unified interface for fetching and processing emails from various email providers. The system is designed to be extensible, allowing new email providers to be added easily.

Components

Core Components

  • EmailFetchingService (abstract base class)
  • Defines the interface for email fetching operations
  • Provides common functionality for all email providers
  • Enables easy addition of new email providers

Provider Implementations

  • GmailFetchingService (implements EmailFetchingService)
  • Handles Gmail-specific email fetching
  • Manages Gmail API authentication and requests
  • Implements pagination and filtering

Processing Components

  • EmailProcessingOrchestrator
  • Coordinates the email processing pipeline
  • Manages email storage, summarization, and embedding
  • Uses EmailFetchingService interface for provider-agnostic email fetching

Flow Diagram

mermaid graph TD A[App User] --> B[Email Processing Orchestrator] B --> C[Email Fetching Service Interface] C --> D[Gmail Fetching Service] D --> E[Gmail API] B --> F[Email Repository] B --> G[Email Parser] B --> H[Email Summarizer] B --> I[Email Tagger] B --> J[Email Embedder]

Implementation Details

1. EmailFetchingService Interface

The EmailFetchingService abstract base class defines the contract for email fetching:

class EmailFetchingService(ABC):
    async def fetch(self) -> list[EmailMessage]
    async def fetch_email_ids(self) -> list[str]
    async def download_emails(self, email_ids: list[str]) -> list[EmailMessage]

2. GmailFetchingService Implementation

The GmailFetchingService implements the interface for Gmail:

class GmailFetchingService(EmailFetchingService):
    def __init__(self, app_user: AppUser, ...):
        self.gmail = Gmail(app_user=app_user, ...)

    async def fetch(self) -> list[EmailMessage]:
        email_ids = await self.fetch_email_ids()
        return await self.download_emails(email_ids)

3. EmailProcessingOrchestrator

The orchestrator uses the interface for provider-agnostic email fetching:

class EmailProcessingOrchestrator:
    def __init__(self, email_fetching_service: EmailFetchingService, ...):
        self.email_fetching_service = email_fetching_service

    async def download(self):
        self.emails = await self.email_fetching_service.fetch()

Adding New Email Providers

To add support for a new email provider:

  1. Create a new class implementing EmailFetchingService
  2. Implement the required methods:
  3. fetch()
  4. fetch_email_ids()
  5. download_emails()
  6. Use the new implementation with EmailProcessingOrchestrator

Example:

class OutlookFetchingService(EmailFetchingService):
    async def fetch(self) -> list[EmailMessage]:
        # Implement Outlook-specific fetching
        pass

Error Handling

  • Each implementation handles provider-specific errors
  • Common errors are defined in the interface
  • The orchestrator handles processing errors

Testing

  • Mock the EmailFetchingService interface for testing
  • Create test implementations for provider-specific tests
  • Use dependency injection for flexible testing

Future Improvements

  • Add support for more email providers
  • Implement caching for frequently accessed emails
  • Add rate limiting and retry mechanisms
  • Improve error handling and recovery