Email Integration Flow
Overview
The email integration system provides a unified interface for fetching and processing emails from various email providers. The system is designed to be extensible, allowing new email providers to be added easily.
Components
Core Components
- EmailFetchingService (abstract base class)
  - Defines the interface for email fetching operations
  - Provides common functionality for all email providers
  - Enables easy addition of new email providers
Provider Implementations
- GmailFetchingService (implements EmailFetchingService)
  - Handles Gmail-specific email fetching
  - Manages Gmail API authentication and requests
  - Implements pagination and filtering
Processing Components
- EmailProcessingOrchestrator
  - Coordinates the email processing pipeline
  - Manages email storage, summarization, and embedding
  - Uses the EmailFetchingService interface for provider-agnostic email fetching
Flow Diagram
mermaid
graph TD
A[App User] --> B[Email Processing Orchestrator]
B --> C[Email Fetching Service Interface]
C --> D[Gmail Fetching Service]
D --> E[Gmail API]
B --> F[Email Repository]
B --> G[Email Parser]
B --> H[Email Summarizer]
B --> I[Email Tagger]
B --> J[Email Embedder]
Implementation Details
1. EmailFetchingService Interface
The EmailFetchingService abstract base class defines the contract for email fetching:
from abc import ABC

class EmailFetchingService(ABC):
    async def fetch(self) -> list[EmailMessage]: ...
    async def fetch_email_ids(self) -> list[str]: ...
    async def download_emails(self, email_ids: list[str]) -> list[EmailMessage]: ...
2. GmailFetchingService Implementation
The GmailFetchingService implements the interface for Gmail:
class GmailFetchingService(EmailFetchingService):
    def __init__(self, app_user: AppUser, ...):
        self.gmail = Gmail(app_user=app_user, ...)

    async def fetch(self) -> list[EmailMessage]:
        # List the matching message IDs first, then download the messages.
        email_ids = await self.fetch_email_ids()
        return await self.download_emails(email_ids)
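The components list notes that the Gmail service implements pagination. As a rough sketch of what that could look like inside fetch_email_ids(), the helper below follows page tokens until the listing runs out. The names collect_email_ids, list_page, and fake_list_page are hypothetical and stand in for whatever the Gmail wrapper actually exposes; only the token-following loop is the point.

```python
import asyncio
from typing import Awaitable, Callable, Optional

# Hypothetical page-listing callable: takes a page token (None for the first
# page) and returns (ids_on_this_page, next_page_token_or_None).
ListPage = Callable[[Optional[str]], Awaitable[tuple[list[str], Optional[str]]]]


async def collect_email_ids(list_page: ListPage, max_ids: Optional[int] = None) -> list[str]:
    """Follow page tokens until the listing is exhausted or max_ids is reached."""
    ids: list[str] = []
    token: Optional[str] = None
    while True:
        page_ids, token = await list_page(token)
        ids.extend(page_ids)
        if max_ids is not None and len(ids) >= max_ids:
            return ids[:max_ids]
        if token is None:
            return ids


# Stand-in for a provider API: three pages keyed by page token.
_FAKE_PAGES = {
    None: (["m1", "m2"], "p2"),
    "p2": (["m3", "m4"], "p3"),
    "p3": (["m5"], None),
}


async def fake_list_page(token: Optional[str]) -> tuple[list[str], Optional[str]]:
    return _FAKE_PAGES[token]
```

Calling asyncio.run(collect_email_ids(fake_list_page, max_ids=3)) stops after the second page and keeps only the first three IDs, which is one way to cap how much a single fetch pulls in.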
3. EmailProcessingOrchestrator
The orchestrator uses the interface for provider-agnostic email fetching:
class EmailProcessingOrchestrator:
    def __init__(self, email_fetching_service: EmailFetchingService, ...):
        self.email_fetching_service = email_fetching_service

    async def download(self):
        # Depends only on the interface, so any provider implementation works.
        self.emails = await self.email_fetching_service.fetch()
Adding New Email Providers
To add support for a new email provider:
- Create a new class implementing EmailFetchingService
- Implement the required methods:
  - fetch()
  - fetch_email_ids()
  - download_emails()
- Use the new implementation with EmailProcessingOrchestrator
Example:
class OutlookFetchingService(EmailFetchingService):
    async def fetch(self) -> list[EmailMessage]:
        # Implement Outlook-specific fetching
        pass

    # fetch_email_ids() and download_emails() must be implemented as well.
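The Outlook example stubs only fetch(). For a fuller picture, here is a self-contained sketch of a provider that implements all three methods, backed by an in-memory dict rather than a remote API. Everything in it is illustrative: the EmailMessage dataclass is a simplified stand-in for the project's type, InMemoryFetchingService is a hypothetical provider, and the use of @abstractmethod on the interface is an assumption about how the base class enforces the contract.

```python
import asyncio
from abc import ABC, abstractmethod
from dataclasses import dataclass


@dataclass
class EmailMessage:
    """Simplified stand-in for the project's EmailMessage type."""
    id: str
    subject: str


class EmailFetchingService(ABC):
    """Minimal restatement of the interface so the sketch runs on its own."""

    @abstractmethod
    async def fetch(self) -> list[EmailMessage]: ...

    @abstractmethod
    async def fetch_email_ids(self) -> list[str]: ...

    @abstractmethod
    async def download_emails(self, email_ids: list[str]) -> list[EmailMessage]: ...


class InMemoryFetchingService(EmailFetchingService):
    """Hypothetical provider backed by a dict instead of a remote API."""

    def __init__(self, mailbox: dict[str, str]):
        self._mailbox = mailbox  # maps email id -> subject

    async def fetch_email_ids(self) -> list[str]:
        return list(self._mailbox)

    async def download_emails(self, email_ids: list[str]) -> list[EmailMessage]:
        return [EmailMessage(id=i, subject=self._mailbox[i]) for i in email_ids]

    async def fetch(self) -> list[EmailMessage]:
        # Same two-step shape as GmailFetchingService.fetch().
        email_ids = await self.fetch_email_ids()
        return await self.download_emails(email_ids)
```

With the abstract methods declared this way, a subclass that omits any of the three cannot be instantiated, so a half-finished provider fails at construction time instead of partway through a fetch.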
Error Handling
- Each implementation handles provider-specific errors
- Common errors are defined in the interface
- The orchestrator handles processing errors
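One way to read the split described above is that each provider translates its own failures into a small set of shared exception types, and the orchestrator reacts only to those. The sketch below illustrates that idea. All class and function names (EmailFetchError, EmailAuthError, EmailRateLimitError, FakeProviderHttpError, translate_provider_error, describe_failure) are hypothetical and do not come from the codebase.

```python
class EmailFetchError(Exception):
    """Hypothetical base class for errors shared by all providers."""


class EmailAuthError(EmailFetchError):
    """Credentials were rejected or have expired."""


class EmailRateLimitError(EmailFetchError):
    """The provider asked the caller to slow down."""


class FakeProviderHttpError(Exception):
    """Stand-in for a provider SDK exception carrying an HTTP status code."""

    def __init__(self, status: int):
        super().__init__(f"HTTP {status}")
        self.status = status


def translate_provider_error(exc: FakeProviderHttpError) -> EmailFetchError:
    """Map a provider-specific failure onto the shared hierarchy."""
    if exc.status == 401:
        return EmailAuthError(str(exc))
    if exc.status == 429:
        return EmailRateLimitError(str(exc))
    return EmailFetchError(str(exc))


def describe_failure(exc: FakeProviderHttpError) -> str:
    """Orchestrator-side handling that depends only on the shared types."""
    try:
        raise translate_provider_error(exc) from exc
    except EmailAuthError:
        return "reauthenticate"
    except EmailRateLimitError:
        return "retry later"
    except EmailFetchError:
        return "skip batch"
```

Because the orchestrator-side code names only the shared types, adding a provider means writing one more translation function and leaving the handling logic untouched.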
Testing
- Mock the EmailFetchingService interface for testing
- Create test implementations for provider-specific tests
- Use dependency injection for flexible testing
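As an illustration of the mocking and dependency-injection points, the test below passes a unittest.mock.AsyncMock to the orchestrator in place of a real fetching service. The orchestrator here is trimmed to the download step and takes only the fetching service; the real constructor has further parameters that are elided in this document, so treat this as a sketch and not the project's actual test setup.

```python
import asyncio
from unittest.mock import AsyncMock


class EmailProcessingOrchestrator:
    """Trimmed to the download step; other dependencies are omitted."""

    def __init__(self, email_fetching_service):
        self.email_fetching_service = email_fetching_service
        self.emails = []

    async def download(self):
        self.emails = await self.email_fetching_service.fetch()


def test_download_uses_injected_service():
    # AsyncMock makes fetch() awaitable and records how it was awaited.
    fake_service = AsyncMock()
    fake_service.fetch.return_value = ["email-1", "email-2"]

    orchestrator = EmailProcessingOrchestrator(email_fetching_service=fake_service)
    asyncio.run(orchestrator.download())

    assert orchestrator.emails == ["email-1", "email-2"]
    fake_service.fetch.assert_awaited_once()
```

No network access or Gmail credentials are involved, so a test like this can run in any CI environment.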
Future Improvements
- Add support for more email providers
- Implement caching for frequently accessed emails
- Add rate limiting and retry mechanisms
- Improve error handling and recovery
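For the retry item, one possible shape is a small wrapper that re-awaits a failing operation with exponential backoff. This is only a sketch of the idea, not a planned design: retry_async and its parameters are hypothetical, and the choice of ConnectionError and TimeoutError as retryable errors is an assumption.

```python
import asyncio
from typing import Awaitable, Callable, TypeVar

T = TypeVar("T")


async def retry_async(
    operation: Callable[[], Awaitable[T]],
    attempts: int = 3,
    base_delay: float = 0.5,
    retry_on: tuple[type[BaseException], ...] = (ConnectionError, TimeoutError),
) -> T:
    """Await operation(), retrying on the listed exceptions with exponential backoff."""
    for attempt in range(attempts):
        try:
            return await operation()
        except retry_on:
            if attempt == attempts - 1:
                raise  # out of attempts: surface the last error
            # Wait base_delay, then 2x, 4x, ... before the next attempt.
            await asyncio.sleep(base_delay * 2 ** attempt)
    raise ValueError("attempts must be at least 1")
```

A fetching service could then be wrapped at the call site, for example `await retry_async(service.fetch)`, without changing the EmailFetchingService interface.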