
ADR-005: Email Integration System

Status

Implemented

Context

OmniButler needs to integrate with users' email accounts (currently focusing on Gmail) to provide comprehensive financial management capabilities. This integration requires:

  1. Processing emails when users first connect their Gmail account
  2. Periodically syncing new emails to keep data current
  3. Parsing, summarizing, tagging, and storing emails securely
  4. Managing this process efficiently across many users

The system must handle large volumes of emails without overwhelming resources, process emails incrementally, and maintain a clean architecture consistent with OmniButler's design principles.

Current Implementation

The email integration system currently consists of the following components:

Email Fetching & Processing

  • EmailService (in domain/services/email_service.py): Handles fetching emails from Gmail using OAuth tokens
  • EmailProcessingService (in domain/email_processing/email_processing_service.py): Orchestrates the parsing, summarizing, tagging, and embedding of emails

Data Models

  • EmailMessage (in application/dtos/response/email_message.py): DTO for email messages (incorrectly placed in application layer)
  • Email (in infrastructure/integrations/email/models/email.py): SQL model for storing emails in Turso

Infrastructure Components

  • GmailMessageDownloader (in infrastructure/email_providers/gmail_message_downloader.py): Downloads email content from Gmail
  • GmailMessageIdsFetcher (in infrastructure/email_providers/gmail_message_ids_fetcher.py): Fetches email IDs from Gmail
  • Gmail (in infrastructure/email_providers/gmail.py): Wrapper for Gmail-specific operations
  • TursoEmailRepository (in infrastructure/database/turso/turso_email_repository.py): Handles storage and retrieval of emails

Use Cases

  • FetchEmailsUseCase (in application/use_cases/fetch_emails.py)
  • SummarizeEmailsUseCase (in application/use_cases/summarize_emails.py)
  • TagEmailsUseCase (in application/use_cases/tag_emails.py)
  • EmbedEmailsUseCase (in application/use_cases/embed_emails.py)

Execution

Currently, email processing is triggered manually through command-line scripts, with no automation or background processing.

Architectural Issues to Address

  1. Manual Processing: Email processing is currently manual, requiring command-line execution
  2. No Progress Tracking: The system doesn't track processing state or progress
  3. Clean Architecture Violations: Some components are in incorrect layers
  4. No Periodic Syncing: No mechanism for regular email updates

Decision

We will implement a two-part email integration system:

1. Initial Gmail Connection Processing

  • Provider User Watcher: Extend the existing ProviderUsersWatcher in infrastructure/database/firestore/watchers/provider_users.py to detect when Gmail service is connected
  • Background Task Processing: Use FastAPI background tasks to process emails incrementally
  • Batched Processing: Process in weekly batches, starting with the most recent week
  • Chained Tasks: Each batch spawns the next batch for older emails until complete
  • Progress Tracking: Store processing state in Firestore

2. Periodic Email Sync

  • Simple Scheduler: Use APScheduler integrated with FastAPI to run hourly syncs
  • Incremental Processing: Only fetch and process emails newer than the last sync
  • Tracking: Maintain last_sync_timestamp per provider user

Technical Implementation

1. Clean Architecture Refactoring

  1. Move EmailMessage DTO from application/dtos/response/email_message.py to domain/models/email_message.py (import paths change as sketched below)
  2. Rename components for clarity:
       • EmailService → GmailFetchingService, to clarify its Gmail-specific fetching role
       • EmailProcessingService → EmailProcessingOrchestrator, to better reflect its orchestration role
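
Call sites then switch to the new domain-layer import path; a minimal before/after sketch of the move in step 1:

# Before: DTO lives in the application layer
from application.dtos.response.email_message import EmailMessage

# After: model lives in the domain layer
from domain.models.email_message import EmailMessage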

2. Enhanced Provider User Watcher

Extend the existing ProviderUsersWatcher to monitor Gmail connections:

def on_provider_users_update(
    self,
    _doc_snapshot: list[DocumentSnapshot],
    changes: list[DocumentChange],
    _read_time: str,
):
    # Existing code for handling auth codes

    # New code for detecting Gmail service activation
    for change in changes:
        if change.type == ChangeType.MODIFIED:
            doc = change.document
            doc_data = doc.to_dict()

            # Check if Gmail service is now active
            if "services" in doc_data and doc_data["services"].get("gmail") is True:
                # The Firestore Python client does not expose the previous
                # snapshot on a DocumentChange, so compare against the
                # previous_services field persisted on the provider user
                # (added to the model in section 5 below)
                old_services = doc_data.get("previous_services", {})

                # If Gmail was previously not active
                if not old_services.get("gmail"):
                    logger.info(f"Gmail service activated for provider user {doc.id}")

                    # Get provider user from repository
                    provider_user_repo = FirestoreProviderUserRepository()
                    provider_user = provider_user_repo.get_by_id(doc.id)

                    if not provider_user:
                        logger.error(f"Provider user {doc.id} not found")
                        continue

                    # Record the current services snapshot for future change
                    # detection, then initialize the email processing state
                    provider_user.previous_services = doc_data["services"]
                    provider_user.email_processing_status = {
                        "started_at": datetime.now().isoformat(),
                        "in_progress": True,
                        "processed_until": None,
                        "total_processed": 0,
                        "last_sync_timestamp": None,
                        "error": None
                    }
                    provider_user_repo.update(provider_user)

                    # Trigger background processing
                    import asyncio

                    from application.background_tasks.email_processing import process_email_batch

                    # Get current date and calculate one week ago
                    end_date = datetime.now().isoformat()
                    start_date = (datetime.now() - timedelta(days=7)).isoformat()

                    # Start processing the most recent week of emails;
                    # process_email_batch is a coroutine and this callback runs
                    # on the Firestore listener thread (no running event loop),
                    # so execute it on its own loop
                    asyncio.run(process_email_batch(
                        provider_user_id=doc.id,
                        start_date=start_date,
                        end_date=end_date,
                        batch_size=100,
                    ))

3. Email Processing Background Task

Create a new module for background tasks:

# application/background_tasks/email_processing.py

from datetime import datetime, timedelta

from fastapi import BackgroundTasks

# App-level helper expected to supply a task runner when no request-scoped
# BackgroundTasks instance is available (watcher- and scheduler-triggered runs)
from application.dependencies import get_background_tasks
from domain.email_processing.email_processing_orchestrator import EmailProcessingOrchestrator
from infrastructure.database.firestore.provider_user_repository import FirestoreProviderUserRepository
from infrastructure.database.firestore.app_user_repository import FirestoreAppUserRepository
from infrastructure.logging import get_logger

logger = get_logger(__name__)

async def process_email_batch(
    provider_user_id: str,
    start_date: str,
    end_date: str,
    batch_size: int = 100,
    page_token: str | None = None,
    background_tasks: BackgroundTasks | None = None
):
    """Process emails for a specific date range and schedule next batch"""
    if background_tasks is None:
        background_tasks = get_background_tasks()

    try:
        # Get provider user data using repository pattern
        provider_user_repo = FirestoreProviderUserRepository()
        provider_user = provider_user_repo.get_by_id(provider_user_id)

        if not provider_user:
            logger.error(f"Provider user {provider_user_id} not found")
            return

        # Update processing status via repository
        provider_user.email_processing_status = provider_user.email_processing_status or {}
        provider_user.email_processing_status["in_progress"] = True
        provider_user.email_processing_status["processed_until"] = start_date
        provider_user_repo.update(provider_user)

        # Get app user via repository
        app_user_repo = FirestoreAppUserRepository()
        app_user = app_user_repo.get_by_id(provider_user.appUserId)

        if not app_user:
            logger.error(f"App user {provider_user.appUserId} not found")
            provider_user.email_processing_status["in_progress"] = False
            provider_user.email_processing_status["error"] = "App user not found"
            provider_user_repo.update(provider_user)
            return

        # Process emails for the date range
        orchestrator = EmailProcessingOrchestrator(
            app_user=app_user,
            max_results=batch_size,
            load_from_disk=False,
            date_range=(start_date, end_date),
            page_token=page_token
        )

        # Fetch and store emails
        result = orchestrator.store()

        # Check if we have a next page token
        next_page_token = result.get("next_page_token")
        processed_count = result.get("processed_count", 0)

        # Summarize emails
        orchestrator.summarize()

        # Tag emails
        orchestrator.tag()

        # Embed emails
        orchestrator.embed()

        # Update status with completion info via repository
        provider_user = provider_user_repo.get_by_id(provider_user_id)  # Refresh to get latest state
        current_processed = provider_user.email_processing_status.get("total_processed", 0)
        provider_user.email_processing_status["total_processed"] = current_processed + processed_count
        provider_user_repo.update(provider_user)

        if next_page_token:
            # Process next page of emails for the same date range
            logger.info(f"Processing next page of emails for {provider_user_id} in date range {start_date} to {end_date}")
            background_tasks.add_task(
                process_email_batch,
                provider_user_id=provider_user_id,
                start_date=start_date,
                end_date=end_date,
                batch_size=batch_size,
                page_token=next_page_token
            )
        else:
            # All emails for current date range processed, move to next date range
            # Calculate the next date range (one week earlier)
            new_end_date = datetime.fromisoformat(start_date)
            new_start_date = new_end_date - timedelta(days=7)

            # Update processed until date via repository
            provider_user = provider_user_repo.get_by_id(provider_user_id)  # Refresh
            provider_user.email_processing_status["processed_until"] = start_date
            provider_user_repo.update(provider_user)

            # Continue only while the batch window is within the one-year cutoff
            one_year_ago = datetime.now() - timedelta(days=365)
            if new_start_date > one_year_ago:
                # Schedule next batch
                logger.info(f"Scheduling next batch for {provider_user_id}: {new_start_date} to {new_end_date}")
                background_tasks.add_task(
                    process_email_batch,
                    provider_user_id=provider_user_id,
                    start_date=new_start_date.isoformat(),
                    end_date=new_end_date.isoformat(),
                    batch_size=batch_size
                )
            else:
                # Mark processing as complete via repository
                logger.info(f"Email processing completed for {provider_user_id}")
                provider_user = provider_user_repo.get_by_id(provider_user_id)  # Refresh
                provider_user.email_processing_status["in_progress"] = False
                provider_user.email_processing_status["last_sync_timestamp"] = datetime.now().isoformat()
                provider_user_repo.update(provider_user)

    except Exception as e:
        logger.error(f"Error processing emails for {provider_user_id}: {str(e)}")
        # Update error status via repository
        provider_user_repo = FirestoreProviderUserRepository()
        provider_user = provider_user_repo.get_by_id(provider_user_id)
        if provider_user:
            provider_user.email_processing_status = provider_user.email_processing_status or {}
            provider_user.email_processing_status["in_progress"] = False
            provider_user.email_processing_status["error"] = str(e)
            provider_user_repo.update(provider_user)

4. Periodic Email Sync

Create a scheduler for periodic email synchronization:

# application/schedulers/email_sync.py

from datetime import datetime, timedelta
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger

from fastapi import FastAPI
from infrastructure.database.firestore.provider_user_repository import FirestoreProviderUserRepository
from infrastructure.logging import get_logger
from application.background_tasks.email_processing import process_email_batch

logger = get_logger(__name__)

def setup_email_sync_scheduler(app: FastAPI):
    """Set up a scheduler for periodic email synchronization"""
    scheduler = AsyncIOScheduler()

    @scheduler.scheduled_job(IntervalTrigger(hours=1))
    async def sync_new_emails():
        """Synchronize new emails for all users with Gmail activated"""
        logger.info("Starting scheduled email sync")

        # Get all provider users with Gmail service activated
        provider_user_repo = FirestoreProviderUserRepository()
        provider_users = provider_user_repo.get_all_with_gmail_service()

        for provider_user in provider_users:
            # Skip users with in-progress processing
            if provider_user.email_processing_status and provider_user.email_processing_status.get("in_progress"):
                logger.info(f"Skipping {provider_user.id} - processing already in progress")
                continue

            # Get the last sync timestamp (stored as an ISO string) or default
            # to 24 hours ago
            last_sync = None
            if provider_user.email_processing_status:
                last_sync = provider_user.email_processing_status.get("last_sync_timestamp")

            if last_sync:
                last_sync = datetime.fromisoformat(last_sync)
            else:
                last_sync = datetime.now() - timedelta(hours=24)

            # Process emails since the last sync
            end_date = datetime.now().isoformat()
            start_date = last_sync.isoformat()

            logger.info(f"Syncing emails for {provider_user.id} from {start_date} to {end_date}")

            # Run the batch inline within the scheduled job; any chained batches
            # continue via background tasks, and store() deduplicates against
            # already-stored email IDs, so overlap with earlier runs is skipped
            await process_email_batch(
                provider_user_id=provider_user.id,
                start_date=start_date,
                end_date=end_date,
                batch_size=100
            )

    # Start the scheduler
    scheduler.start()

    # Store scheduler in app state to prevent garbage collection
    app.state.email_sync_scheduler = scheduler

    # Register shutdown event to close scheduler gracefully
    @app.on_event("shutdown")
    def shutdown_scheduler():
        if app.state.email_sync_scheduler:
            app.state.email_sync_scheduler.shutdown()

5. Data Model Extensions

Add necessary fields to the ProviderUser model:

# domain/models/provider_user.py

from datetime import datetime

from pydantic import BaseModel, Field

class EmailProcessingStatus(BaseModel):
    started_at: datetime | None = None
    in_progress: bool = False
    processed_until: datetime | None = None
    total_processed: int = 0
    last_sync_timestamp: datetime | None = None
    error: str | None = None

class ProviderUser(BaseModel):
    # Existing fields

    # New fields
    email_processing_status: EmailProcessingStatus | None = None
    previous_services: dict = Field(default_factory=dict)
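
A brief round-trip sketch (assuming Pydantic v2) showing how the status sub-model converts to and from the plain dict stored on the Firestore document:

from datetime import datetime

from domain.models.provider_user import EmailProcessingStatus

# Serialize to a JSON-safe dict for Firestore, then restore the model
status = EmailProcessingStatus(in_progress=True, started_at=datetime.now())
doc_fields = status.model_dump(mode="json")   # datetimes become ISO strings
restored = EmailProcessingStatus.model_validate(doc_fields)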

6. Extending EmailProcessingOrchestrator

Update the EmailProcessingOrchestrator (previously EmailProcessingService) to support pagination:

# domain/email_processing/email_processing_orchestrator.py

from typing import Self

class EmailProcessingOrchestrator:
    def __init__(
        self,
        app_user: AppUser,
        load_from_disk: bool = False,
        max_results: int = 100,
        date_range: tuple[str, str] | None = None,
        page_token: str | None = None
    ) -> None:
        self.gmail_fetching_service = GmailFetchingService(
            app_user=app_user,
            load_from_disk=load_from_disk,
            max_results=max_results,
            date_range=date_range,
            page_token=page_token
        )

        self.email_repository = TursoEmailRepository(
            app_user_id=app_user.appUserId,
            turso_db_access_token=app_user.database_info.access_token,
        )

        self.app_user = app_user
        self.emails: list[EmailMessage] = []
        self.processed_emails: list[EmailMessage] = []
        self.page_token = page_token

    # ... other methods ...

    def store(self: Self, in_batches_of: int = 20) -> dict:
        """Store emails and return processing results"""
        if not self.processed_emails:
            self.process()

        # Query existing IDs once rather than once per email
        existing_ids = self._existing_email_ids()
        new_emails = [
            email
            for email in self.processed_emails
            if email.id not in existing_ids
        ]

        # ... existing storage code ...

        # self.page_token is expected to be updated by the fetching step with
        # the token for the next page (None once the last page is fetched)
        return {
            "processed_count": len(new_emails),
            "next_page_token": self.page_token
        }
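
For context, a minimal sketch of how the fetching layer could translate date_range and page_token into a Gmail API messages.list call. This is an assumption about GmailFetchingService internals, not the existing code, and list_message_ids is a hypothetical helper; Gmail's after:/before: search operators accept epoch seconds:

from datetime import datetime

def list_message_ids(service, start_date: str, end_date: str,
                     max_results: int = 100, page_token: str | None = None):
    """List Gmail message IDs in [start_date, end_date) plus the next page token."""
    query = (
        f"after:{int(datetime.fromisoformat(start_date).timestamp())} "
        f"before:{int(datetime.fromisoformat(end_date).timestamp())}"
    )
    resp = service.users().messages().list(
        userId="me", q=query, maxResults=max_results, pageToken=page_token
    ).execute()
    return resp.get("messages", []), resp.get("nextPageToken")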

7. Application Startup Integration

Register the email sync scheduler during application startup:

# application/app.py

from fastapi import FastAPI

from application.schedulers.email_sync import setup_email_sync_scheduler

def create_app():
    app = FastAPI()

    # Register routers, middleware, etc.

    # Set up email sync scheduler
    setup_email_sync_scheduler(app)

    return app
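
With the factory pattern above, the scheduler starts together with the API server; for example, assuming uvicorn as the ASGI server:

uvicorn application.app:create_app --factory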

Consequences

Positive

  1. Automated Processing: Emails are processed automatically after Gmail connection and periodically thereafter
  2. Non-blocking Architecture: Processing happens in background tasks, avoiding request blocking
  3. Incremental Processing: Handles large mailboxes efficiently in manageable batches
  4. Current Data: Regular syncing ensures data stays up-to-date
  5. Clean Architecture: Refactoring ensures proper separation of concerns
  6. Progress Tracking: Status tracking enables monitoring and error recovery
  7. Resource Efficiency: Batched processing prevents overwhelming system resources
  8. Scalable Approach: Design can scale to more complex implementations if needed
  9. Progressive Enhancement: Allows for starting simple and enhancing features over time
  10. Complete Processing: Pagination ensures all emails within a time period are processed

Negative

  1. APScheduler Limitations: Not ideal for distributed environments; may need to upgrade to Celery/Redis for production scale
  2. Background Task Management: FastAPI background tasks aren't persistent; failures could lose progress
  3. Resource Considerations: Email processing is resource-intensive; requires monitoring
  4. Rate Limits: Must manage Gmail API rate limits carefully across users
  5. Complexity: Adds complexity compared to manual processing

Mitigations

  1. State Tracking: Store processing state in Firestore to enable resuming after failures
  2. Graceful Degradation: Implement timeouts and circuit breakers to prevent cascading failures
  3. Monitoring: Add comprehensive logging and metrics for observability
  4. Future Path: Design allows for replacing APScheduler with more robust solutions when needed
  5. Rate Limiting: Implement backoff strategies for API calls (a sketch follows below)
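
A minimal sketch of such a backoff wrapper, assuming the google-api-python-client is used for Gmail calls; the module path and helper name are illustrative, not existing code:

# infrastructure/email_providers/retry.py (hypothetical module)

import random
import time

from googleapiclient.errors import HttpError

def with_backoff(call, max_retries: int = 5):
    """Run a Gmail API call, retrying with exponential backoff plus jitter
    on rate-limit and transient server errors."""
    for attempt in range(max_retries):
        try:
            return call()
        except HttpError as e:
            # Gmail rate limits typically surface as 403/429; 500/503 are transient
            retryable = e.resp.status in (403, 429, 500, 503)
            if not retryable or attempt == max_retries - 1:
                raise
            time.sleep((2 ** attempt) + random.random())

# Usage: with_backoff(lambda: service.users().messages().list(userId="me").execute())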

Implementation Plan

Phase 1: Refactoring and Model Updates

  1. Move EmailMessage from application to domain layer
  2. Rename services for clarity
  3. Update ProviderUser model to include email processing status

Phase 2: Background Processing

  1. Extend provider user watcher for Gmail activation detection
  2. Implement background email processing tasks

Phase 3: Scheduled Syncing

  1. Implement the email sync scheduler
  2. Integrate scheduler with FastAPI application

Phase 4: Monitoring and Optimization

  1. Add detailed logging for all email operations
  2. Implement rate limiting and error handling
  3. Add metrics for monitoring performance
