feat: support webhooks for step completions (#5904)

* chore: have letta fire a request after every step
* chore: have letta fire a request after every step
* chore: temporal
* chore: temporal

---------

Co-authored-by: Shubham Naik <shub@memgpt.ai>

Committed by: Caren Thomas
Parent: 6fa1a1d6c3
Commit: 53d2bd0443

194
WEBHOOK_SETUP.md
Normal file
@@ -0,0 +1,194 @@
# Step Completion Webhook

This feature allows you to receive webhook notifications whenever an agent step completes in the Letta agent loop.

## Architecture

The webhook service integrates with Letta's execution architecture in two ways:

### 1. With Temporal (Recommended)

When using Temporal for agent workflows, webhook calls are wrapped as Temporal activities, providing:
- Built-in retry logic with configurable timeouts
- Full observability in Temporal UI
- Durability guarantees
- Consistent error handling
- Activity history and replay capability

Webhooks are triggered after the `create_step` activity completes in the Temporal workflow.

### 2. Without Temporal (Direct Execution)

For direct agent execution (non-Temporal), webhooks are called directly from the `StepManager` service methods:
- `update_step_success_async()` - When step completes successfully
- `update_step_error_async()` - When step fails with an error
- `update_step_cancelled_async()` - When step is cancelled

Webhooks are sent after the step status is committed to the database.

### Common Behavior

In **both** cases:
- ✅ Webhook failures do not prevent step completion
- ✅ Step is always marked as complete in the database first
- ✅ Webhook delivery is logged for debugging
- ✅ Same authentication and payload format
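
The ordering described in the list above (commit first, then notify on a best-effort basis) can be sketched roughly as follows. This is a minimal illustration, not the actual `StepManager` code: `complete_step`, `commit`, and `notify` are hypothetical names standing in for the database write and the webhook call.

```python
import asyncio
import logging

logger = logging.getLogger("webhook_sketch")


async def complete_step(step_id, commit, notify):
    """Commit the step first, then notify on a best-effort basis.

    `commit` and `notify` are hypothetical async callables standing in for
    the database write and the webhook call.
    """
    await commit(step_id)  # the step is durable before any notification is attempted
    try:
        delivered = await notify(step_id)
    except Exception as exc:  # a broken endpoint must not fail the step
        logger.warning("webhook for %s failed: %s", step_id, exc)
        delivered = False
    return {"step_id": step_id, "committed": True, "webhook_delivered": delivered}


async def _demo():
    committed = []

    async def commit(step_id):
        committed.append(step_id)

    async def failing_notify(step_id):
        raise ConnectionError("endpoint is down")

    result = await complete_step("step-123", commit, failing_notify)
    return committed, result


demo_committed, demo_result = asyncio.run(_demo())
```

Even with a notifier that always raises, the step is recorded as committed and the caller still gets a normal return value.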

## Configuration

Set the following environment variables to enable webhook notifications:

### Required

- **`STEP_COMPLETE_WEBHOOK`**: The URL endpoint that will receive POST requests when steps complete.
  - Example: `https://your-app.com/api/webhooks/step-complete`

### Optional

- **`STEP_COMPLETE_KEY`**: A secret key used for authentication.
  - When set, the webhook service will include this in an `Authorization` header as `Bearer {key}`
  - Example: `your-secret-webhook-key-12345`

## Webhook Payload

When a step completes, the webhook service will send a POST request with the following JSON payload:

```json
{
  "step_id": "step-01234567-89ab-cdef-0123-456789abcdef"
}
```
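
A receiver that does not use a validation framework could check the payload by hand. The sketch below assumes only the single `step_id` field shown above; `StepCompleteEvent` and `parse_step_complete` are hypothetical names introduced for illustration.

```python
import json
from dataclasses import dataclass


@dataclass(frozen=True)
class StepCompleteEvent:
    step_id: str


def parse_step_complete(body: bytes) -> StepCompleteEvent:
    """Decode the webhook body and check that `step_id` is a non-empty string."""
    data = json.loads(body)
    if not isinstance(data, dict):
        raise ValueError("payload must be a JSON object")
    step_id = data.get("step_id")
    if not isinstance(step_id, str) or not step_id:
        raise ValueError("payload is missing a string 'step_id'")
    return StepCompleteEvent(step_id=step_id)


event = parse_step_complete(b'{"step_id": "step-01234567-89ab-cdef-0123-456789abcdef"}')
```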

## Authentication

If `STEP_COMPLETE_KEY` is configured, requests will include an Authorization header:

```
Authorization: Bearer your-secret-webhook-key-12345
```

Your webhook endpoint should validate this key to ensure requests are coming from your Letta instance.
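
One way to perform that validation is a constant-time comparison, which avoids leaking how much of a guessed key matched. This is a sketch under the assumption that the header has exactly the `Bearer {key}` form described above; `is_authorized` is a hypothetical helper name.

```python
import hmac
from typing import Optional


def is_authorized(authorization_header: Optional[str], expected_key: str) -> bool:
    """Return True only if the header is exactly `Bearer <expected_key>`."""
    prefix = "Bearer "
    if not authorization_header or not authorization_header.startswith(prefix):
        return False
    token = authorization_header[len(prefix):]
    # compare_digest takes time independent of where the first mismatch occurs.
    return hmac.compare_digest(token.encode(), expected_key.encode())
```

The helper can be called from any framework's request handler with the raw header value and the value of `STEP_COMPLETE_KEY`.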

## Example Webhook Endpoint

Here's a simple example of a webhook endpoint (using FastAPI):

```python
from fastapi import FastAPI, Header, HTTPException
from pydantic import BaseModel
import os

app = FastAPI()

class StepCompletePayload(BaseModel):
    step_id: str

WEBHOOK_SECRET = os.getenv("STEP_COMPLETE_KEY")

@app.post("/api/webhooks/step-complete")
async def handle_step_complete(
    payload: StepCompletePayload,
    authorization: str = Header(None)
):
    # Validate the webhook key
    if WEBHOOK_SECRET:
        if not authorization or not authorization.startswith("Bearer "):
            raise HTTPException(status_code=401, detail="Missing authorization")

        token = authorization.replace("Bearer ", "")
        if token != WEBHOOK_SECRET:
            raise HTTPException(status_code=401, detail="Invalid authorization")

    # Process the step completion
    print(f"Step completed: {payload.step_id}")

    # You can now:
    # - Log the step completion
    # - Trigger downstream processes
    # - Update your application state
    # - Send notifications

    return {"status": "success"}
```

## Usage Example

```bash
# Set environment variables
export STEP_COMPLETE_WEBHOOK="https://your-app.com/api/webhooks/step-complete"
export STEP_COMPLETE_KEY="your-secret-webhook-key-12345"

# Start your Letta server
python -m letta.server
```
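
To make the effect of the two variables concrete, the sketch below shows how a request could be assembled from them. It follows the logic of `letta/services/webhook_service.py` in this commit but is a simplified stand-in; `build_webhook_request` is a hypothetical helper, and it takes the environment as a plain mapping so it can be tried without exporting anything.

```python
import os
from typing import Mapping, Optional, Tuple


def build_webhook_request(
    step_id: str, env: Mapping[str, str] = os.environ
) -> Optional[Tuple[str, dict, dict]]:
    """Return (url, headers, payload), or None when no webhook URL is configured."""
    url = env.get("STEP_COMPLETE_WEBHOOK")
    if not url:
        return None  # notifications are skipped entirely
    headers = {}
    key = env.get("STEP_COMPLETE_KEY")
    if key:
        # The key is optional; without it no Authorization header is sent.
        headers["Authorization"] = f"Bearer {key}"
    return url, headers, {"step_id": step_id}
```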

## When Webhooks Are Sent

Webhooks are triggered when a step reaches a terminal state:

1. **Success** - Step completed successfully (`StepStatus.SUCCESS`)
2. **Error** - Step failed with an error (`StepStatus.FAILED`)
3. **Cancelled** - Step was cancelled (`StepStatus.CANCELLED`)

All three states trigger the webhook with the same payload containing just the `step_id`.

## Behavior

- **No webhook URL configured**: The service will skip sending notifications (logged at debug level)
- **Webhook call succeeds**: Returns status 200-299, logged at info level
- **Webhook timeout**: Returns error, logged at warning level (does not fail the step)
- **HTTP error**: Returns non-2xx status, logged at warning level (does not fail the step)
- **Other errors**: Logged at error level (does not fail the step)

**Important**: Webhook failures do not prevent step completion. The step will be marked as complete in the database regardless of webhook delivery status. This ensures system reliability - your webhook endpoint being down will not block agent execution.
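
The outcome-to-log-level mapping listed above can be summarised as a small decision function. This is an illustrative sketch only: `delivery_log_level` is a hypothetical name, and the built-in `TimeoutError` stands in for the HTTP client's own timeout exception.

```python
import logging
from typing import Optional


def delivery_log_level(
    url_configured: bool,
    status_code: Optional[int] = None,
    error: Optional[BaseException] = None,
) -> int:
    """Map a delivery outcome to the log level described in the Behavior list."""
    if not url_configured:
        return logging.DEBUG  # nothing was sent
    if isinstance(error, TimeoutError):
        return logging.WARNING  # timeout (stand-in exception type)
    if error is not None:
        return logging.ERROR  # any other unexpected failure
    if status_code is not None and 200 <= status_code <= 299:
        return logging.INFO  # delivered
    return logging.WARNING  # non-2xx response
```

None of these outcomes raises to the caller, which is what keeps a failing endpoint from affecting the step.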

## Testing

To test the webhook functionality:

1. Set up a webhook endpoint (you can use [webhook.site](https://webhook.site) for testing)
2. Configure the environment variables
3. Run an agent and observe webhook calls when steps complete

```bash
# Example using webhook.site
export STEP_COMPLETE_WEBHOOK="https://webhook.site/your-unique-url"
export STEP_COMPLETE_KEY="test-key-123"

# Run tests
python -m pytest apps/core/letta/services/webhook_service_test.py -v
```
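
If an external service is not an option, a throwaway local receiver built on the standard library may be enough for step 1. The sketch below is an assumption-laden stand-in, not part of Letta: it records each POST it receives and then sends itself one request shaped like the webhook, so the round trip can be checked without running an agent.

```python
import json
import threading
import urllib.request
from http.server import BaseHTTPRequestHandler, HTTPServer

received = []  # one (Authorization header, decoded JSON body) tuple per request


class WebhookHandler(BaseHTTPRequestHandler):
    def do_POST(self):
        length = int(self.headers.get("Content-Length", 0))
        body = json.loads(self.rfile.read(length) or b"{}")
        received.append((self.headers.get("Authorization"), body))
        self.send_response(200)
        self.send_header("Content-Type", "application/json")
        self.end_headers()
        self.wfile.write(b'{"status": "success"}')

    def log_message(self, *args):  # keep the console quiet
        pass


server = HTTPServer(("127.0.0.1", 0), WebhookHandler)  # port 0 picks any free port
threading.Thread(target=server.serve_forever, daemon=True).start()
url = f"http://127.0.0.1:{server.server_port}/api/webhooks/step-complete"

# Simulate a request with the documented payload and header shape.
request = urllib.request.Request(
    url,
    data=json.dumps({"step_id": "step-123"}).encode(),
    headers={"Content-Type": "application/json", "Authorization": "Bearer test-key-123"},
    method="POST",
)
with urllib.request.urlopen(request, timeout=5) as response:
    status = response.status

server.shutdown()
server.server_close()
```

To receive real notifications instead, keep the server running and point `STEP_COMPLETE_WEBHOOK` at the printed `url`.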

## Implementation Details

The webhook notification is sent after:
1. The step is persisted to the database
2. Step metrics are recorded

This ensures that the step data is fully committed before external systems are notified.

### Temporal Integration

When using Temporal, the webhook call is executed as a separate activity (`send_step_complete_webhook`) with the following configuration:

- **Start-to-close timeout**: 15 seconds
- **Schedule-to-close timeout**: 30 seconds
- **Retry behavior**: Wrapped in try-catch to prevent workflow failure on webhook errors

This allows you to monitor webhook delivery in the Temporal UI and get detailed visibility into any failures.
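
The interplay of the two timeouts and the error wrapping can be approximated in plain `asyncio`. The sketch below deliberately does not use the Temporal SDK: `run_best_effort` is a hypothetical helper, the per-attempt limit is only loosely analogous to start-to-close, the overall limit to schedule-to-close, and retrying on timeout is an assumption about the retry policy.

```python
import asyncio
import logging

logger = logging.getLogger("webhook_sketch")


async def run_best_effort(send, step_id, attempt_timeout=15.0, total_timeout=30.0):
    """Retry `send` until it succeeds or the total budget is spent; never raise."""

    async def attempts():
        while True:
            try:
                # Per-attempt limit, loosely analogous to start-to-close.
                return await asyncio.wait_for(send(step_id), timeout=attempt_timeout)
            except asyncio.TimeoutError:
                logger.warning("webhook attempt for %s timed out, retrying", step_id)

    try:
        # Overall limit across all attempts, loosely analogous to schedule-to-close.
        return await asyncio.wait_for(attempts(), timeout=total_timeout)
    except Exception as exc:
        # Swallow the failure so the surrounding workflow keeps going.
        logger.warning("giving up on webhook for %s: %r", step_id, exc)
        return False
```

A caller only ever sees `True` or `False`, mirroring the rule that webhook errors must not fail the workflow.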

### File Locations

**Core Service:**
- `apps/core/letta/services/webhook_service.py` - HTTP client for webhook delivery

**Temporal Integration:**
- `apps/core/letta/agents/temporal/activities/send_webhook.py` - Temporal activity wrapper
- `apps/core/letta/agents/temporal/temporal_agent_workflow.py` - Workflow integration
- `apps/core/letta/agents/temporal/constants.py` - Timeout constants

**Non-Temporal Integration:**
- `apps/core/letta/services/step_manager.py` - Direct calls in `update_step_*` methods

**Tests:**
- `apps/core/letta/services/webhook_service_test.py` - Unit tests
@@ -21,6 +21,7 @@ from letta.schemas.step import Step as PydanticStep
 from letta.schemas.step_metrics import StepMetrics as PydanticStepMetrics
 from letta.schemas.user import User as PydanticUser
 from letta.server.db import db_registry
+from letta.services.webhook_service import WebhookService
 from letta.utils import enforce_types
 from letta.validators import raise_on_invalid_id

@@ -361,7 +362,13 @@ class StepManager:
                 step.stop_reason = stop_reason.stop_reason

             await session.commit()
-            return step.to_pydantic()
+            pydantic_step = step.to_pydantic()
+
+        # Send webhook notification for step completion
+        webhook_service = WebhookService()
+        await webhook_service.notify_step_complete(step_id)
+
+        return pydantic_step

     @enforce_types
     @trace_method
@@ -402,7 +409,13 @@ class StepManager:
                 step.stop_reason = stop_reason.stop_reason

             await session.commit()
-            return step.to_pydantic()
+            pydantic_step = step.to_pydantic()
+
+        # Send webhook notification for step completion
+        webhook_service = WebhookService()
+        await webhook_service.notify_step_complete(step_id)
+
+        return pydantic_step

     @enforce_types
     @trace_method
@@ -438,7 +451,13 @@ class StepManager:
                 step.stop_reason = stop_reason.stop_reason

             await session.commit()
-            return step.to_pydantic()
+            pydantic_step = step.to_pydantic()
+
+        # Send webhook notification for step completion
+        webhook_service = WebhookService()
+        await webhook_service.notify_step_complete(step_id)
+
+        return pydantic_step

     @enforce_types
     @trace_method
57
letta/services/webhook_service.py
Normal file
@@ -0,0 +1,57 @@
import logging
import os
from typing import Optional

import httpx

logger = logging.getLogger(__name__)


class WebhookService:
    """Service for sending webhook notifications when steps complete."""

    def __init__(self):
        self.webhook_url = os.getenv("STEP_COMPLETE_WEBHOOK")
        self.webhook_key = os.getenv("STEP_COMPLETE_KEY")

    async def notify_step_complete(self, step_id: str) -> bool:
        """
        Send a POST request to the configured webhook URL when a step completes.

        Args:
            step_id: The ID of the completed step

        Returns:
            bool: True if notification was sent successfully, False otherwise
        """
        if not self.webhook_url:
            logger.debug("STEP_COMPLETE_WEBHOOK not configured, skipping webhook notification")
            return False

        try:
            headers = {}
            if self.webhook_key:
                headers["Authorization"] = f"Bearer {self.webhook_key}"

            payload = {"step_id": step_id}

            async with httpx.AsyncClient(timeout=10.0) as client:
                response = await client.post(
                    self.webhook_url,
                    json=payload,
                    headers=headers,
                )
                response.raise_for_status()

            logger.info(f"Successfully sent step completion webhook for step {step_id}")
            return True

        except httpx.TimeoutException:
            logger.warning(f"Timeout sending step completion webhook for step {step_id}")
            return False
        except httpx.HTTPStatusError as e:
            logger.warning(f"HTTP error sending step completion webhook for step {step_id}: {e.response.status_code}")
            return False
        except Exception as e:
            logger.error(f"Unexpected error sending step completion webhook for step {step_id}: {e}")
            return False
119
letta/services/webhook_service_test.py
Normal file
@@ -0,0 +1,119 @@
"""
Simple test to verify webhook service functionality.

To run this test:
    python -m pytest letta/services/webhook_service_test.py -v

To test with actual webhook:
    export STEP_COMPLETE_WEBHOOK=https://your-webhook-url.com/endpoint
    export STEP_COMPLETE_KEY=your-secret-key
    python -m pytest letta/services/webhook_service_test.py -v

These tests verify the webhook service works in both:
- Temporal mode (when webhooks are called as Temporal activities)
- Non-Temporal mode (when webhooks are called directly from StepManager)
"""

import os
from unittest.mock import AsyncMock, MagicMock, patch

import pytest

from letta.services.webhook_service import WebhookService


@pytest.mark.asyncio
async def test_webhook_not_configured():
    """Test that webhook does not send when URL is not configured."""
    with patch.dict(os.environ, {}, clear=True):
        service = WebhookService()
        result = await service.notify_step_complete("step_123")
        assert result is False


@pytest.mark.asyncio
async def test_webhook_success():
    """Test successful webhook notification."""
    with patch.dict(
        os.environ,
        {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook", "STEP_COMPLETE_KEY": "test-key"},
    ):
        service = WebhookService()

        with patch("httpx.AsyncClient") as mock_client:
            mock_response = AsyncMock()
            mock_response.status_code = 200
            # raise_for_status() is synchronous in httpx and is called without
            # await, so it is mocked with MagicMock rather than AsyncMock.
            mock_response.raise_for_status = MagicMock()

            mock_post = AsyncMock(return_value=mock_response)
            mock_client.return_value.__aenter__.return_value.post = mock_post

            result = await service.notify_step_complete("step_123")

            assert result is True
            mock_post.assert_called_once()
            call_args = mock_post.call_args
            assert call_args.kwargs["json"] == {"step_id": "step_123"}
            assert call_args.kwargs["headers"]["Authorization"] == "Bearer test-key"


@pytest.mark.asyncio
async def test_webhook_without_auth():
    """Test webhook notification without authentication key."""
    with patch.dict(os.environ, {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook"}, clear=True):
        service = WebhookService()

        with patch("httpx.AsyncClient") as mock_client:
            mock_response = AsyncMock()
            mock_response.status_code = 200
            mock_response.raise_for_status = MagicMock()

            mock_post = AsyncMock(return_value=mock_response)
            mock_client.return_value.__aenter__.return_value.post = mock_post

            result = await service.notify_step_complete("step_123")

            assert result is True
            call_args = mock_post.call_args
            # Should not have Authorization header
            assert "Authorization" not in call_args.kwargs["headers"]


@pytest.mark.asyncio
async def test_webhook_timeout():
    """Test webhook notification timeout handling."""
    with patch.dict(os.environ, {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook"}):
        service = WebhookService()

        with patch("httpx.AsyncClient") as mock_client:
            import httpx

            mock_post = AsyncMock(side_effect=httpx.TimeoutException("Request timed out"))
            mock_client.return_value.__aenter__.return_value.post = mock_post

            result = await service.notify_step_complete("step_123")

            assert result is False


@pytest.mark.asyncio
async def test_webhook_http_error():
    """Test webhook notification HTTP error handling."""
    with patch.dict(os.environ, {"STEP_COMPLETE_WEBHOOK": "https://example.com/webhook"}):
        service = WebhookService()

        with patch("httpx.AsyncClient") as mock_client:
            import httpx

            mock_response = AsyncMock()
            mock_response.status_code = 500
            # An AsyncMock only raises its side_effect when awaited; the service
            # calls raise_for_status() synchronously, so MagicMock is needed for
            # the error to actually be raised.
            mock_response.raise_for_status = MagicMock(
                side_effect=httpx.HTTPStatusError("Server error", request=None, response=mock_response)
            )

            mock_post = AsyncMock(return_value=mock_response)
            mock_client.return_value.__aenter__.return_value.post = mock_post

            result = await service.notify_step_complete("step_123")

            assert result is False