Email Processing Use Case

Overview

The Email Processing use case coordinates email processing across all application users. It orchestrates the full pipeline, from fetching raw email content through AI-powered summarization and tagging to semantic embedding, with full automation and real-time progress tracking.

System Components

Core Orchestrator

EmailProcessingOrchestrator

The central coordinator for all email processing operations:

class EmailProcessingOrchestrator:
    """Orchestrates the complete email processing pipeline.

    This class manages the end-to-end email processing workflow including
    fetching, parsing, storage, summarization, tagging, and embedding
    operations with support for pagination and batch processing.
    """

    def __init__(
        self,
        app_user: AppUser,
        load_from_disk: bool = False,
        max_results: int = 100,
        date_range: tuple[str, str] | None = None,
        page_token: str | None = None,
    ) -> None:
        """Initialize with user context and processing parameters."""

    def store(self) -> dict:
        """Store emails and return processing results with pagination info."""

    def summarize(self) -> dict:
        """Generate AI summaries for unsummarized emails."""

    def tag(self) -> dict:
        """Extract relevant tags using LLM analysis."""

    def embed(self) -> dict:
        """Create vector embeddings for semantic search."""

Automated Sync System

1. ProviderUser Watcher

Monitors Gmail service activation and triggers automatic sync:

def on_provider_users_update():
    """Watch for provider user updates and handle Gmail activation.

    This function sets up a Firestore listener for the providerUsers collection
    and handles Gmail service activation by triggering initial email sync.
    """

def _detect_gmail_activation(old_doc_data: dict, new_doc_data: dict) -> bool:
    """Detect if Gmail service was just activated."""

def _trigger_initial_email_sync(provider_user_id: str, doc_data: dict) -> None:
    """Trigger initial email sync for a provider user."""

2. Background Task Orchestration

Coordinates batched, incremental email processing:

async def process_email_batch(
    provider_user_id: str,
    start_date: str,
    end_date: str,
    batch_size: int = 100,
    page_token: str | None = None,
    sync_id: str | None = None,
    background_tasks: BackgroundTasks | None = None,
):
    """Process emails for a specific date range and schedule next batch.

    This function coordinates the complete email processing pipeline including
    fetching, storing, summarizing, tagging, and embedding emails for a given
    date range. It handles pagination and schedules subsequent batches.
    """

3. Periodic Sync Scheduler

Ensures ongoing email synchronization:

def setup_email_sync_scheduler(app: FastAPI) -> AsyncIOScheduler:
    """Set up a scheduler for periodic email synchronization.

    This function configures an APScheduler to run hourly email synchronization
    for all users with Gmail service activated.
    """

@scheduler.scheduled_job(IntervalTrigger(hours=1))
async def sync_new_emails():
    """Synchronize new emails for all users with Gmail activated."""

Progress Tracking System

SyncProgress Model

Comprehensive tracking of sync operations:

class SyncProgress(BaseModel):
    """Tracks progress of email sync operations.

    This model provides real-time progress tracking for frontend notifications
    and system monitoring of email synchronization operations.
    """
    sync_id: str
    app_user_id: str
    family_id: str | None
    provider_user_id: str
    type: str  # "email"
    status: str  # "pending" | "in_progress" | "completed" | "failed"
    progress: int  # 0-100
    error: str | None
    started_at: datetime
    updated_at: datetime
    details: dict
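
A sketch of a typical lifecycle for one record (identifiers are hypothetical); persistence goes through the repository below:

from datetime import datetime, timezone

progress = SyncProgress(
    sync_id="sync-123",
    app_user_id="user-1",
    family_id=None,
    provider_user_id="prov-1",
    type="email",
    status="pending",
    progress=0,
    error=None,
    started_at=datetime.now(timezone.utc),
    updated_at=datetime.now(timezone.utc),
    details={"trigger": "gmail_activation"},
)

# Transition to in_progress before the first batch runs.
progress.status = "in_progress"
progress.updated_at = datetime.now(timezone.utc)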

FirestoreSyncProgressRepository

Manages sync progress persistence:

class FirestoreSyncProgressRepository:
    """Repository for managing sync progress in Firestore.

    Provides methods for creating, updating, and querying sync progress
    records with proper security and indexing.
    """

    def create(self, sync_progress: SyncProgress) -> SyncProgress:
        """Create a new sync progress record."""

    def update(self, sync_progress: SyncProgress) -> SyncProgress:
        """Update an existing sync progress record."""

    def get_in_progress_by_provider_user_id(self, provider_user_id: str) -> SyncProgress | None:
        """Get any in-progress sync for a provider user."""

    def get_by_provider_user_id(self, provider_user_id: str) -> Generator[SyncProgress, None, None]:
        """Get all sync records for a provider user."""

Use Case Implementations

1. FetchEmailsUseCase (Legacy)

Handles manual email fetching (maintained for backward compatibility):

class FetchEmailsUseCase:
    """Coordinates email fetching across all users.

    This use case retrieves all app users and processes their emails
    through the fetching and storage pipeline, handling pagination
    and batch processing automatically.

    Note: This is primarily used for manual operations. Automated
    sync is handled by the background task system.
    """

    def execute(self, max_results: int = 100, load_from_disk: bool = False) -> None:
        """Execute email fetching for all users."""
        app_users = self.app_user_repository.get_all()

        for app_user_dict in app_users:
            for _app_user_id, app_user in app_user_dict.items():
                EmailProcessingOrchestrator(
                    app_user=app_user,
                    max_results=max_results,
                    load_from_disk=load_from_disk,
                ).store()

2. SummarizeEmailsUseCase (Legacy)

Generates AI-powered summaries for email content:

class SummarizeEmailsUseCase:
    """Coordinates email summarization across all users.

    This use case processes emails through AI summarization to create
    concise, digestible versions of email content while preserving
    key information and context.

    Note: Automated summarization is handled by process_email_batch.
    """

    def execute(self, max_results: int = 100, load_from_disk: bool = False) -> None:
        """Execute email summarization for all users."""
        # Processes only emails that are stored but not yet summarized

3. TagEmailsUseCase (Legacy)

Extracts structured tags from email content:

class TagEmailsUseCase:
    """Coordinates email tagging across all users.

    This use case analyzes email content to extract relevant tags
    and metadata using LLM-based extraction for improved
    categorization and searchability.

    Note: Automated tagging is handled by process_email_batch.
    """

    def execute(self, max_results: int = 100, load_from_disk: bool = False) -> None:
        """Execute email tagging for all users."""
        # Processes only emails that are stored but not yet tagged

4. EmbedEmailsUseCase (Legacy)

Creates vector embeddings for semantic search:

class EmbedEmailsUseCase:
    """Coordinates email embedding across all users.

    This use case creates vector embeddings for email content to
    enable semantic search capabilities and AI-powered email
    discovery and analysis.

    Note: Automated embedding is handled by process_email_batch.
    """

    def execute(self, max_results: int = 100, load_from_disk: bool = False) -> None:
        """Execute email embedding for all users."""
        # Processes only emails that are tagged but not yet embedded

Processing Pipeline

1. Automated Email Sync Flow

```mermaid
graph TD
A[Gmail Service Activated] --> B[ProviderUser Watcher Detects]
B --> C[Create SyncProgress Record]
C --> D[Trigger Initial Sync]
D --> E[process_email_batch]

E --> F[Fetch Email Batch]
F --> G[Store Emails]
G --> H[Update Progress 25%]

H --> I[Summarize Emails]
I --> J[Update Progress 50%]

J --> K[Tag Emails]
K --> L[Update Progress 75%]

L --> M[Embed Emails]
M --> N[Update Progress 90%]

N --> O{More Pages?}
O -->|Yes| P[Schedule Next Page]
P --> E

O -->|No| Q{More Weeks?}
Q -->|Yes| R[Schedule Previous Week]
R --> E

Q -->|No| S[Mark Complete 100%]

T[Hourly Scheduler] --> U[Check All Gmail Users]
U --> V[Sync New Emails Since Last Run]
V --> E

```

2. Progress Tracking Flow

```mermaid
graph TD
A[Sync Operation Starts] --> B[Create SyncProgress]
B --> C[Status: pending]
C --> D[Mark In Progress]
D --> E[Status: in_progress, Progress: 0%]

E --> F[Each Processing Step]
F --> G[Update Progress %]
G --> H[Update Details]

H --> I{Error Occurred?}
I -->|Yes| J[Mark Failed]
J --> K[Status: failed, Error Details]

I -->|No| L{Processing Complete?}
L -->|No| F
L -->|Yes| M[Mark Complete]
M --> N[Status: completed, Progress: 100%]

O[Frontend] --> P[Subscribe to SyncProgress]
P --> Q[Real-time Updates]

```

Data Flow

Processing States

Each email progresses through defined processing states with full tracking:

  1. Raw: Initial email content fetched from Gmail
  2. Processed: Parsed and stored in Turso database
  3. Summarized: AI summary generated and stored
  4. Tagged: Relevant tags extracted and stored
  5. Embedded: Vector embeddings created in Redis

State Tracking

class Email(SQLModel, table=True):
    # Content fields...
    processed: bool = False    # Raw -> Processed
    summarized: bool = False   # Processed -> Summarized
    tagged: bool = False       # Summarized -> Tagged
    embedded: bool = False     # Tagged -> Embedded

class SyncProgress(BaseModel):
    # Tracks overall sync operation progress
    status: str  # pending, in_progress, completed, failed
    progress: int  # 0-100 percentage
    details: dict  # Batch info, counts, error details
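
Because each stage flips a single boolean, every stage's work queue reduces to one query. A sketch with SQLModel, assuming a session bound to the user's Turso database:

from sqlmodel import Session, select

def emails_pending_summarization(session: Session, limit: int = 100) -> list[Email]:
    """Stored emails that have not yet been summarized."""
    statement = (
        select(Email)
        .where(Email.processed == True, Email.summarized == False)  # noqa: E712
        .limit(limit)
    )
    return list(session.exec(statement))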

Background Processing Integration

Automatic Triggering

The email processing system is automatically triggered by:

  1. ProviderUsersWatcher: When Gmail service is activated
  2. APScheduler: For hourly synchronization of new emails
  3. Background Tasks: For batch processing continuation and pagination

Processing Coordination

# Automatic sync coordination
async def process_email_batch(provider_user_id: str, ...):
    """Coordinate all processing steps for a batch of emails."""

    # 1. Create/update sync progress
    sync_progress = create_or_get_sync_progress(...)

    # 2. Fetch and store emails
    result = orchestrator.store()
    sync_progress.update_progress(25, {"emails_fetched": result["processed_count"]})

    # 3. Generate summaries
    orchestrator.summarize()
    sync_progress.update_progress(50, {"summarization_completed": True})

    # 4. Extract tags
    orchestrator.tag()
    sync_progress.update_progress(75, {"tagging_completed": True})

    # 5. Create embeddings
    orchestrator.embed()
    sync_progress.update_progress(90, {"embedding_completed": True})

    # 6. Handle pagination and next batches
    if result.get("next_page_token"):
        schedule_next_page_batch(...)
    elif should_process_older_emails():
        schedule_previous_week_batch(...)
    else:
        sync_progress.mark_completed()

Sync Deduplication and Concurrency

# Prevent duplicate syncs
def _trigger_initial_email_sync(provider_user_id: str, doc_data: dict) -> None:
    """Trigger initial email sync with deduplication."""

    # Check if sync already in progress
    existing_sync = sync_progress_repo.get_in_progress_by_provider_user_id(provider_user_id)
    if existing_sync:
        logger.info(f"Sync already in progress for {provider_user_id}, skipping")
        return

    # Create new sync and proceed
    sync_progress = SyncProgress(...)
    sync_progress_repo.create(sync_progress)

    # Trigger background processing
    background_tasks.add_task(process_email_batch, ...)

Error Handling Strategy

Graceful Degradation

Each processing step handles errors independently with comprehensive tracking:

  1. Fetch Errors: Log and continue with available emails, update sync progress
  2. Parse Errors: Skip malformed emails, process others, track in details
  3. AI Errors: Fall back to original content, continue processing
  4. Storage Errors: Retry with exponential backoff, mark sync as failed if persistent
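
For the storage step, a minimal backoff helper of the kind described above might look like this (the helper and its parameters are illustrative, not the project's actual retry code):

import time

def retry_with_backoff(operation, max_attempts: int = 3, base_delay: float = 1.0):
    """Call `operation`, doubling the delay after each failure."""
    for attempt in range(max_attempts):
        try:
            return operation()
        except Exception:
            if attempt == max_attempts - 1:
                raise  # persistent failure; the caller marks the sync as failed
            time.sleep(base_delay * (2 ** attempt))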

State Recovery

Processing state is tracked in SyncProgress to enable recovery:

# Resume processing from last successful state
if sync_progress.status == "failed":
    # Analyze failure details and determine recovery strategy
    if sync_progress.details.get("failed_step") == "fetch":
        # Retry from fetch step
        retry_from_fetch(sync_progress)
    elif sync_progress.details.get("failed_step") == "summarize":
        # Skip to tagging if emails were stored
        retry_from_tagging(sync_progress)

# Automatic retry logic in scheduler
@scheduler.scheduled_job(IntervalTrigger(hours=6))
async def retry_failed_syncs():
    """Retry failed syncs that might be recoverable."""
    failed_syncs = sync_progress_repo.get_failed_syncs_for_retry()
    for sync in failed_syncs:
        if should_retry_sync(sync):
            retry_sync_operation(sync)

Performance Considerations

Batch Processing

  • Email Fetching: 100 emails per API call with pagination
  • Summarization: 10 emails per LLM batch to optimize token usage
  • Tagging: 10 emails per LLM batch for efficient processing
  • Embedding: 10 documents per vector operation for memory management
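
The per-step batch sizes above reduce to a simple chunking pattern; a sketch, where `summarize_batch` is a hypothetical batch summarizer:

def chunked(items: list, size: int):
    """Yield fixed-size chunks; the last chunk may be smaller."""
    for start in range(0, len(items), size):
        yield items[start:start + size]

# e.g. summarize in groups of 10 to bound token usage per LLM call
for batch in chunked(pending_emails, 10):
    summarize_batch(batch)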

Resource Management

  • User-specific database connections prevent interference
  • Vector embeddings use user-isolated Redis indexes
  • Background tasks prevent UI blocking during sync operations
  • Memory usage controlled through batch sizing and pagination
  • Progress tracking enables monitoring and resource allocation

Sync Optimization

  • Incremental Sync: Only process emails newer than last sync timestamp
  • Deduplication: Prevent multiple concurrent syncs for same user
  • Rate Limiting: Respect Gmail API quotas with backoff strategies
  • Prioritization: Initial sync processes recent emails first
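
A sketch of how an incremental window could be derived from the last completed sync; the fallback window and the use of `updated_at` as the watermark are assumptions:

from datetime import datetime, timedelta, timezone

def incremental_date_range(last_completed: SyncProgress | None) -> tuple[str, str]:
    """Derive the Gmail date window for the next incremental sync."""
    now = datetime.now(timezone.utc)
    if last_completed is None:
        start = now - timedelta(days=7)  # bounded fallback window
    else:
        start = last_completed.updated_at
    # Gmail search operators accept YYYY/MM/DD dates.
    return start.strftime("%Y/%m/%d"), now.strftime("%Y/%m/%d")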

Security and Privacy

Data Isolation

  • Each user has dedicated Turso database for email storage
  • Vector embeddings stored in user-specific Redis indexes
  • Processing state isolated per user in Firestore
  • Sync progress records include family_id for proper access control

Access Control

  • OAuth tokens securely managed per user with refresh handling
  • Processing only accessible to authenticated users
  • Email content encrypted at rest in databases
  • Sync progress records secured with Firestore security rules
  • Family-level access control for shared progress visibility

Data Retention

  • Email processing respects user privacy settings
  • Sync progress records include retention policies
  • Failed sync details sanitized to remove sensitive information
  • Automatic cleanup of old sync records after configurable period
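
A sketch of the cleanup job; the `syncProgress` collection name and `updated_at` field are assumptions:

from datetime import datetime, timedelta, timezone

from google.cloud import firestore
from google.cloud.firestore_v1.base_query import FieldFilter

def cleanup_old_sync_records(db: firestore.Client, retention_days: int = 30) -> int:
    """Delete sync progress records older than the retention window."""
    cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
    query = db.collection("syncProgress").where(
        filter=FieldFilter("updated_at", "<", cutoff)
    )
    deleted = 0
    for doc in query.stream():
        doc.reference.delete()
        deleted += 1
    return deleted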

Monitoring and Metrics

Processing Metrics

  • Emails processed per batch with detailed timing
  • Processing time per operation (fetch, summarize, tag, embed)
  • Error rates by processing step and user
  • API quota usage tracking and alerting
  • Sync completion rates and failure analysis

Health Monitoring

  • Processing status per user with real-time dashboards
  • Background task health and queue monitoring
  • Database connection status and performance
  • Vector store performance and memory usage
  • Scheduler job execution tracking

Progress Tracking Metrics

# Example sync progress details structure
{
    "trigger": "gmail_activation",  # or "periodic_sync"
    "started_at": "2024-01-15T10:00:00Z",
    "batch_info": {
        "current_batch": 3,
        "total_batches_estimated": 10,
        "emails_per_batch": 100
    },
    "processing_stats": {
        "emails_fetched": 250,
        "emails_summarized": 200,
        "emails_tagged": 150,
        "emails_embedded": 100
    },
    "performance": {
        "fetch_time_ms": 1500,
        "summarize_time_ms": 5000,
        "tag_time_ms": 3000,
        "embed_time_ms": 2000
    },
    "api_usage": {
        "gmail_api_calls": 3,
        "llm_api_calls": 20,
        "vector_operations": 10
    }
}

Testing Strategy

Unit Tests

  • Mock all external dependencies (Gmail API, LLM services, Firestore)
  • Test each processing step independently with proper error simulation
  • Validate error handling and recovery mechanisms
  • Test sync progress tracking and state transitions
  • Verify deduplication and concurrency control

Integration Tests

  • Test complete pipeline with test data and real repositories
  • Validate pagination handling across multiple batches
  • Test background task coordination and scheduling
  • Verify Firestore watcher integration and Gmail activation detection
  • Test scheduler integration with FastAPI lifecycle

Performance Tests

  • Load testing with large email volumes and concurrent users
  • Memory usage validation during batch processing
  • API rate limit handling and backoff strategies
  • Database performance under concurrent sync operations
  • Vector store performance with large embedding datasets

End-to-End Tests

class TestEmailSyncEndToEnd:
    """End-to-end tests for the complete email sync system."""

    async def test_gmail_activation_triggers_sync(self):
        """Test that Gmail activation automatically triggers email sync."""
        # Simulate Gmail service activation in Firestore
        # Verify watcher detects activation
        # Verify sync progress record created
        # Verify background task scheduled
        # Verify emails processed and progress updated

    async def test_periodic_sync_processes_new_emails(self):
        """Test that periodic sync processes only new emails."""
        # Setup user with completed initial sync
        # Add new emails to Gmail
        # Trigger periodic sync
        # Verify only new emails processed
        # Verify progress tracking accurate

    async def test_sync_failure_recovery(self):
        """Test sync failure handling and recovery."""
        # Simulate various failure scenarios
        # Verify error tracking in sync progress
        # Test retry mechanisms
        # Verify partial progress preservation

Migration and Deployment

Backward Compatibility

  • Legacy use cases maintained for manual operations
  • Existing email processing workflows continue to function
  • Gradual migration path from manual to automated sync
  • Configuration flags to enable/disable automated features

Deployment Considerations

  • APScheduler requires single-instance deployment or coordination
  • Firestore watchers need proper cleanup on shutdown
  • Background tasks require adequate worker capacity
  • Monitoring and alerting setup for production deployment

Configuration

# Email sync configuration
EMAIL_SYNC_CONFIG = {
    "scheduler_enabled": True,
    "sync_interval_hours": 1,
    "batch_size": 100,
    "max_concurrent_syncs": 5,
    "retry_failed_syncs": True,
    "retry_interval_hours": 6,
    "progress_retention_days": 30
}
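
A sketch of how these flags could gate startup (the gating helper itself is hypothetical):

def maybe_start_scheduler(app: FastAPI) -> AsyncIOScheduler | None:
    """Start periodic sync only when enabled in configuration."""
    if not EMAIL_SYNC_CONFIG["scheduler_enabled"]:
        return None
    scheduler = setup_email_sync_scheduler(app)
    scheduler.start()
    return scheduler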

Future Enhancements

Scalability Improvements

  • Replace APScheduler with distributed task queue (Celery + Redis)
  • Implement horizontal scaling for background workers
  • Add database sharding for high-volume users
  • Optimize vector operations for large-scale deployments

Feature Enhancements

  • Real-time email notifications via WebSocket
  • Advanced email filtering and categorization
  • Email thread analysis and relationship mapping
  • Integration with calendar events and transaction matching

Monitoring Enhancements

  • Real-time sync progress dashboards
  • Predictive failure detection and prevention
  • Advanced analytics on email processing patterns
  • Cost optimization tracking for API usage
