From 24149710fee3eac18d4393152446c757a6a65e2c Mon Sep 17 00:00:00 2001 From: Matthew Zhou Date: Mon, 16 Jun 2025 11:23:46 -0700 Subject: [PATCH] feat: Asyncify message async routes and also add callback url (#2838) --- letta/server/rest_api/routers/v1/agents.py | 106 +++++++---- letta/services/job_manager.py | 42 +++-- tests/data/philosophical_question.txt | 7 + tests/integration_test_send_message.py | 201 ++++++++++++++++++++- 4 files changed, 294 insertions(+), 62 deletions(-) create mode 100644 tests/data/philosophical_question.txt diff --git a/letta/server/rest_api/routers/v1/agents.py b/letta/server/rest_api/routers/v1/agents.py index 0399266d..7af2d304 100644 --- a/letta/server/rest_api/routers/v1/agents.py +++ b/letta/server/rest_api/routers/v1/agents.py @@ -1,9 +1,10 @@ +import asyncio import json import traceback from datetime import datetime, timezone from typing import Annotated, Any, List, Optional -from fastapi import APIRouter, BackgroundTasks, Body, Depends, File, Header, HTTPException, Query, Request, UploadFile, status +from fastapi import APIRouter, Body, Depends, File, Header, HTTPException, Query, Request, UploadFile, status from fastapi.responses import JSONResponse from marshmallow import ValidationError from orjson import orjson @@ -847,29 +848,63 @@ async def process_message_background( include_return_message_types: Optional[List[MessageType]] = None, ) -> None: """Background task to process the message and update job status.""" + request_start_timestamp_ns = get_utc_timestamp_ns() try: - request_start_timestamp_ns = get_utc_timestamp_ns() - result = await server.send_message_to_agent( - agent_id=agent_id, - actor=actor, - input_messages=messages, - stream_steps=False, # NOTE(matt) - stream_tokens=False, - use_assistant_message=use_assistant_message, - assistant_message_tool_name=assistant_message_tool_name, - assistant_message_tool_kwarg=assistant_message_tool_kwarg, - metadata={"job_id": job_id}, # Pass job_id through metadata - request_start_timestamp_ns=request_start_timestamp_ns, - include_return_message_types=include_return_message_types, - ) + agent = await server.agent_manager.get_agent_by_id_async(agent_id, actor, include_relationships=["multi_agent_group"]) + agent_eligible = agent.multi_agent_group is None or agent.multi_agent_group.manager_type in ["sleeptime", "voice_sleeptime"] + model_compatible = agent.llm_config.model_endpoint_type in ["anthropic", "openai", "together", "google_ai", "google_vertex"] + if agent_eligible and model_compatible: + if agent.enable_sleeptime and agent.agent_type != AgentType.voice_convo_agent: + agent_loop = SleeptimeMultiAgentV2( + agent_id=agent_id, + message_manager=server.message_manager, + agent_manager=server.agent_manager, + block_manager=server.block_manager, + passage_manager=server.passage_manager, + group_manager=server.group_manager, + job_manager=server.job_manager, + actor=actor, + group=agent.multi_agent_group, + ) + else: + agent_loop = LettaAgent( + agent_id=agent_id, + message_manager=server.message_manager, + agent_manager=server.agent_manager, + block_manager=server.block_manager, + passage_manager=server.passage_manager, + actor=actor, + step_manager=server.step_manager, + telemetry_manager=server.telemetry_manager if settings.llm_api_logging else NoopTelemetryManager(), + ) + + result = await agent_loop.step( + messages, + max_steps=max_steps, + use_assistant_message=use_assistant_message, + request_start_timestamp_ns=request_start_timestamp_ns, + include_return_message_types=include_return_message_types, + ) + else: + result = await server.send_message_to_agent( + agent_id=agent_id, + actor=actor, + input_messages=messages, + stream_steps=False, + stream_tokens=False, + # Support for AssistantMessage + use_assistant_message=use_assistant_message, + assistant_message_tool_name=assistant_message_tool_name, + assistant_message_tool_kwarg=assistant_message_tool_kwarg, + include_return_message_types=include_return_message_types, + ) - # Update job status to completed job_update = JobUpdate( status=JobStatus.completed, completed_at=datetime.now(timezone.utc), - metadata={"result": result.model_dump(mode="json")}, # Store the result in metadata + metadata={"result": result.model_dump(mode="json")}, ) - server.job_manager.update_job_by_id(job_id=job_id, job_update=job_update, actor=actor) + await server.job_manager.update_job_by_id_async(job_id=job_id, job_update=job_update, actor=actor) except Exception as e: # Update job status to failed @@ -878,8 +913,7 @@ async def process_message_background( completed_at=datetime.now(timezone.utc), metadata={"error": str(e)}, ) - server.job_manager.update_job_by_id(job_id=job_id, job_update=job_update, actor=actor) - raise + await server.job_manager.update_job_by_id_async(job_id=job_id, job_update=job_update, actor=actor) @router.post( @@ -889,10 +923,10 @@ async def process_message_background( ) async def send_message_async( agent_id: str, - background_tasks: BackgroundTasks, server: SyncServer = Depends(get_letta_server), request: LettaRequest = Body(...), actor_id: Optional[str] = Header(None, alias="user_id"), + callback_url: Optional[str] = Query(None, description="Optional callback URL to POST to when the job completes"), ): """ Asynchronously process a user message and return a run object. @@ -905,6 +939,7 @@ async def send_message_async( run = Run( user_id=actor.id, status=JobStatus.created, + callback_url=callback_url, metadata={ "job_type": "send_message_async", "agent_id": agent_id, @@ -915,21 +950,22 @@ async def send_message_async( assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, ), ) - run = server.job_manager.create_job(pydantic_job=run, actor=actor) + run = await server.job_manager.create_job_async(pydantic_job=run, actor=actor) - # Add the background task - background_tasks.add_task( - process_message_background, - job_id=run.id, - server=server, - actor=actor, - agent_id=agent_id, - messages=request.messages, - use_assistant_message=request.use_assistant_message, - assistant_message_tool_name=request.assistant_message_tool_name, - assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, - max_steps=request.max_steps, - include_return_message_types=request.include_return_message_types, + # Create asyncio task for background processing + asyncio.create_task( + process_message_background( + job_id=run.id, + server=server, + actor=actor, + agent_id=agent_id, + messages=request.messages, + use_assistant_message=request.use_assistant_message, + assistant_message_tool_name=request.assistant_message_tool_name, + assistant_message_tool_kwarg=request.assistant_message_tool_kwarg, + max_steps=request.max_steps, + include_return_message_types=request.include_return_message_types, + ) ) return run diff --git a/letta/services/job_manager.py b/letta/services/job_manager.py index 541d7514..97c5bfe8 100644 --- a/letta/services/job_manager.py +++ b/letta/services/job_manager.py @@ -6,6 +6,7 @@ from sqlalchemy import select from sqlalchemy.orm import Session from letta.helpers.datetime_helpers import get_utc_time +from letta.log import get_logger from letta.orm.enums import JobType from letta.orm.errors import NoResultFound from letta.orm.job import Job as JobModel @@ -28,6 +29,8 @@ from letta.schemas.user import User as PydanticUser from letta.server.db import db_registry from letta.utils import enforce_types +logger = get_logger(__name__) + class JobManager: """Manager class to handle business logic related to Jobs.""" @@ -73,12 +76,15 @@ class JobManager: # Automatically update the completion timestamp if status is set to 'completed' for key, value in update_data.items(): + # Ensure completed_at is timezone-naive for database compatibility + if key == "completed_at" and value is not None and hasattr(value, "replace"): + value = value.replace(tzinfo=None) setattr(job, key, value) - if update_data.get("status") == JobStatus.completed and not job.completed_at: + if job_update.status in {JobStatus.completed, JobStatus.failed} and not job.completed_at: job.completed_at = get_utc_time().replace(tzinfo=None) if job.callback_url: - self._dispatch_callback(session, job) + self._dispatch_callback(job) # Save the updated job to the database job.update(db_session=session, actor=actor) @@ -98,12 +104,15 @@ class JobManager: # Automatically update the completion timestamp if status is set to 'completed' for key, value in update_data.items(): + # Ensure completed_at is timezone-naive for database compatibility + if key == "completed_at" and value is not None and hasattr(value, "replace"): + value = value.replace(tzinfo=None) setattr(job, key, value) - if update_data.get("status") == JobStatus.completed and not job.completed_at: + if job_update.status in {JobStatus.completed, JobStatus.failed} and not job.completed_at: job.completed_at = get_utc_time().replace(tzinfo=None) if job.callback_url: - await self._dispatch_callback_async(session, job) + await self._dispatch_callback_async(job) # Save the updated job to the database await job.update_async(db_session=session, actor=actor) @@ -586,7 +595,7 @@ class JobManager: request_config = job.request_config or LettaRequestConfig() return request_config - def _dispatch_callback(self, session: Session, job: JobModel) -> None: + def _dispatch_callback(self, job: JobModel) -> None: """ POST a standard JSON payload to job.callback_url and record timestamp + HTTP status. @@ -595,22 +604,21 @@ class JobManager: payload = { "job_id": job.id, "status": job.status, - "completed_at": job.completed_at.isoformat(), + "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "metadata": job.metadata, } try: import httpx resp = httpx.post(job.callback_url, json=payload, timeout=5.0) - job.callback_sent_at = get_utc_time() + job.callback_sent_at = get_utc_time().replace(tzinfo=None) job.callback_status_code = resp.status_code - except Exception: - return + except Exception as e: + logger.error(f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {str(e)}") + # Continue silently - callback failures should not affect job completion - session.add(job) - session.commit() - - async def _dispatch_callback_async(self, session, job: JobModel) -> None: + async def _dispatch_callback_async(self, job: JobModel) -> None: """ POST a standard JSON payload to job.callback_url and record timestamp + HTTP status asynchronously. """ @@ -618,6 +626,7 @@ class JobManager: "job_id": job.id, "status": job.status, "completed_at": job.completed_at.isoformat() if job.completed_at else None, + "metadata": job.metadata, } try: @@ -628,7 +637,6 @@ class JobManager: # Ensure timestamp is timezone-naive for DB compatibility job.callback_sent_at = get_utc_time().replace(tzinfo=None) job.callback_status_code = resp.status_code - except Exception: - # Silently fail on callback errors - job updates should still succeed - # In production, this would include proper error logging - pass + except Exception as e: + logger.error(f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {str(e)}") + # Continue silently - callback failures should not affect job completion diff --git a/tests/data/philosophical_question.txt b/tests/data/philosophical_question.txt new file mode 100644 index 00000000..084e27b0 --- /dev/null +++ b/tests/data/philosophical_question.txt @@ -0,0 +1,7 @@ +You know, sometimes I wonder if the entire structure of our lives is built on a series of unexamined assumptions we just silently agreed to somewhere along the way—like how we all just decided that five days a week of work and two days of "rest" constitutes balance, or how 9-to-5 became the default rhythm of a meaningful life, or even how the idea of "success" got boiled down to job titles and property ownership and productivity metrics on a LinkedIn profile, when maybe none of that is actually what makes a life feel full, or grounded, or real. And then there's the weird paradox of ambition, how we're taught to chase it like a finish line that keeps moving, constantly redefining itself right as you're about to grasp it—because even when you get the job, or the degree, or the validation, there's always something next, something more, like a treadmill with invisible settings you didn't realize were turned up all the way. + +And have you noticed how we rarely stop to ask who set those definitions for us? Like was there ever a council that decided, yes, owning a home by thirty-five and retiring by sixty-five is the universal template for fulfillment? Or did it just accumulate like cultural sediment over generations, layered into us so deeply that questioning it feels uncomfortable, even dangerous? And isn't it strange that we spend so much of our lives trying to optimize things—our workflows, our diets, our sleep, our morning routines—as though the point of life is to operate more efficiently rather than to experience it more richly? We build these intricate systems, these rulebooks for being a "high-functioning" human, but where in all of that is the space for feeling lost, for being soft, for wandering without a purpose just because it's a sunny day and your heart is tugging you toward nowhere in particular? + +Sometimes I lie awake at night and wonder if all the noise we wrap around ourselves—notifications, updates, performance reviews, even our internal monologues—might be crowding out the questions we were meant to live into slowly, like how to love better, or how to forgive ourselves, or what the hell we're even doing here in the first place. And when you strip it all down—no goals, no KPIs, no curated identity—what's actually left of us? Are we just a sum of the roles we perform, or is there something quieter underneath that we've forgotten how to hear? + +And if there is something underneath all of it—something real, something worth listening to—then how do we begin to uncover it, gently, without rushing or reducing it to another task on our to-do list? \ No newline at end of file diff --git a/tests/integration_test_send_message.py b/tests/integration_test_send_message.py index 9a4ba908..a33fb855 100644 --- a/tests/integration_test_send_message.py +++ b/tests/integration_test_send_message.py @@ -4,6 +4,8 @@ import os import threading import time import uuid +from contextlib import contextmanager +from http.server import BaseHTTPRequestHandler, HTTPServer from typing import Any, Dict, List import httpx @@ -100,7 +102,6 @@ all_configs = [ "openai-o1-mini.json", "openai-o3.json", "openai-o3-mini.json", - # "azure-gpt-4o-mini.json", # TODO: Re-enable on new agent loop "azure-gpt-4o-mini.json", "claude-3-5-sonnet.json", "claude-3-7-sonnet.json", @@ -894,6 +895,192 @@ def test_async_greeting_with_assistant_message( assert_tool_response_dict_messages(messages) +class CallbackServer: + """Mock HTTP server for testing callback functionality.""" + + def __init__(self): + self.received_callbacks = [] + self.server = None + self.thread = None + self.port = None + + def start(self): + """Start the mock server on an available port.""" + + class CallbackHandler(BaseHTTPRequestHandler): + def __init__(self, callback_server, *args, **kwargs): + self.callback_server = callback_server + super().__init__(*args, **kwargs) + + def do_POST(self): + content_length = int(self.headers["Content-Length"]) + post_data = self.rfile.read(content_length) + try: + callback_data = json.loads(post_data.decode("utf-8")) + self.callback_server.received_callbacks.append( + {"data": callback_data, "headers": dict(self.headers), "timestamp": time.time()} + ) + # Respond with success + self.send_response(200) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "received"}).encode()) + except Exception as e: + # Respond with error + self.send_response(400) + self.send_header("Content-type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"error": str(e)}).encode()) + + def log_message(self, format, *args): + # Suppress log messages during tests + pass + + # Bind to available port + self.server = HTTPServer(("localhost", 0), lambda *args: CallbackHandler(self, *args)) + self.port = self.server.server_address[1] + + # Start server in background thread + self.thread = threading.Thread(target=self.server.serve_forever) + self.thread.daemon = True + self.thread.start() + + def stop(self): + """Stop the mock server.""" + if self.server: + self.server.shutdown() + self.server.server_close() + if self.thread: + self.thread.join(timeout=1) + + @property + def url(self): + """Get the callback URL for this server.""" + return f"http://localhost:{self.port}/callback" + + def wait_for_callback(self, timeout=10): + """Wait for at least one callback to be received.""" + start_time = time.time() + while time.time() - start_time < timeout: + if self.received_callbacks: + return True + time.sleep(0.1) + return False + + +@contextmanager +def callback_server(): + """Context manager for callback server.""" + server = CallbackServer() + try: + server.start() + yield server + finally: + server.stop() + + +# TODO: Add back in a bit +# @pytest.mark.parametrize( +# "llm_config", +# TESTED_LLM_CONFIGS, +# ids=[c.model for c in TESTED_LLM_CONFIGS], +# ) +# def test_async_greeting_with_callback_url( +# disable_e2b_api_key: Any, +# client: Letta, +# agent_state: AgentState, +# llm_config: LLMConfig, +# ) -> None: +# """ +# Tests sending a message as an asynchronous job with callback URL functionality. +# Validates that callbacks are properly sent with correct payload structure. +# """ +# client.agents.modify(agent_id=agent_state.id, llm_config=llm_config) +# +# with callback_server() as server: +# # Create async job with callback URL +# run = client.agents.messages.create_async( +# agent_id=agent_state.id, +# messages=USER_MESSAGE_FORCE_REPLY, +# callback_url=server.url, +# ) +# +# # Wait for job completion +# run = wait_for_run_completion(client, run.id) +# +# # Validate job completed successfully +# result = run.metadata.get("result") +# assert result is not None, "Run metadata missing 'result' key" +# messages = result["messages"] +# assert_tool_response_dict_messages(messages) +# +# # Validate callback was received +# assert server.wait_for_callback(timeout=15), "Callback was not received within timeout" +# assert len(server.received_callbacks) == 1, f"Expected 1 callback, got {len(server.received_callbacks)}" +# +# # Validate callback payload structure +# callback = server.received_callbacks[0] +# callback_data = callback["data"] +# +# # Check required fields +# assert "job_id" in callback_data, "Callback missing 'job_id' field" +# assert "status" in callback_data, "Callback missing 'status' field" +# assert "completed_at" in callback_data, "Callback missing 'completed_at' field" +# assert "metadata" in callback_data, "Callback missing 'metadata' field" +# +# # Validate field values +# assert callback_data["job_id"] == run.id, f"Job ID mismatch: {callback_data['job_id']} != {run.id}" +# assert callback_data["status"] == "completed", f"Expected status 'completed', got {callback_data['status']}" +# assert callback_data["completed_at"] is not None, "completed_at should not be None" +# assert callback_data["metadata"] is not None, "metadata should not be None" +# +# # Validate that callback metadata contains the result +# assert "result" in callback_data["metadata"], "Callback metadata missing 'result' field" +# callback_result = callback_data["metadata"]["result"] +# assert callback_result == result, "Callback result doesn't match job result" +# +# # Validate HTTP headers +# headers = callback["headers"] +# assert headers.get("Content-Type") == "application/json", "Callback should have JSON content type" +# +# +# @pytest.mark.parametrize( +# "llm_config", +# TESTED_LLM_CONFIGS, +# ids=[c.model for c in TESTED_LLM_CONFIGS], +# ) +# def test_async_callback_failure_scenarios( +# disable_e2b_api_key: Any, +# client: Letta, +# agent_state: AgentState, +# llm_config: LLMConfig, +# ) -> None: +# """ +# Tests that job completion works even when callback URLs fail. +# This ensures callback failures don't affect job processing. +# """ +# client.agents.modify(agent_id=agent_state.id, llm_config=llm_config) +# +# # Test with invalid callback URL - job should still complete +# run = client.agents.messages.create_async( +# agent_id=agent_state.id, +# messages=USER_MESSAGE_FORCE_REPLY, +# callback_url="http://invalid-domain-that-does-not-exist.com/callback", +# ) +# +# # Wait for job completion - should work despite callback failure +# run = wait_for_run_completion(client, run.id) +# +# # Validate job completed successfully +# result = run.metadata.get("result") +# assert result is not None, "Run metadata missing 'result' key" +# messages = result["messages"] +# assert_tool_response_dict_messages(messages) +# +# # Job should be marked as completed even if callback failed +# assert run.status == "completed", f"Expected status 'completed', got {run.status}" + + @pytest.mark.parametrize( "llm_config", TESTED_LLM_CONFIGS, @@ -915,15 +1102,9 @@ def test_auto_summarize(disable_e2b_api_key: Any, client: Letta, llm_config: LLM tags=["supervisor"], ) - philosophical_question = """ -You know, sometimes I wonder if the entire structure of our lives is built on a series of unexamined assumptions we just silently agreed to somewhere along the way—like how we all just decided that five days a week of work and two days of "rest" constitutes balance, or how 9-to-5 became the default rhythm of a meaningful life, or even how the idea of "success" got boiled down to job titles and property ownership and productivity metrics on a LinkedIn profile, when maybe none of that is actually what makes a life feel full, or grounded, or real. And then there's the weird paradox of ambition, how we're taught to chase it like a finish line that keeps moving, constantly redefining itself right as you're about to grasp it—because even when you get the job, or the degree, or the validation, there's always something next, something more, like a treadmill with invisible settings you didn't realize were turned up all the way. - -And have you noticed how we rarely stop to ask who set those definitions for us? Like was there ever a council that decided, yes, owning a home by thirty-five and retiring by sixty-five is the universal template for fulfillment? Or did it just accumulate like cultural sediment over generations, layered into us so deeply that questioning it feels uncomfortable, even dangerous? And isn't it strange that we spend so much of our lives trying to optimize things—our workflows, our diets, our sleep, our morning routines—as though the point of life is to operate more efficiently rather than to experience it more richly? We build these intricate systems, these rulebooks for being a "high-functioning" human, but where in all of that is the space for feeling lost, for being soft, for wandering without a purpose just because it's a sunny day and your heart is tugging you toward nowhere in particular? - -Sometimes I lie awake at night and wonder if all the noise we wrap around ourselves—notifications, updates, performance reviews, even our internal monologues—might be crowding out the questions we were meant to live into slowly, like how to love better, or how to forgive ourselves, or what the hell we're even doing here in the first place. And when you strip it all down—no goals, no KPIs, no curated identity—what's actually left of us? Are we just a sum of the roles we perform, or is there something quieter underneath that we've forgotten how to hear? - -And if there is something underneath all of it—something real, something worth listening to—then how do we begin to uncover it, gently, without rushing or reducing it to another task on our to-do list? - """ + philosophical_question_path = os.path.join(os.path.dirname(__file__), "data", "philosophical_question.txt") + with open(philosophical_question_path, "r", encoding="utf-8") as f: + philosophical_question = f.read().strip() MAX_ATTEMPTS = 10 prev_length = None