WhatsApp Message Handling

Overview

This document details how OmniButler processes and responds to WhatsApp messages. It covers the use cases, business logic, and specialized handling for different message types. The message handling system processes incoming WhatsApp messages, maintains conversation context, generates contextually relevant responses using language models, and delivers these responses back to users.

Key Features

  • Processing of different types of WhatsApp messages (text, media, location data)
  • Contextual conversation memory to maintain state across interactions
  • Multi-part message support for handling responses exceeding character limits
  • Error handling and recovery for failed message processing
  • Media processing for images and audio, plus handling of shared locations
  • Integration with Twilio API for message delivery

Flow Diagram

```mermaid
sequenceDiagram
participant User as WhatsApp User
participant Twilio as Twilio API
participant API as API Endpoint
participant UseCase as WhatsappConversationUseCase
participant Repository as WhatsAppUserMappingRepository
participant LLM as LLMService
participant Memory as Memory System
participant TwilioSvc as TwilioMessagingService

User->>Twilio: Send WhatsApp message
Twilio->>API: Webhook with message data
API->>API: Validate Twilio signature
API->>UseCase: Process message (background task)

UseCase->>Repository: Get user mapping
Repository-->>UseCase: Return user mapping
UseCase->>Memory: Store message context
UseCase->>UseCase: Determine message type

alt Text Message
    UseCase->>LLM: Generate text response
else Image Message
    UseCase->>LLM: Process image and generate response
else Audio Message
    UseCase->>LLM: Transcribe audio and generate response
else Location Message
    UseCase->>LLM: Process location data and generate response
end

LLM->>Memory: Access conversation history
LLM-->>UseCase: Return AI-generated response

UseCase->>UseCase: Split long responses
UseCase->>TwilioSvc: Send message parts
TwilioSvc->>Twilio: Deliver messages
Twilio->>User: Display response in WhatsApp

```

Class Hierarchy

Primary Classes

  • WhatsappConversationUseCase
      • Main orchestrator for WhatsApp conversations
      • Handles the logic for processing different message types
      • Manages conversation flow and response generation

  • LLMService
      • Generates AI responses using language models
      • Supports different media types (text, audio, images)
      • Maintains conversation memory for context

  • TwilioMessagingService
      • Sends messages back to users via Twilio
      • Handles multi-part message delivery
      • Manages message formatting for WhatsApp

  • FirestoreWhatsAppUserMappingRepository
      • Stores and retrieves WhatsApp user mappings
      • Manages token generation and verification
      • Tracks message status and state transitions

Supporting Classes

  • WhatsappMessageDetails
      • Data structure containing all message metadata from Twilio
      • Includes sender/recipient information, message content, media URLs

  • WhatsAppUserMapping
      • Pydantic model representing the mapping between WhatsApp numbers and users
      • Tracks mapping state, tokens, and context

API and Interfaces

Key Methods

Process Message

async def process_request(self, call_details: WhatsappMessageDetails)

Purpose: Main entry point for processing incoming WhatsApp messages.

Parameters:

  • call_details: WhatsappMessageDetails object containing all message information

Processing Flow:

  1. Identifies message type (text, image, audio, location)
  2. Routes to specialized handlers for different media types
  3. Stores conversation context for future reference
  4. Generates response using LLM
  5. Delivers response back to the user

Error Handling:

  • Catches all exceptions during processing
  • Attempts to send a friendly error message if processing fails

Handle Media Messages

async def _handle_image_message(self, call_details: WhatsappMessageDetails)

Purpose: Processes image messages, extracting content and generating relevant responses.

Parameters:

  • call_details: WhatsappMessageDetails containing image metadata and URLs

Processing Flow:

  1. Retrieves image content from Twilio's media URL
  2. Processes the image using LLM service with image capabilities
  3. Generates a contextually relevant response about the image
  4. Sends the response back to the user

Send Response

async def _send_response(self, response_parts: list[str], call_details: WhatsappMessageDetails, **kwargs)

Purpose: Delivers responses back to the user via Twilio.

Parameters:

  • response_parts: List of message parts to send (for long responses)
  • call_details: Original message details for routing the response
  • **kwargs: Additional options for message delivery

Processing Flow:

  1. Determines recipient phone number
  2. Formats messages according to WhatsApp requirements
  3. Sends each message part in sequence
  4. Handles any delivery errors
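
The delivery steps above can be sketched as a small helper. This is an illustration only, not the actual `_send_response` implementation: `send_parts_in_order` and the `send(to, body)` callable are hypothetical names standing in for whatever TwilioMessagingService exposes. The sketch assumes that parts should arrive in order and that sending stops at the first failure, so the user never receives part 3 without part 2.

```python
import logging
from typing import Awaitable, Callable

logger = logging.getLogger(__name__)

# Hypothetical sender signature: (recipient, body) -> message SID
SendFn = Callable[[str, str], Awaitable[str]]


async def send_parts_in_order(send: SendFn, to: str, parts: list[str]) -> list[str]:
    """Send each part sequentially and return the SIDs of the delivered parts."""
    delivered: list[str] = []
    for index, part in enumerate(parts, start=1):
        try:
            # Await each send before starting the next to preserve ordering
            delivered.append(await send(to, part))
        except Exception:
            # Stop at the first failure; the caller decides whether to retry
            logger.exception("Failed to deliver part %d of %d", index, len(parts))
            break
    return delivered
```

Returning the list of delivered SIDs lets the caller compare its length with `len(parts)` to detect a partial delivery.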

Integration Points

Background Task Processing

Message handling runs asynchronously in a background task so that the webhook endpoint can respond to Twilio immediately:

async def process_whatsapp_message(call_details: WhatsappMessageDetails) -> None:
    """
    Process a WhatsApp message in a background task.

    Args:
        call_details: The message details from Twilio
    """
    try:
        # Get WhatsApp number from the From field
        whatsapp_number = call_details.From

        # Identify the user from WhatsApp number
        repository = FirestoreWhatsAppUserMappingRepository()
        mapping = await repository.get_by_whatsapp_number(whatsapp_number)

        if not mapping:
            # Handle unknown user
            # ...
            return

        # Create and execute use case
        use_case = WhatsappConversationUseCase(
            user_mapping_repository=repository,
            app_user_id=str(mapping.app_user_id)
        )

        await use_case.process_request(call_details)

    except Exception as e:
        logger.error(f"Error processing WhatsApp message: {e}", exc_info=True)

Language Model Integration

The use case integrates with the LLMService to generate AI-powered responses. The service can be configured to use different model providers (OpenAI, TogetherAI).

self.llm_service = LLMService(
    user_id=user_id,
    format_=LLMOutputFormats.whatsapp,
)

Firestore Integration

User mappings are stored in Firestore, allowing for efficient retrieval and updates:

# Get user mapping
repository = FirestoreWhatsAppUserMappingRepository()
mapping = await repository.get_by_whatsapp_number(whatsapp_number)

# Update mapping context
repository.update_context(mapping.id, new_context)

Implementation Details

Message Types and Specialized Handling

The use case detects the type of incoming message and routes it to specialized handlers:

  1. Text Messages: Processed directly by the LLM service
  2. Images: Processed by the _handle_image_message method
  3. Audio: Processed by setting the media type to MediaType.AUDIO
  4. Location: Processed by the _handle_location_message method
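
The routing decision can be illustrated with a small classifier over the raw webhook parameters. This is a sketch under assumptions: it reads the standard Twilio webhook fields `NumMedia`, `MediaContentType0`, `Latitude`, and `Longitude` from a plain dictionary, whereas the real use case works with WhatsappMessageDetails, whose field names are not shown here. `MessageKind` and `classify_message` are hypothetical names and are not the project's `MediaType`.

```python
from enum import Enum


class MessageKind(str, Enum):
    TEXT = "text"
    IMAGE = "image"
    AUDIO = "audio"
    LOCATION = "location"


def classify_message(params: dict[str, str]) -> MessageKind:
    """Classify a Twilio webhook payload by its media and location fields."""
    # NumMedia arrives as a string; treat a missing or empty value as zero
    if int(params.get("NumMedia", "0") or 0) > 0:
        content_type = params.get("MediaContentType0", "")
        if content_type.startswith("image/"):
            return MessageKind.IMAGE
        if content_type.startswith("audio/"):
            return MessageKind.AUDIO
    # Shared locations carry coordinates instead of media
    if params.get("Latitude") and params.get("Longitude"):
        return MessageKind.LOCATION
    # Everything else, including unsupported media, is handled as text
    return MessageKind.TEXT
```

In this sketch an unsupported attachment such as a PDF falls through to the text path; a real handler might instead reply that the file type is not supported.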

Long Response Handling

Messages sent to WhatsApp through Twilio are limited to 1600 characters each. For longer responses, the format_response_for_whatsapp method splits the text into multiple numbered parts at natural boundaries (newlines or spaces):

def format_response_for_whatsapp(self, response: str) -> list[str]:
    """
    Format a response for WhatsApp, including splitting if necessary.

    WhatsApp has a 1600 character limit per message. This method:
    1. Checks if the response exceeds the limit
    2. If within limit, returns as a single message
    3. If exceeds limit, splits at appropriate boundaries (newlines or spaces)
    4. Adds part numbers to each segment (e.g., "(1/3) Message part...")

    Args:
        response (str): Response text to format

    Returns:
        List[str]: List of message parts ready to send
    """
    if not response:
        return ["I'm thinking about this. Please wait a moment."]

    # WhatsApp character limit
    MAX_LENGTH = 1600

    # If response is within limit, return as is
    if len(response) <= MAX_LENGTH:
        return [response]

    # Split the response into chunks first. Part numbers are added afterwards,
    # once the real number of parts is known (an up-front estimate based on
    # MAX_LENGTH can undercount, because chunks are shorter than MAX_LENGTH).
    chunk_size = MAX_LENGTH - 10  # Reserve room for the "(n/total) " prefix
    chunks = []
    remaining = response

    while len(remaining) > chunk_size:
        split_point = chunk_size

        # Prefer a newline in the second half of the window
        newline_pos = remaining[:split_point].rfind("\n")
        if newline_pos > split_point * 0.5:  # Don't split too early
            split_point = newline_pos + 1  # Include the newline
        else:
            # Otherwise fall back to a space near the end of the window
            space_pos = remaining[:split_point].rfind(" ")
            if space_pos > split_point * 0.8:  # Don't split too early
                split_point = space_pos + 1  # Include the space

        chunks.append(remaining[:split_point])
        remaining = remaining[split_point:]

    # Whatever is left fits in a single part
    if remaining:
        chunks.append(remaining)

    # Add the "(part/total)" prefix to each chunk
    total_parts = len(chunks)
    return [
        f"({part_number}/{total_parts}) {chunk}"
        for part_number, chunk in enumerate(chunks, start=1)
    ]

Error Handling Strategy

The use case implements a comprehensive error handling strategy:

  1. Use Case Level: All exceptions in process_request are caught and logged
  2. Specialized Handler Level: Each handler has its own try/except blocks
  3. Graceful Degradation: If processing fails, a friendly error message is sent to the user
  4. Logging: All errors are logged with appropriate context for troubleshooting

try:
    # Process the message
    response = await self.llm_service.generate_response(call_details.Body)
    response_parts = self.format_response_for_whatsapp(response)
    await self._send_response(response_parts, call_details)
except Exception as e:
    logger.error(f"Error processing WhatsApp message: {e}", exc_info=True)

    # Send error message to user
    error_message = "I'm sorry, I encountered an error processing your message. Please try again later."
    await self._send_response([error_message], call_details)

    # Re-raise for higher-level handling
    raise
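
One detail worth guarding against is the error notification itself failing, for example when Twilio is the component that is down. The following sketch shows one way to keep the original exception intact in that case. `process_with_fallback`, `handle`, and `notify` are hypothetical names used for illustration and are not part of the use case's API.

```python
import logging

logger = logging.getLogger(__name__)

ERROR_MESSAGE = (
    "I'm sorry, I encountered an error processing your message. "
    "Please try again later."
)


async def process_with_fallback(handle, notify) -> None:
    """Run `handle`; on failure, try to notify the user, then re-raise.

    `handle` is a coroutine function taking no arguments and `notify` is a
    coroutine function taking the message text (both are assumptions).
    """
    try:
        await handle()
    except Exception:
        logger.exception("Error processing WhatsApp message")
        try:
            await notify(ERROR_MESSAGE)
        except Exception:
            # A failed notification must not hide the original error
            logger.exception("Could not deliver error message to user")
        # Re-raises the original processing error for higher-level handling
        raise
```

The bare `raise` runs after the inner `try` block has finished, so it propagates the processing error rather than the notification error.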

Context Preservation

WhatsApp conversations maintain context between messages using the mapping's context field:

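from datetime import UTC, datetime  # datetime.UTC requires Python 3.11+

# Update context with new information
# (recent_topics is assumed to have been computed earlier in the handler)
current_context = mapping.context or {}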
new_context = {
    "last_message": call_details.Body,
    "last_message_time": datetime.now(UTC).isoformat(),
    "message_count": current_context.get("message_count", 0) + 1,
    "recent_topics": recent_topics,
}

# Merge and update
updated_context = {**current_context, **new_context}
repository.update_context(mapping.id, updated_context)
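
The merge step can also be written as a pure function, which makes it easy to test without Firestore. The sketch below is an assumption about how `recent_topics` might be maintained: `build_context` and the `max_topics` cap are hypothetical, and the document does not say how topics are extracted or how many are kept.

```python
from datetime import datetime, timezone
from typing import Optional


def build_context(
    current: Optional[dict],
    body: str,
    topics: list[str],
    max_topics: int = 5,
) -> dict:
    """Return a new context dict; the input dict is not modified."""
    current = current or {}
    # Combine old and new topics, dropping duplicates (first position wins)
    merged = list(dict.fromkeys([*current.get("recent_topics", []), *topics]))
    # Keep only the last `max_topics` entries so the document stays small
    recent = merged[-max_topics:] if max_topics > 0 else []
    return {
        **current,  # Preserve keys this function does not manage
        "last_message": body,
        "last_message_time": datetime.now(timezone.utc).isoformat(),
        "message_count": current.get("message_count", 0) + 1,
        "recent_topics": recent,
    }
```

Capping the topic list keeps the stored context from growing without bound over a long conversation.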

Security Considerations

  • Message Validation: All incoming messages are validated before processing
  • Signature Verification: Twilio signatures are verified to prevent spoofing
  • User Identification: Messages are processed only for known users
  • Rate Limiting: Excessive message processing is prevented
  • Error Isolation: Message processing errors don't affect other system components
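
To make the signature check concrete, here is a standard-library sketch of the scheme Twilio documents for form-encoded webhooks: the full request URL is concatenated with each POST parameter name and value in sorted key order, signed with HMAC-SHA1 using the account's auth token, and Base64-encoded. The function names are illustrative. In production the official helper library's `RequestValidator` would normally be used instead, and nothing here is taken from OmniButler's actual endpoint code.

```python
import base64
import hashlib
import hmac


def compute_twilio_signature(auth_token: str, url: str, params: dict[str, str]) -> str:
    """Compute the expected X-Twilio-Signature value for a form-encoded POST."""
    # Full URL followed by key/value pairs, sorted by key, with no separators
    payload = url + "".join(f"{key}{params[key]}" for key in sorted(params))
    digest = hmac.new(
        auth_token.encode("utf-8"), payload.encode("utf-8"), hashlib.sha1
    ).digest()
    return base64.b64encode(digest).decode("ascii")


def is_valid_twilio_request(
    auth_token: str, url: str, params: dict[str, str], signature: str
) -> bool:
    """Check a received signature header against the expected value."""
    expected = compute_twilio_signature(auth_token, url, params)
    # Constant-time comparison avoids leaking how many characters matched
    return hmac.compare_digest(expected.encode("utf-8"), signature.encode("utf-8"))
```

The URL must match exactly what Twilio called, including scheme and query string, so deployments behind a proxy usually need to reconstruct the public URL before validating.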

Testing

The message handling implementation includes comprehensive testing:

  1. Unit Tests: Test individual components in isolation
  2. Integration Tests: Test the complete flow with mocked external dependencies
  3. End-to-End Tests: Test the complete flow with real dependencies

from unittest.mock import MagicMock, patch

@patch("domain.services.llm_service.LLMService.generate_response")
@patch("domain.services.twilio_messaging_service.TwilioMessagingService.send_messages_async")
async def test_process_text_message(self, mock_send, mock_generate):
    # Setup test data
    mock_generate.return_value = "Test response"
    mock_send.return_value = ["SM123"]

    # Create test message
    message = WhatsappMessageDetails(
        From="whatsapp:+1234567890",
        Body="Test message",
        MessageSid="SM123"
    )

    # Process message; constructor arguments match the background task shown earlier
    use_case = WhatsappConversationUseCase(
        user_mapping_repository=MagicMock(),
        app_user_id="test_user",
    )
    await use_case.process_request(message)

    # Verify LLM was called
    mock_generate.assert_called_once_with("Test message")

    # Verify response was sent
    mock_send.assert_called_once_with(
        response_parts=["Test response"],
        to="whatsapp:+1234567890"
    )