Provider Users Watcher Audit

Overview

This document outlines potential issues found in the Provider Users Watcher code that could cause crashes in staging. The watcher is a critical component that handles OAuth token exchange, service activation, and email sync initialization.

Critical Components

1. OAuth Token Exchange

credentials = await self.event_loop.run_in_executor(None, fetch_token)
refresh_token = credentials.refresh_token
access_token = credentials.token
granted_scopes = credentials.scopes
Issues:
- No timeout on the token exchange operation
- No retry mechanism for transient failures
- No validation of token expiration
- No handling of revoked tokens
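The missing timeout and retry could be addressed by wrapping the blocking call. The following is a minimal sketch, not the watcher's actual code: `run_blocking_with_retry` and its parameters are hypothetical names, and the set of exceptions treated as transient is an assumption that depends on the OAuth client in use.

```python
import asyncio
import random
from typing import Callable, TypeVar

T = TypeVar("T")


async def run_blocking_with_retry(
    func: Callable[[], T],
    *,
    attempts: int = 3,
    timeout_s: float = 10.0,
    base_delay_s: float = 0.5,
    # Assumption: these are the "transient" errors; adjust for the real client.
    retry_on: tuple = (ConnectionError, TimeoutError, asyncio.TimeoutError),
) -> T:
    """Run a blocking callable in the default executor with a timeout and retries."""
    if attempts < 1:
        raise ValueError("attempts must be at least 1")
    loop = asyncio.get_running_loop()
    for attempt in range(1, attempts + 1):
        try:
            # wait_for stops waiting after timeout_s; the worker thread itself
            # cannot be interrupted and keeps running until func returns.
            return await asyncio.wait_for(loop.run_in_executor(None, func), timeout_s)
        except retry_on:
            if attempt == attempts:
                raise
            # Exponential backoff with jitter before the next attempt.
            delay = base_delay_s * 2 ** (attempt - 1)
            await asyncio.sleep(delay + random.uniform(0, delay / 2))
    raise AssertionError("unreachable")
```

With such a helper, the call above might become `credentials = await run_blocking_with_retry(fetch_token)`. Errors outside `retry_on` (for example a revoked grant) propagate on the first attempt, so they can be handled separately instead of being retried.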

2. Service State Management

self.previous_services[provider_user_id] = current_services.copy()
Issues:
- In-memory state could be lost on service restart
- No cleanup of stale entries
- Potential memory leak if provider users are deleted
- No validation of service state transitions
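The stale-entry and leak concerns could be handled by giving the cache an explicit removal path. The sketch below is illustrative: `ServiceStateCache` is a hypothetical class, and it assumes `current_services` is a collection of service-name strings, which the source does not confirm. It does not address state loss on restart, which would need persistence.

```python
from typing import Iterable


class ServiceStateCache:
    """Tracks the last seen services per provider user (hypothetical helper)."""

    def __init__(self) -> None:
        self._services: dict[str, set[str]] = {}

    def update(
        self, provider_user_id: str, current_services: Iterable[str]
    ) -> tuple[set[str], set[str]]:
        """Store the new state and return (activated, deactivated) services."""
        previous = self._services.get(provider_user_id, set())
        current = set(current_services)
        self._services[provider_user_id] = current
        return current - previous, previous - current

    def remove(self, provider_user_id: str) -> None:
        """Drop the entry, e.g. when the provider user document is deleted."""
        self._services.pop(provider_user_id, None)

    def __len__(self) -> int:
        return len(self._services)
```

Calling `remove` from the handler for deleted documents keeps the cache size bounded by the number of live provider users.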

3. Document Change Handling

def on_provider_users_update(
    self,
    doc_snapshot: list[DocumentSnapshot],
    changes: list[DocumentChange],
    read_time: str,
) -> None:
Issues:
- No handling of Firestore connection drops
- No backoff strategy for reconnection
- Potential missed updates during reconnection
- No validation of document schema changes
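For the schema validation point, a lightweight check could run before a change is handed to the handlers. This is a sketch under assumptions: `schema_problems` is a hypothetical helper, and the required fields must come from the real provider user schema, which this document does not list.

```python
from typing import Any, Mapping, Optional


def schema_problems(
    data: Optional[Mapping[str, Any]], required: Mapping[str, type]
) -> list[str]:
    """Return a list of human-readable schema problems (empty if none)."""
    if data is None:
        # A snapshot of a missing document may carry no data at all.
        return ["document has no data"]
    problems = []
    for field, expected in required.items():
        if field not in data:
            problems.append(f"missing field {field!r}")
        elif not isinstance(data[field], expected):
            actual = type(data[field]).__name__
            problems.append(
                f"field {field!r} is {actual}, expected {expected.__name__}"
            )
    return problems
```

A handler could log the returned problems and skip the document rather than raising deep inside the update path. The field name `services` used in any call such as `schema_problems(doc_data, {"services": list})` is a placeholder.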

4. Background Task Management

asyncio.create_task(run_email_sync())
Issues:
- Tasks are created but never tracked
- No limit on concurrent tasks
- No task cancellation on watcher shutdown
- No task timeout handling
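All four task issues could be covered by routing task creation through one small tracker. The sketch below is illustrative rather than the watcher's implementation: `TaskTracker`, `spawn`, and the default limits are hypothetical.

```python
import asyncio
from typing import Any, Callable, Coroutine


class TaskTracker:
    """Tracks background tasks, caps concurrency, and cancels on shutdown."""

    def __init__(self, max_concurrent: int = 10, timeout_s: float = 300.0) -> None:
        # Construct the tracker from code running inside the event loop so the
        # semaphore binds to the correct loop on older Python versions.
        self._semaphore = asyncio.Semaphore(max_concurrent)
        self._timeout_s = timeout_s
        self._tasks: set[asyncio.Task] = set()

    @property
    def active_count(self) -> int:
        return len(self._tasks)

    def spawn(self, coro_fn: Callable[[], Coroutine[Any, Any, Any]]) -> asyncio.Task:
        """Start coro_fn() as a tracked task; must be called from the loop."""
        task = asyncio.create_task(self._run(coro_fn))
        # Holding a reference prevents the task from being garbage collected;
        # the callback removes it again once the task finishes.
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)
        return task

    async def _run(self, coro_fn: Callable[[], Coroutine[Any, Any, Any]]) -> Any:
        # The semaphore caps concurrency; wait_for enforces a per-task timeout.
        async with self._semaphore:
            return await asyncio.wait_for(coro_fn(), self._timeout_s)

    async def shutdown(self) -> None:
        """Cancel all unfinished tasks and wait for them to finish."""
        tasks = list(self._tasks)
        for task in tasks:
            task.cancel()
        await asyncio.gather(*tasks, return_exceptions=True)
```

The call above would then read `tracker.spawn(run_email_sync)`. Passing the coroutine function rather than a coroutine object means a task cancelled before it starts never leaves an un-awaited coroutine behind.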

5. Error Handling and Recovery

except Exception as e:
    self.logger.report(
        f"Error in handle_modified for provider user {provider_user_id}",
        e,
        extra={"provider_user_id": provider_user_id},
    )
Issues:
- Generic exception handling masks specific errors
- No recovery mechanism for failed operations
- No circuit breaker for repeated failures
- Inconsistent error state cleanup
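A circuit breaker for repeated failures can be quite small. The following is a minimal sketch with hypothetical names and thresholds; it is not thread-safe and assumes all calls happen on the event loop thread.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Opens after consecutive failures; allows a trial call after a cooldown."""

    def __init__(
        self,
        failure_threshold: int = 5,
        reset_after_s: float = 60.0,
        clock: Callable[[], float] = time.monotonic,
    ) -> None:
        self._failure_threshold = failure_threshold
        self._reset_after_s = reset_after_s
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if self._opened_at is None:
            return True
        # Half-open: once the cooldown has elapsed, let a trial call through.
        return self._clock() - self._opened_at >= self._reset_after_s

    def record_success(self) -> None:
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        self._failures += 1
        if self._failures >= self._failure_threshold:
            # (Re)open the circuit and restart the cooldown.
            self._opened_at = self._clock()
```

One breaker per provider user, checked with `allow()` at the top of the handler and updated in the `try`/`except` branches, would stop a persistently failing user from being retried on every document change.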

6. Repository Operations

provider_user = await self.provider_user_repo.get_by_id(doc.id)
if not provider_user:
    raise ValueError(f"Provider user {doc.id} not found")
Issues:
- No transaction handling for multi-step operations
- No retry mechanism for transient failures
- No validation of repository responses
- No handling of concurrent updates

7. Event Loop Management

future = asyncio.run_coroutine_threadsafe(
    self.handle_change(change), self.event_loop
)
Issues:
- No handling of event loop shutdown
- Potential deadlock in synchronous callbacks
- No timeout on coroutine execution
- No handling of event loop exceptions
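The shutdown, timeout, and exception points could be handled where the coroutine is submitted. This is a sketch under assumptions: `submit_to_loop` and the logger name are hypothetical, and it uses the standard `logging` module rather than the watcher's own logger.

```python
import asyncio
import concurrent.futures
import logging
from typing import Any, Coroutine

logger = logging.getLogger("provider_users_watcher")  # hypothetical logger name


def submit_to_loop(
    coro: Coroutine[Any, Any, Any],
    loop: asyncio.AbstractEventLoop,
    *,
    timeout_s: float = 30.0,
) -> concurrent.futures.Future:
    """Schedule coro on loop from another thread, with a timeout and logging."""
    if loop.is_closed():
        # Close the coroutine so it does not trigger a "never awaited" warning.
        coro.close()
        raise RuntimeError("event loop is closed; change was not scheduled")

    future = asyncio.run_coroutine_threadsafe(
        asyncio.wait_for(coro, timeout_s), loop
    )

    def _log_failure(done: concurrent.futures.Future) -> None:
        # Without this callback an exception stays hidden until someone
        # calls result() on the future.
        if done.cancelled():
            return
        exc = done.exception()
        if exc is not None:
            logger.error("scheduled coroutine failed", exc_info=exc)

    future.add_done_callback(_log_failure)
    return future
```

Because failures are reported through the callback, the synchronous Firestore callback does not need to block on `future.result()`, which avoids the deadlock risk of waiting on the loop from a thread the loop may depend on.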

8. Memory Management

self._futures: list[asyncio.Future] = []
Issues:
- Futures list grows unbounded
- No cleanup of completed futures
- No memory limits on in-memory state
- Potential memory leaks in long-running operations
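The unbounded list could be replaced by a registry that forgets each future as soon as it completes. The sketch below is illustrative and uses hypothetical names. It assumes the tracked objects are the `concurrent.futures.Future` instances returned by `asyncio.run_coroutine_threadsafe`, which would also mean the `list[asyncio.Future]` annotation does not match what is stored.

```python
import threading
from concurrent.futures import Future


class FutureRegistry:
    """Holds only unfinished futures; safe to use from multiple threads."""

    def __init__(self) -> None:
        self._lock = threading.Lock()
        self._pending: set[Future] = set()

    def track(self, future: Future) -> Future:
        with self._lock:
            self._pending.add(future)
        # If the future is already done, the callback runs immediately,
        # so the lock must not be held here.
        future.add_done_callback(self._forget)
        return future

    def _forget(self, future: Future) -> None:
        with self._lock:
            self._pending.discard(future)

    def cancel_all(self) -> int:
        """Try to cancel every pending future; return how many were cancelled."""
        with self._lock:
            pending = list(self._pending)
        return sum(1 for future in pending if future.cancel())

    def __len__(self) -> int:
        with self._lock:
            return len(self._pending)
```

The registry's size then reflects in-flight work only, so it can double as a simple gauge for monitoring.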

9. Initialization and Shutdown

def unsubscribe() -> None:
    listener.unsubscribe()
    for future in watcher._futures:
        if not future.done():
            future.cancel()
Issues:
- No graceful shutdown of in-progress operations
- No cleanup of resources on shutdown
- No handling of partial initialization
- No validation of initialization state
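A more graceful variant could give in-progress work a grace period before cancelling it. The following is a minimal sketch; `drain_futures` and the grace period are hypothetical, and it assumes the futures are `concurrent.futures.Future` objects as returned by `asyncio.run_coroutine_threadsafe`.

```python
import concurrent.futures
from typing import Iterable


def drain_futures(
    futures: Iterable[concurrent.futures.Future], grace_s: float
) -> tuple[int, int]:
    """Wait up to grace_s for futures to finish, then cancel the rest.

    Returns (finished_count, cancelled_or_abandoned_count).
    """
    done, not_done = concurrent.futures.wait(list(futures), timeout=grace_s)
    for future in not_done:
        # For futures created by run_coroutine_threadsafe, cancelling here
        # also requests cancellation of the underlying task on the loop.
        future.cancel()
    return len(done), len(not_done)
```

In `unsubscribe`, calling `listener.unsubscribe()` first and `drain_futures(watcher._futures, grace_s=10.0)` second would stop new changes from arriving before the remaining work is drained. It must run on a thread other than the event loop thread, since it blocks.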

10. Logging and Monitoring

self.logger.report(
    "Error in on_provider_users_update", e, extra={"error": str(e)}
)
Issues:
- Inconsistent error logging patterns
- No structured logging for monitoring
- No metrics for operation success/failure
- No alerting for critical failures
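Structured logging could be introduced with a JSON formatter so that monitoring tools can filter on fields instead of parsing messages. This sketch uses the standard `logging` module, not the watcher's `logger.report` API, and the field names in `fields` are assumptions.

```python
import json
import logging


class JsonFormatter(logging.Formatter):
    """Formats log records as one JSON object per line."""

    def __init__(
        self, fields: tuple = ("provider_user_id", "operation", "outcome")
    ) -> None:
        super().__init__()
        # Names of `extra` keys to copy into the output when present.
        self._fields = fields

    def format(self, record: logging.LogRecord) -> str:
        payload = {
            "level": record.levelname,
            "logger": record.name,
            "message": record.getMessage(),
        }
        for key in self._fields:
            if hasattr(record, key):
                payload[key] = getattr(record, key)
        if record.exc_info:
            payload["exception"] = self.formatException(record.exc_info)
        return json.dumps(payload)
```

Attaching the formatter to a handler and always passing the same `extra` keys gives every error the same shape, which also makes it straightforward to derive success and failure counts from the logs.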

Next Steps

Priority should be given to:
1. Implementing error recovery mechanisms
2. Adding timeouts and retries for external operations
3. Improving task management and resource cleanup
4. Adding transaction handling for multi-step repository operations
5. Implementing comprehensive monitoring
6. Adding graceful shutdown handling
7. Improving memory management
8. Adding validation at all levels
9. Implementing durable state management
10. Adding structured logging and metrics