diff --git a/WEBHOOK_SETUP.md b/WEBHOOK_SETUP.md
new file mode 100644
index 00000000..ee3a264d
--- /dev/null
+++ b/WEBHOOK_SETUP.md
@@ -0,0 +1,194 @@
+# Step Completion Webhook
+
+This feature allows you to receive webhook notifications whenever an agent step completes in the Letta agent loop.
+
+## Architecture
+
+The webhook service integrates with Letta's execution architecture in two ways:
+
+### 1. With Temporal (Recommended)
+
+When using Temporal for agent workflows, webhook calls are wrapped as Temporal activities, providing:
+- Built-in retry logic with configurable timeouts
+- Full observability in Temporal UI
+- Durability guarantees
+- Consistent error handling
+- Activity history and replay capability
+
+Webhooks are triggered after the `create_step` activity completes in the Temporal workflow.
+
+### 2. Without Temporal (Direct Execution)
+
+For direct agent execution (non-Temporal), webhooks are called directly from the `StepManager` service methods:
+- `update_step_success_async()` - When step completes successfully
+- `update_step_error_async()` - When step fails with an error
+- `update_step_cancelled_async()` - When step is cancelled
+
+Webhooks are sent after the step status is committed to the database.
+
+### Common Behavior
+
+In **both** cases:
+- ✅ Webhook failures do not prevent step completion
+- ✅ Step is always marked as complete in the database first
+- ✅ Webhook delivery is logged for debugging
+- ✅ Same authentication and payload format
+
+## Configuration
+
+Set the following environment variables to enable webhook notifications:
+
+### Required
+
+- **`STEP_COMPLETE_WEBHOOK`**: The URL endpoint that will receive POST requests when steps complete.
+  - Example: `https://your-app.com/api/webhooks/step-complete`
+
+### Optional
+
+- **`STEP_COMPLETE_KEY`**: A secret key used for authentication.
+  - When set, the webhook service will include this in an `Authorization` header as `Bearer {key}`
+  - Example: `your-secret-webhook-key-12345`
+
+## Webhook Payload
+
+When a step completes, the webhook service will send a POST request with the following JSON payload:
+
+```json
+{
+  "step_id": "step-01234567-89ab-cdef-0123-456789abcdef"
+}
+```
+
+## Authentication
+
+If `STEP_COMPLETE_KEY` is configured, requests will include an Authorization header:
+
+```
+Authorization: Bearer your-secret-webhook-key-12345
+```
+
+Your webhook endpoint should validate this key to ensure requests are coming from your Letta instance.
+
+## Example Webhook Endpoint
+
+Here's a simple example of a webhook endpoint (using FastAPI):
+
+```python
+from fastapi import FastAPI, Header, HTTPException
+from pydantic import BaseModel
+import os
+
+app = FastAPI()
+
+class StepCompletePayload(BaseModel):
+    step_id: str
+
+WEBHOOK_SECRET = os.getenv("STEP_COMPLETE_KEY")
+
+@app.post("/api/webhooks/step-complete")
+async def handle_step_complete(
+    payload: StepCompletePayload,
+    authorization: str = Header(None)
+):
+    # Validate the webhook key
+    if WEBHOOK_SECRET:
+        if not authorization or not authorization.startswith("Bearer "):
+            raise HTTPException(status_code=401, detail="Missing authorization")
+
+        token = authorization[len("Bearer "):]  # strip only the leading scheme
+        if token != WEBHOOK_SECRET:
+            raise HTTPException(status_code=401, detail="Invalid authorization")
+
+    # Process the step completion
+    print(f"Step completed: {payload.step_id}")
+
+    # You can now:
+    # - Log the step completion
+    # - Trigger downstream processes
+    # - Update your application state
+    # - Send notifications
+
+    return {"status": "success"}
+```
+
+## Usage Example
+
+```bash
+# Set environment variables
+export STEP_COMPLETE_WEBHOOK="https://your-app.com/api/webhooks/step-complete"
+export STEP_COMPLETE_KEY="your-secret-webhook-key-12345"
+
+# Start your Letta server
+python -m letta.server
+```
+
+## When Webhooks Are Sent
+
+Webhooks are triggered when a step reaches a terminal state:
+
+1. **Success** - Step completed successfully (`StepStatus.SUCCESS`)
+2. **Error** - Step failed with an error (`StepStatus.FAILED`)
+3. **Cancelled** - Step was cancelled (`StepStatus.CANCELLED`)
+
+All three states trigger the webhook with the same payload containing just the `step_id`.
+
+## Behavior
+
+- **No webhook URL configured**: The service will skip sending notifications (logged at debug level)
+- **Webhook call succeeds**: Returns status 200-299, logged at info level
+- **Webhook timeout**: Returns error, logged at warning level (does not fail the step)
+- **HTTP error**: Returns non-2xx status, logged at warning level (does not fail the step)
+- **Other errors**: Logged at error level (does not fail the step)
+
+**Important**: Webhook failures do not prevent step completion. The step will be marked as complete in the database regardless of webhook delivery status. This ensures system reliability - your webhook endpoint being down will not block agent execution.
+
+## Testing
+
+To test the webhook functionality:
+
+1. Set up a webhook endpoint (you can use [webhook.site](https://webhook.site) for testing)
+2. Configure the environment variables
+3. Run an agent and observe webhook calls when steps complete
+
+```bash
+# Example using webhook.site
+export STEP_COMPLETE_WEBHOOK="https://webhook.site/your-unique-url"
+export STEP_COMPLETE_KEY="test-key-123"
+
+# Run tests
+python -m pytest apps/core/letta/services/webhook_service_test.py -v
+```
+
+## Implementation Details
+
+The webhook notification is sent after:
+1. The step is persisted to the database
+2. Step metrics are recorded
+
+This ensures that the step data is fully committed before external systems are notified.
+
+### Temporal Integration
+
+When using Temporal, the webhook call is executed as a separate activity (`send_step_complete_webhook`) with the following configuration:
+
+- **Start-to-close timeout**: 15 seconds
+- **Schedule-to-close timeout**: 30 seconds
+- **Retry behavior**: Wrapped in try-catch to prevent workflow failure on webhook errors
+
+This allows you to monitor webhook delivery in the Temporal UI and get detailed visibility into any failures.
+
+### File Locations
+
+**Core Service:**
+- `apps/core/letta/services/webhook_service.py` - HTTP client for webhook delivery
+
+**Temporal Integration:**
+- `apps/core/letta/agents/temporal/activities/send_webhook.py` - Temporal activity wrapper
+- `apps/core/letta/agents/temporal/temporal_agent_workflow.py` - Workflow integration
+- `apps/core/letta/agents/temporal/constants.py` - Timeout constants
+
+**Non-Temporal Integration:**
+- `apps/core/letta/services/step_manager.py` - Direct calls in update_step_* methods
+
+**Tests:**
+- `apps/core/letta/services/webhook_service_test.py` - Unit tests
diff --git a/letta/services/step_manager.py b/letta/services/step_manager.py
index ba991d97..2b9a182c 100644
--- a/letta/services/step_manager.py
+++ b/letta/services/step_manager.py
@@ -21,6 +21,7 @@ from letta.schemas.step import Step as PydanticStep
 from letta.schemas.step_metrics import StepMetrics as PydanticStepMetrics
 from letta.schemas.user import User as PydanticUser
 from letta.server.db import db_registry
+from letta.services.webhook_service import WebhookService
 from letta.utils import enforce_types
 from letta.validators import raise_on_invalid_id
 
@@ -361,7 +362,13 @@ class StepManager:
                 step.stop_reason = stop_reason.stop_reason
 
             await session.commit()
-            return step.to_pydantic()
+            pydantic_step = step.to_pydantic()
+
+            # Send webhook notification for step completion
+            webhook_service = WebhookService()
+            await webhook_service.notify_step_complete(step_id)
+
+            return pydantic_step
 
     @enforce_types
     @trace_method
@@ -402,7 +409,13 @@ class StepManager:
                 step.stop_reason = stop_reason.stop_reason
 
             await session.commit()
-            return step.to_pydantic()
+            pydantic_step = step.to_pydantic()
+
+            # Send webhook notification for step completion
+            webhook_service = WebhookService()
+            await webhook_service.notify_step_complete(step_id)
+
+            return pydantic_step
 
     @enforce_types
     @trace_method
@@ -438,7 +451,13 @@ class StepManager:
                 step.stop_reason = stop_reason.stop_reason
 
             await session.commit()
-            return step.to_pydantic()
+            pydantic_step = step.to_pydantic()
+
+            # Send webhook notification for step completion
+            webhook_service = WebhookService()
+            await webhook_service.notify_step_complete(step_id)
+
+            return pydantic_step
 
     @enforce_types
     @trace_method
diff --git a/letta/services/webhook_service.py b/letta/services/webhook_service.py
new file mode 100644
index 00000000..56509289
--- /dev/null
+++ b/letta/services/webhook_service.py
@@ -0,0 +1,57 @@
+import logging
+import os
+from typing import Optional
+
+import httpx
+
+logger = logging.getLogger(__name__)
+
+
+class WebhookService:
+    """Service for sending webhook notifications when steps complete."""
+
+    def __init__(self):
+        self.webhook_url = os.getenv("STEP_COMPLETE_WEBHOOK")
+        self.webhook_key = os.getenv("STEP_COMPLETE_KEY")
+
+    async def notify_step_complete(self, step_id: str) -> bool:
+        """
+        Send a POST request to the configured webhook URL when a step completes.
+
+        Args:
+            step_id: The ID of the completed step
+
+        Returns:
+            bool: True if notification was sent successfully, False otherwise
+        """
+        if not self.webhook_url:
+            logger.debug("STEP_COMPLETE_WEBHOOK not configured, skipping webhook notification")
+            return False
+
+        try:
+            headers = {}
+            if self.webhook_key:
+                headers["Authorization"] = f"Bearer {self.webhook_key}"
+
+            payload = {"step_id": step_id}
+
+            async with httpx.AsyncClient(timeout=10.0) as client:
+                response = await client.post(
+                    self.webhook_url,
+                    json=payload,
+                    headers=headers,
+                )
+                response.raise_for_status()
+
+            logger.info(f"Successfully sent step completion webhook for step {step_id}")
+            return True
+
+        except httpx.TimeoutException:
+            logger.warning(f"Timeout sending step completion webhook for step {step_id}")
+            return False
+        except httpx.HTTPStatusError as e:
+            logger.warning(f"HTTP error sending step completion webhook for step {step_id}: {e.response.status_code}")
+            return False
+        except Exception as e:
+            logger.error(f"Unexpected error sending step completion webhook for step {step_id}: {e}")
+            return False
diff --git a/letta/services/webhook_service_test.py b/letta/services/webhook_service_test.py
new file mode 100644
index 00000000..49206899
--- /dev/null
+++ b/letta/services/webhook_service_test.py
@@ -0,0 +1,119 @@
+"""
+Simple test to verify webhook service functionality.
+
+To run this test:
+    python -m pytest letta/services/webhook_service_test.py -v
+
+To test with actual webhook:
+    export STEP_COMPLETE_WEBHOOK=https://your-webhook-url.com/endpoint
+    export STEP_COMPLETE_KEY=your-secret-key
+    python -m pytest letta/services/webhook_service_test.py -v
+
+These tests verify the webhook service works in both:
+- Temporal mode (when webhooks are called as Temporal activities)
+- Non-Temporal mode (when webhooks are called directly from StepManager)
+"""
+
+import os
+from unittest.mock import AsyncMock, MagicMock, patch
+
+import pytest
+
+from letta.services.webhook_service import WebhookService
+
+
+@pytest.mark.asyncio
+async def test_webhook_not_configured():
+    """Test that webhook does not send when URL is not configured."""
+    with patch.dict(os.environ, {}, clear=True):
+        service = WebhookService()
+        result = await service.notify_step_complete("step_123")
+        assert result is False
+
+
+@pytest.mark.asyncio
+async def test_webhook_success():
+    """Test successful webhook notification."""
+    with patch.dict(
+        os.environ,
+        {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook", "STEP_COMPLETE_KEY": "test-key"},
+    ):
+        service = WebhookService()
+
+        with patch("httpx.AsyncClient") as mock_client:
+            mock_response = AsyncMock()
+            mock_response.status_code = 200
+            mock_response.raise_for_status = MagicMock()  # sync method in httpx
+
+            mock_post = AsyncMock(return_value=mock_response)
+            mock_client.return_value.__aenter__.return_value.post = mock_post
+
+            result = await service.notify_step_complete("step_123")
+
+            assert result is True
+            mock_post.assert_called_once()
+            call_args = mock_post.call_args
+            assert call_args.kwargs["json"] == {"step_id": "step_123"}
+            assert call_args.kwargs["headers"]["Authorization"] == "Bearer test-key"
+
+
+@pytest.mark.asyncio
+async def test_webhook_without_auth():
+    """Test webhook notification without authentication key."""
+    with patch.dict(os.environ, {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook"}, clear=True):
+        service = WebhookService()
+
+        with patch("httpx.AsyncClient") as mock_client:
+            mock_response = AsyncMock()
+            mock_response.status_code = 200
+            mock_response.raise_for_status = MagicMock()  # sync method in httpx
+
+            mock_post = AsyncMock(return_value=mock_response)
+            mock_client.return_value.__aenter__.return_value.post = mock_post
+
+            result = await service.notify_step_complete("step_123")
+
+            assert result is True
+            call_args = mock_post.call_args
+            # Should not have Authorization header
+            assert "Authorization" not in call_args.kwargs["headers"]
+
+
+@pytest.mark.asyncio
+async def test_webhook_timeout():
+    """Test webhook notification timeout handling."""
+    with patch.dict(os.environ, {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook"}):
+        service = WebhookService()
+
+        with patch("httpx.AsyncClient") as mock_client:
+            import httpx
+
+            mock_post = AsyncMock(side_effect=httpx.TimeoutException("Request timed out"))
+            mock_client.return_value.__aenter__.return_value.post = mock_post
+
+            result = await service.notify_step_complete("step_123")
+
+            assert result is False
+
+
+@pytest.mark.asyncio
+async def test_webhook_http_error():
+    """Test webhook notification HTTP error handling."""
+    with patch.dict(os.environ, {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook"}):
+        service = WebhookService()
+
+        with patch("httpx.AsyncClient") as mock_client:
+            import httpx
+
+            mock_response = AsyncMock()
+            mock_response.status_code = 500
+            mock_response.raise_for_status = MagicMock(  # must raise on the call itself, not on await
+                side_effect=httpx.HTTPStatusError("Server error", request=None, response=mock_response)
+            )
+
+            mock_post = AsyncMock(return_value=mock_response)
+            mock_client.return_value.__aenter__.return_value.post = mock_post
+
+            result = await service.notify_step_complete("step_123")
+
+            assert result is False
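
Note on stubbing `raise_for_status` in the tests: `notify_step_complete` calls `response.raise_for_status()` without `await`, so a stub for it has to raise on the plain call. The sketch below uses only the standard library to show how the two mock types differ in that situation. `HypotheticalStatusError` and `sync_call_raises` are illustrative names introduced here, not part of Letta or httpx.

```python
import asyncio
from unittest.mock import AsyncMock, MagicMock


class HypotheticalStatusError(Exception):
    """Illustrative stand-in for httpx.HTTPStatusError (assumption, not the real class)."""


def sync_call_raises(stub) -> bool:
    """Call the stub without awaiting it, the way raise_for_status() is called."""
    try:
        result = stub()
    except HypotheticalStatusError:
        return True
    # An AsyncMock returns a coroutine object instead of raising; close it
    # so Python does not warn that the coroutine was never awaited.
    if asyncio.iscoroutine(result):
        result.close()
    return False


magic_stub = MagicMock(side_effect=HypotheticalStatusError("500"))
async_stub = AsyncMock(side_effect=HypotheticalStatusError("500"))

magic_raises = sync_call_raises(magic_stub)  # side_effect fires on the call
async_raises = sync_call_raises(async_stub)  # side_effect would only fire on await

print(magic_raises, async_raises)
```

With an `AsyncMock` stub the error path is never exercised, so a test asserting `result is False` would be checking the success path instead.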