# ADR-005: Email Integration System

## Status

Implemented

## Context

OmniButler needs to integrate with users' email accounts (currently focusing on Gmail) to provide comprehensive financial management capabilities. This integration requires:
- Processing emails when users first connect their Gmail account
- Periodically syncing new emails to keep data current
- Parsing, summarizing, tagging, and storing emails securely
- Managing this process efficiently across many users
The system must handle large volumes of emails without overwhelming resources, process emails incrementally, and maintain a clean architecture consistent with OmniButler's design principles.
## Current Implementation

The email integration system currently consists of the following components:

### Email Fetching & Processing

- `EmailService` (in `domain/services/email_service.py`): Handles fetching emails from Gmail using OAuth tokens
- `EmailProcessingService` (in `domain/email_processing/email_processing_service.py`): Orchestrates the parsing, summarizing, tagging, and embedding of emails

### Data Models

- `EmailMessage` (in `application/dtos/response/email_message.py`): DTO for email messages (incorrectly placed in the application layer)
- `Email` (in `infrastructure/integrations/email/models/email.py`): SQL model for storing emails in Turso

### Infrastructure Components

- `GmailMessageDownloader` (in `infrastructure/email_providers/gmail_message_downloader.py`): Downloads email content from Gmail
- `GmailMessageIdsFetcher` (in `infrastructure/email_providers/gmail_message_ids_fetcher.py`): Fetches email IDs from Gmail
- `Gmail` (in `infrastructure/email_providers/gmail.py`): Wrapper for Gmail-specific operations
- `TursoEmailRepository` (in `infrastructure/database/turso/turso_email_repository.py`): Handles storage and retrieval of emails

### Use Cases

- `FetchEmailsUseCase` (in `application/use_cases/fetch_emails.py`)
- `SummarizeEmailsUseCase` (in `application/use_cases/summarize_emails.py`)
- `TagEmailsUseCase` (in `application/use_cases/tag_emails.py`)
- `EmbedEmailsUseCase` (in `application/use_cases/embed_emails.py`)
### Execution

Currently, email processing is triggered manually through command-line scripts, with no automation or background processing.

## Architectural Issues to Address

- Manual Processing: Email processing is currently manual, requiring command-line execution
- No Progress Tracking: The system doesn't track processing state or progress
- Clean Architecture Violations: Some components are in incorrect layers
- No Periodic Syncing: No mechanism for regular email updates
## Decision

We will implement a two-part email integration system:

### 1. Initial Gmail Connection Processing

- Provider User Watcher: Extend the existing `ProviderUsersWatcher` in `infrastructure/database/firestore/watchers/provider_users.py` to detect when the Gmail service is connected
- Background Task Processing: Use FastAPI background tasks to process emails incrementally
- Batched Processing: Process in weekly batches, starting with the most recent week
- Chained Tasks: Each batch spawns the next batch for older emails until complete
- Progress Tracking: Store processing state in Firestore
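
For reference, the processing state stored per provider user could look like the following; the field set matches the `EmailProcessingStatus` model defined under Technical Implementation, and the values are illustrative:

```python
# Illustrative email_processing_status document stored on the provider user in Firestore
email_processing_status = {
    "started_at": "2025-01-15T10:00:00",       # when initial processing began
    "in_progress": True,                       # guards against overlapping runs
    "processed_until": "2025-01-08T10:00:00",  # start of the oldest fully processed window
    "total_processed": 240,                    # running count of processed emails
    "last_sync_timestamp": None,               # set once the initial backfill completes
    "error": None,                             # last error message, if any
}
```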
### 2. Periodic Email Sync

- Simple Scheduler: Use APScheduler integrated with FastAPI to run hourly syncs
- Incremental Processing: Only fetch and process emails newer than the last sync
- Tracking: Maintain a `last_sync_timestamp` per provider user
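
To make the incremental fetch concrete, here is a minimal sketch of how a sync window might translate into a Gmail API query via google-api-python-client; the `service` handle is assumed to come from the existing `Gmail` wrapper, and `list_message_ids_since` is a hypothetical helper:

```python
from datetime import datetime


def list_message_ids_since(service, last_sync: datetime, page_token: str | None = None) -> dict:
    """List Gmail message IDs newer than last_sync using the `after:` search operator."""
    query = f"after:{int(last_sync.timestamp())}"  # Gmail search accepts epoch seconds
    return (
        service.users()
        .messages()
        .list(userId="me", q=query, maxResults=100, pageToken=page_token)
        .execute()
    )
```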
## Technical Implementation

### 1. Clean Architecture Refactoring

- Move the `EmailMessage` DTO from `application/dtos/response/email_message.py` to `domain/models/email_message.py`
- Rename components for clarity:
    - `EmailService` → `GmailFetchingService` to clarify its specific role
    - `EmailProcessingService` → `EmailProcessingOrchestrator` to better reflect its orchestration role
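
After the move, the domain-layer model might look as follows; the field set here is an illustrative assumption, not taken from the actual DTO:

```python
# domain/models/email_message.py (illustrative field set)
from datetime import datetime

from pydantic import BaseModel


class EmailMessage(BaseModel):
    id: str
    thread_id: str | None = None
    sender: str
    subject: str
    body: str
    received_at: datetime
    summary: str | None = None  # populated by SummarizeEmailsUseCase
    tags: list[str] = []        # populated by TagEmailsUseCase
```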
### 2. Enhanced Provider User Watcher

Extend the existing `ProviderUsersWatcher` to monitor Gmail connections:
```python
def on_provider_users_update(
    self,
    _doc_snapshot: list[DocumentSnapshot],
    changes: list[DocumentChange],
    _read_time: str,
):
    # Existing code for handling auth codes
    # New code for detecting Gmail service activation
    for change in changes:
        if change.type != ChangeType.MODIFIED:
            continue
        doc = change.document
        doc_data = doc.to_dict()
        # Check whether the Gmail service is now active
        if "services" not in doc_data or doc_data["services"].get("gmail") is not True:
            continue
        # Get provider user from repository
        provider_user_repo = FirestoreProviderUserRepository()
        provider_user = provider_user_repo.get_by_id(doc.id)
        if not provider_user:
            logger.error(f"Provider user {doc.id} not found")
            continue
        # Firestore change listeners don't expose the previous document state,
        # so compare against the `previous_services` snapshot persisted on the
        # provider user (see "Data Model Extensions" below)
        if provider_user.previous_services.get("gmail"):
            continue  # Gmail was already active; nothing to do
        logger.info(f"Gmail service activated for provider user {doc.id}")
        # Initialize email processing state and remember the current services
        provider_user.email_processing_status = {
            "started_at": datetime.now().isoformat(),
            "in_progress": True,
            "processed_until": None,
            "total_processed": 0,
            "last_sync_timestamp": None,
            "error": None,
        }
        provider_user.previous_services = doc_data["services"]
        provider_user_repo.update(provider_user)
        # Trigger background processing, starting with the most recent week.
        # process_email_batch is async and this watcher callback is synchronous,
        # so run the first batch to completion here (requires `import asyncio`
        # at module top); follow-up batches are chained through background tasks.
        from application.background_tasks.email_processing import process_email_batch

        end_date = datetime.now().isoformat()
        start_date = (datetime.now() - timedelta(days=7)).isoformat()
        asyncio.run(
            process_email_batch(
                provider_user_id=doc.id,
                start_date=start_date,
                end_date=end_date,
                batch_size=100,
            )
        )
```
### 3. Email Processing Background Task

Create a new module for background tasks:
```python
# application/background_tasks/email_processing.py
from datetime import datetime, timedelta

from fastapi import BackgroundTasks

from application.dependencies import get_background_tasks
from domain.email_processing.email_processing_orchestrator import EmailProcessingOrchestrator
from infrastructure.database.firestore.provider_user_repository import FirestoreProviderUserRepository
from infrastructure.database.firestore.app_user_repository import FirestoreAppUserRepository
from infrastructure.logging import get_logger

logger = get_logger(__name__)


async def process_email_batch(
    provider_user_id: str,
    start_date: str,
    end_date: str,
    batch_size: int = 100,
    page_token: str | None = None,
    background_tasks: BackgroundTasks | None = None,
):
    """Process emails for a specific date range and schedule the next batch."""
    if background_tasks is None:
        background_tasks = get_background_tasks()
    try:
        # Get provider user data using the repository pattern
        provider_user_repo = FirestoreProviderUserRepository()
        provider_user = provider_user_repo.get_by_id(provider_user_id)
        if not provider_user:
            logger.error(f"Provider user {provider_user_id} not found")
            return

        # Update processing status via repository
        provider_user.email_processing_status = provider_user.email_processing_status or {}
        provider_user.email_processing_status["in_progress"] = True
        provider_user.email_processing_status["processed_until"] = start_date
        provider_user_repo.update(provider_user)

        # Get app user via repository
        app_user_repo = FirestoreAppUserRepository()
        app_user = app_user_repo.get_by_id(provider_user.appUserId)
        if not app_user:
            logger.error(f"App user {provider_user.appUserId} not found")
            provider_user.email_processing_status["in_progress"] = False
            provider_user.email_processing_status["error"] = "App user not found"
            provider_user_repo.update(provider_user)
            return

        # Process emails for the date range
        orchestrator = EmailProcessingOrchestrator(
            app_user=app_user,
            max_results=batch_size,
            load_from_disk=False,
            date_range=(start_date, end_date),
            page_token=page_token,
        )

        # Fetch and store emails; check for a next page token
        result = orchestrator.store()
        next_page_token = result.get("next_page_token")
        processed_count = result.get("processed_count", 0)

        # Summarize, tag, and embed the stored emails
        orchestrator.summarize()
        orchestrator.tag()
        orchestrator.embed()

        # Update status with completion info via repository
        provider_user = provider_user_repo.get_by_id(provider_user_id)  # Refresh to get latest state
        current_processed = provider_user.email_processing_status.get("total_processed", 0)
        provider_user.email_processing_status["total_processed"] = current_processed + processed_count
        provider_user_repo.update(provider_user)

        if next_page_token:
            # Process the next page of emails for the same date range
            logger.info(f"Processing next page of emails for {provider_user_id} in date range {start_date} to {end_date}")
            background_tasks.add_task(
                process_email_batch,
                provider_user_id=provider_user_id,
                start_date=start_date,
                end_date=end_date,
                batch_size=batch_size,
                page_token=next_page_token,
            )
        else:
            # All emails for the current date range are processed;
            # calculate the next date range (one week earlier)
            new_end_date = datetime.fromisoformat(start_date)
            new_start_date = new_end_date - timedelta(days=7)

            # Update the processed-until date via repository
            provider_user = provider_user_repo.get_by_id(provider_user_id)  # Refresh
            provider_user.email_processing_status["processed_until"] = start_date
            provider_user_repo.update(provider_user)

            # Stop once the window reaches emails older than one year
            one_year_ago = datetime.now() - timedelta(days=365)
            if new_start_date > one_year_ago:
                # Schedule the next batch
                logger.info(f"Scheduling next batch for {provider_user_id}: {new_start_date} to {new_end_date}")
                background_tasks.add_task(
                    process_email_batch,
                    provider_user_id=provider_user_id,
                    start_date=new_start_date.isoformat(),
                    end_date=new_end_date.isoformat(),
                    batch_size=batch_size,
                )
            else:
                # Mark processing as complete via repository
                logger.info(f"Email processing completed for {provider_user_id}")
                provider_user = provider_user_repo.get_by_id(provider_user_id)  # Refresh
                provider_user.email_processing_status["in_progress"] = False
                provider_user.email_processing_status["last_sync_timestamp"] = datetime.now().isoformat()
                provider_user_repo.update(provider_user)
    except Exception as e:
        logger.error(f"Error processing emails for {provider_user_id}: {e}")
        # Record the error status via repository
        provider_user_repo = FirestoreProviderUserRepository()
        provider_user = provider_user_repo.get_by_id(provider_user_id)
        if provider_user:
            provider_user.email_processing_status = provider_user.email_processing_status or {}
            provider_user.email_processing_status["in_progress"] = False
            provider_user.email_processing_status["error"] = str(e)
            provider_user_repo.update(provider_user)
```
### 4. Periodic Email Sync

Create a scheduler for periodic email synchronization:
```python
# application/schedulers/email_sync.py
from datetime import datetime, timedelta

from apscheduler.schedulers.asyncio import AsyncIOScheduler
from apscheduler.triggers.interval import IntervalTrigger
from fastapi import FastAPI

from application.background_tasks.email_processing import process_email_batch
from infrastructure.database.firestore.provider_user_repository import FirestoreProviderUserRepository
from infrastructure.logging import get_logger

logger = get_logger(__name__)


def setup_email_sync_scheduler(app: FastAPI):
    """Set up a scheduler for periodic email synchronization."""
    scheduler = AsyncIOScheduler()

    @scheduler.scheduled_job(IntervalTrigger(hours=1))
    async def sync_new_emails():
        """Synchronize new emails for all users with Gmail activated."""
        logger.info("Starting scheduled email sync")
        # Get all provider users with the Gmail service activated
        provider_user_repo = FirestoreProviderUserRepository()
        provider_users = provider_user_repo.get_all_with_gmail_service()
        for provider_user in provider_users:
            status = provider_user.email_processing_status or {}
            # Skip users with in-progress processing
            if status.get("in_progress"):
                logger.info(f"Skipping {provider_user.id} - processing already in progress")
                continue

            # Get the last sync timestamp (stored as an ISO string)
            # or default to 24 hours ago
            last_sync = status.get("last_sync_timestamp")
            if isinstance(last_sync, str):
                last_sync = datetime.fromisoformat(last_sync)
            if last_sync is None:
                last_sync = datetime.now() - timedelta(hours=24)

            # Process emails since the last sync
            end_date = datetime.now().isoformat()
            start_date = last_sync.isoformat()
            logger.info(f"Syncing emails for {provider_user.id} from {start_date} to {end_date}")
            await process_email_batch(
                provider_user_id=provider_user.id,
                start_date=start_date,
                end_date=end_date,
                batch_size=100,
            )

    # Start the scheduler and keep a reference on app state
    # to prevent it from being garbage-collected
    scheduler.start()
    app.state.email_sync_scheduler = scheduler

    # Register a shutdown event to close the scheduler gracefully
    @app.on_event("shutdown")
    def shutdown_scheduler():
        if app.state.email_sync_scheduler:
            app.state.email_sync_scheduler.shutdown()
```
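
On FastAPI versions where `@app.on_event` is deprecated, the same wiring can go through the lifespan context instead; a minimal sketch, assuming the shutdown hook inside `setup_email_sync_scheduler` is then removed:

```python
from contextlib import asynccontextmanager

from fastapi import FastAPI


@asynccontextmanager
async def lifespan(app: FastAPI):
    setup_email_sync_scheduler(app)  # starts the scheduler and stores it on app.state
    yield
    app.state.email_sync_scheduler.shutdown()


app = FastAPI(lifespan=lifespan)
```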
### 5. Data Model Extensions

Add the necessary fields to the `ProviderUser` model:
```python
# domain/models/provider_user.py
from datetime import datetime

from pydantic import BaseModel, Field


class EmailProcessingStatus(BaseModel):
    started_at: datetime | None = None
    in_progress: bool = False
    processed_until: datetime | None = None
    total_processed: int = 0
    last_sync_timestamp: datetime | None = None
    error: str | None = None


class ProviderUser(BaseModel):
    # Existing fields ...

    # New fields
    email_processing_status: EmailProcessingStatus | None = None
    # Snapshot of the last-seen services map, used by the watcher to detect
    # the moment Gmail flips from inactive to active
    previous_services: dict = Field(default_factory=dict)
```
### 6. Extending EmailProcessingOrchestrator

Update the `EmailProcessingOrchestrator` (previously `EmailProcessingService`) to support pagination:
```python
# domain/email_processing/email_processing_orchestrator.py
class EmailProcessingOrchestrator:
    def __init__(
        self,
        app_user: AppUser,
        load_from_disk: bool = False,
        max_results: int = 100,
        date_range: tuple[str, str] | None = None,
        page_token: str | None = None,
    ) -> None:
        self.gmail_fetching_service = GmailFetchingService(
            app_user=app_user,
            load_from_disk=load_from_disk,
            max_results=max_results,
            date_range=date_range,
            page_token=page_token,
        )
        self.email_repository = TursoEmailRepository(
            app_user_id=app_user.appUserId,
            turso_db_access_token=app_user.database_info.access_token,
        )
        self.app_user = app_user
        self.emails: list[EmailMessage] = []
        self.processed_emails: list[EmailMessage] = []
        self.page_token = page_token

    # ... other methods ...

    def store(self: Self, in_batches_of: int = 20) -> dict:
        """Store emails and return processing results."""
        if not self.processed_emails:
            self.process()
        new_emails = [
            email
            for email in self.processed_emails
            if email.id not in self._existing_email_ids()
        ]
        # ... existing storage code ...
        # `page_token` must be advanced to the token of the *next* page during
        # fetching; returning the incoming token unchanged would make the
        # caller re-process the same page forever
        return {
            "processed_count": len(new_emails),
            "next_page_token": self.page_token,
        }
```
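
For context, here is a hypothetical sketch of how `GmailFetchingService` could advance the page token while listing message IDs; the attribute and method names are assumptions, since the actual fetcher lives in `infrastructure/email_providers/`:

```python
# Hypothetical pagination step inside GmailFetchingService (illustrative only)
def fetch_message_ids(self) -> list[str]:
    response = (
        self.service.users()
        .messages()
        .list(
            userId="me",
            q=self._date_range_query(),  # e.g. "after:2025/01/01 before:2025/01/08"
            maxResults=self.max_results,
            pageToken=self.page_token,
        )
        .execute()
    )
    # Expose the token for the *next* page so the orchestrator can chain batches
    self.next_page_token = response.get("nextPageToken")
    return [message["id"] for message in response.get("messages", [])]
```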
### 7. Application Startup Integration

Register the email sync scheduler during application startup:
```python
# application/app.py
from fastapi import FastAPI

from application.schedulers.email_sync import setup_email_sync_scheduler


def create_app():
    app = FastAPI()
    # Register routers, middleware, etc.

    # Set up the email sync scheduler
    setup_email_sync_scheduler(app)
    return app
```
## Consequences

### Positive

- Automated Processing: Emails are processed automatically after Gmail connection and periodically thereafter
- Non-blocking Architecture: Processing happens in background tasks, avoiding request blocking
- Incremental Processing: Handles large mailboxes efficiently in manageable batches
- Current Data: Regular syncing ensures data stays up-to-date
- Clean Architecture: Refactoring ensures proper separation of concerns
- Progress Tracking: Status tracking enables monitoring and error recovery
- Resource Efficiency: Batched processing prevents overwhelming system resources
- Scalable Approach: Design can scale to more complex implementations if needed
- Progressive Enhancement: Allows for starting simple and enhancing features over time
- Complete Processing: Pagination ensures all emails within a time period are processed
### Negative

- APScheduler Limitations: Not ideal for distributed environments; may need to upgrade to Celery/Redis for production scale
- Background Task Management: FastAPI background tasks aren't persistent; failures could lose progress
- Resource Considerations: Email processing is resource-intensive; requires monitoring
- Rate Limits: Must manage Gmail API rate limits carefully across users
- Complexity: Adds complexity compared to manual processing
### Mitigations

- State Tracking: Store processing state in Firestore to enable resuming after failures
- Graceful Degradation: Implement timeouts and circuit breakers to prevent cascading failures
- Monitoring: Add comprehensive logging and metrics for observability
- Future Path: Design allows for replacing APScheduler with more robust solutions when needed
- Rate Limiting: Implement backoff strategies for API calls
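
As one possible shape for the backoff mitigation, here is a small retry helper around Gmail API calls; a sketch, assuming googleapiclient's `HttpError` is what surfaces rate-limit responses:

```python
import random
import time

from googleapiclient.errors import HttpError


def call_with_backoff(request, max_retries: int = 5):
    """Execute a googleapiclient request, retrying rate-limit and server errors with exponential backoff."""
    for attempt in range(max_retries):
        try:
            return request.execute()
        except HttpError as e:
            retryable = e.resp.status in (429, 500, 502, 503)
            if not retryable or attempt == max_retries - 1:
                raise
            # Exponential backoff with jitter: 1-2s, 2-3s, 4-5s, ...
            time.sleep((2 ** attempt) + random.random())
```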
## Implementation Plan

### Phase 1: Refactoring and Model Updates

- Move `EmailMessage` from the application layer to the domain layer
- Rename services for clarity
- Update the `ProviderUser` model to include the email processing status
### Phase 2: Background Processing

- Extend provider user watcher for Gmail activation detection
- Implement background email processing tasks
### Phase 3: Scheduled Syncing

- Implement the email sync scheduler
- Integrate scheduler with FastAPI application
### Phase 4: Monitoring and Optimization

- Add detailed logging for all email operations
- Implement rate limiting and error handling
- Add metrics for monitoring performance