feat: Add callback_error and fix callback logic (#2842)
This commit is contained in:
@@ -0,0 +1,31 @@
|
||||
"""Add callback error field to jobs
|
||||
|
||||
Revision ID: 90fd814d0cda
|
||||
Revises: c0ef3ff26306
|
||||
Create Date: 2025-06-16 13:04:53.496195
|
||||
|
||||
"""
|
||||
|
||||
from typing import Sequence, Union
|
||||
|
||||
import sqlalchemy as sa
|
||||
|
||||
from alembic import op
|
||||
|
||||
# revision identifiers, used by Alembic.
|
||||
revision: str = "90fd814d0cda"
|
||||
down_revision: Union[str, None] = "c0ef3ff26306"
|
||||
branch_labels: Union[str, Sequence[str], None] = None
|
||||
depends_on: Union[str, Sequence[str], None] = None
|
||||
|
||||
|
||||
def upgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.add_column("jobs", sa.Column("callback_error", sa.String(), nullable=True))
|
||||
# ### end Alembic commands ###
|
||||
|
||||
|
||||
def downgrade() -> None:
|
||||
# ### commands auto generated by Alembic - please adjust! ###
|
||||
op.drop_column("jobs", "callback_error")
|
||||
# ### end Alembic commands ###
|
||||
@@ -43,6 +43,9 @@ class Job(SqlalchemyBase, UserMixin):
|
||||
callback_url: Mapped[Optional[str]] = mapped_column(String, nullable=True, doc="When set, POST to this URL after job completion.")
|
||||
callback_sent_at: Mapped[Optional[datetime]] = mapped_column(nullable=True, doc="Timestamp when the callback was last attempted.")
|
||||
callback_status_code: Mapped[Optional[int]] = mapped_column(nullable=True, doc="HTTP status code returned by the callback endpoint.")
|
||||
callback_error: Mapped[Optional[str]] = mapped_column(
|
||||
nullable=True, doc="Optional error message from attempting to POST the callback endpoint."
|
||||
)
|
||||
|
||||
# relationships
|
||||
user: Mapped["User"] = relationship("User", back_populates="jobs")
|
||||
|
||||
@@ -19,6 +19,7 @@ class JobBase(OrmMetadataBase):
|
||||
callback_url: Optional[str] = Field(None, description="If set, POST to this URL when the job completes.")
|
||||
callback_sent_at: Optional[datetime] = Field(None, description="Timestamp when the callback was last attempted.")
|
||||
callback_status_code: Optional[int] = Field(None, description="HTTP status code returned by the callback endpoint.")
|
||||
callback_error: Optional[str] = Field(None, description="Optional error message from attempting to POST the callback endpoint.")
|
||||
|
||||
|
||||
class Job(JobBase):
|
||||
|
||||
@@ -70,6 +70,7 @@ class JobManager:
|
||||
with db_registry.session() as session:
|
||||
# Fetch the job by ID
|
||||
job = self._verify_job_access(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
not_completed_before = not bool(job.completed_at)
|
||||
|
||||
# Update job attributes with only the fields that were explicitly set
|
||||
update_data = job_update.model_dump(to_orm=True, exclude_unset=True, exclude_none=True)
|
||||
@@ -81,7 +82,7 @@ class JobManager:
|
||||
value = value.replace(tzinfo=None)
|
||||
setattr(job, key, value)
|
||||
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed} and not job.completed_at:
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed} and not_completed_before:
|
||||
job.completed_at = get_utc_time().replace(tzinfo=None)
|
||||
if job.callback_url:
|
||||
self._dispatch_callback(job)
|
||||
@@ -98,6 +99,7 @@ class JobManager:
|
||||
async with db_registry.async_session() as session:
|
||||
# Fetch the job by ID
|
||||
job = await self._verify_job_access_async(session=session, job_id=job_id, actor=actor, access=["write"])
|
||||
not_completed_before = not bool(job.completed_at)
|
||||
|
||||
# Update job attributes with only the fields that were explicitly set
|
||||
update_data = job_update.model_dump(to_orm=True, exclude_unset=True, exclude_none=True)
|
||||
@@ -109,7 +111,7 @@ class JobManager:
|
||||
value = value.replace(tzinfo=None)
|
||||
setattr(job, key, value)
|
||||
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed} and not job.completed_at:
|
||||
if job_update.status in {JobStatus.completed, JobStatus.failed} and not_completed_before:
|
||||
job.completed_at = get_utc_time().replace(tzinfo=None)
|
||||
if job.callback_url:
|
||||
await self._dispatch_callback_async(job)
|
||||
@@ -605,7 +607,7 @@ class JobManager:
|
||||
"job_id": job.id,
|
||||
"status": job.status,
|
||||
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
|
||||
"metadata": job.metadata,
|
||||
"metadata": job.metadata_,
|
||||
}
|
||||
try:
|
||||
import httpx
|
||||
@@ -615,7 +617,11 @@ class JobManager:
|
||||
job.callback_status_code = resp.status_code
|
||||
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {str(e)}")
|
||||
error_message = f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
# Record the failed attempt
|
||||
job.callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
job.callback_error = error_message
|
||||
# Continue silently - callback failures should not affect job completion
|
||||
|
||||
async def _dispatch_callback_async(self, job: JobModel) -> None:
|
||||
@@ -626,7 +632,7 @@ class JobManager:
|
||||
"job_id": job.id,
|
||||
"status": job.status,
|
||||
"completed_at": job.completed_at.isoformat() if job.completed_at else None,
|
||||
"metadata": job.metadata,
|
||||
"metadata": job.metadata_,
|
||||
}
|
||||
|
||||
try:
|
||||
@@ -638,5 +644,9 @@ class JobManager:
|
||||
job.callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
job.callback_status_code = resp.status_code
|
||||
except Exception as e:
|
||||
logger.error(f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {str(e)}")
|
||||
error_message = f"Failed to dispatch callback for job {job.id} to {job.callback_url}: {str(e)}"
|
||||
logger.error(error_message)
|
||||
# Record the failed attempt
|
||||
job.callback_sent_at = get_utc_time().replace(tzinfo=None)
|
||||
job.callback_error = error_message
|
||||
# Continue silently - callback failures should not affect job completion
|
||||
|
||||
526
poetry.lock
generated
526
poetry.lock
generated
File diff suppressed because it is too large
Load Diff
@@ -72,7 +72,7 @@ llama-index = "^0.12.2"
|
||||
llama-index-embeddings-openai = "^0.3.1"
|
||||
e2b-code-interpreter = {version = "^1.0.3", optional = true}
|
||||
anthropic = "^0.49.0"
|
||||
letta_client = "^0.1.155"
|
||||
letta_client = "^0.1.160"
|
||||
openai = "^1.60.0"
|
||||
opentelemetry-api = "1.30.0"
|
||||
opentelemetry-sdk = "1.30.0"
|
||||
|
||||
@@ -979,106 +979,113 @@ def callback_server():
|
||||
server.stop()
|
||||
|
||||
|
||||
# TODO: Add back in a bit
|
||||
# @pytest.mark.parametrize(
|
||||
# "llm_config",
|
||||
# TESTED_LLM_CONFIGS,
|
||||
# ids=[c.model for c in TESTED_LLM_CONFIGS],
|
||||
# )
|
||||
# def test_async_greeting_with_callback_url(
|
||||
# disable_e2b_api_key: Any,
|
||||
# client: Letta,
|
||||
# agent_state: AgentState,
|
||||
# llm_config: LLMConfig,
|
||||
# ) -> None:
|
||||
# """
|
||||
# Tests sending a message as an asynchronous job with callback URL functionality.
|
||||
# Validates that callbacks are properly sent with correct payload structure.
|
||||
# """
|
||||
# client.agents.modify(agent_id=agent_state.id, llm_config=llm_config)
|
||||
#
|
||||
# with callback_server() as server:
|
||||
# # Create async job with callback URL
|
||||
# run = client.agents.messages.create_async(
|
||||
# agent_id=agent_state.id,
|
||||
# messages=USER_MESSAGE_FORCE_REPLY,
|
||||
# callback_url=server.url,
|
||||
# )
|
||||
#
|
||||
# # Wait for job completion
|
||||
# run = wait_for_run_completion(client, run.id)
|
||||
#
|
||||
# # Validate job completed successfully
|
||||
# result = run.metadata.get("result")
|
||||
# assert result is not None, "Run metadata missing 'result' key"
|
||||
# messages = result["messages"]
|
||||
# assert_tool_response_dict_messages(messages)
|
||||
#
|
||||
# # Validate callback was received
|
||||
# assert server.wait_for_callback(timeout=15), "Callback was not received within timeout"
|
||||
# assert len(server.received_callbacks) == 1, f"Expected 1 callback, got {len(server.received_callbacks)}"
|
||||
#
|
||||
# # Validate callback payload structure
|
||||
# callback = server.received_callbacks[0]
|
||||
# callback_data = callback["data"]
|
||||
#
|
||||
# # Check required fields
|
||||
# assert "job_id" in callback_data, "Callback missing 'job_id' field"
|
||||
# assert "status" in callback_data, "Callback missing 'status' field"
|
||||
# assert "completed_at" in callback_data, "Callback missing 'completed_at' field"
|
||||
# assert "metadata" in callback_data, "Callback missing 'metadata' field"
|
||||
#
|
||||
# # Validate field values
|
||||
# assert callback_data["job_id"] == run.id, f"Job ID mismatch: {callback_data['job_id']} != {run.id}"
|
||||
# assert callback_data["status"] == "completed", f"Expected status 'completed', got {callback_data['status']}"
|
||||
# assert callback_data["completed_at"] is not None, "completed_at should not be None"
|
||||
# assert callback_data["metadata"] is not None, "metadata should not be None"
|
||||
#
|
||||
# # Validate that callback metadata contains the result
|
||||
# assert "result" in callback_data["metadata"], "Callback metadata missing 'result' field"
|
||||
# callback_result = callback_data["metadata"]["result"]
|
||||
# assert callback_result == result, "Callback result doesn't match job result"
|
||||
#
|
||||
# # Validate HTTP headers
|
||||
# headers = callback["headers"]
|
||||
# assert headers.get("Content-Type") == "application/json", "Callback should have JSON content type"
|
||||
#
|
||||
#
|
||||
# @pytest.mark.parametrize(
|
||||
# "llm_config",
|
||||
# TESTED_LLM_CONFIGS,
|
||||
# ids=[c.model for c in TESTED_LLM_CONFIGS],
|
||||
# )
|
||||
# def test_async_callback_failure_scenarios(
|
||||
# disable_e2b_api_key: Any,
|
||||
# client: Letta,
|
||||
# agent_state: AgentState,
|
||||
# llm_config: LLMConfig,
|
||||
# ) -> None:
|
||||
# """
|
||||
# Tests that job completion works even when callback URLs fail.
|
||||
# This ensures callback failures don't affect job processing.
|
||||
# """
|
||||
# client.agents.modify(agent_id=agent_state.id, llm_config=llm_config)
|
||||
#
|
||||
# # Test with invalid callback URL - job should still complete
|
||||
# run = client.agents.messages.create_async(
|
||||
# agent_id=agent_state.id,
|
||||
# messages=USER_MESSAGE_FORCE_REPLY,
|
||||
# callback_url="http://invalid-domain-that-does-not-exist.com/callback",
|
||||
# )
|
||||
#
|
||||
# # Wait for job completion - should work despite callback failure
|
||||
# run = wait_for_run_completion(client, run.id)
|
||||
#
|
||||
# # Validate job completed successfully
|
||||
# result = run.metadata.get("result")
|
||||
# assert result is not None, "Run metadata missing 'result' key"
|
||||
# messages = result["messages"]
|
||||
# assert_tool_response_dict_messages(messages)
|
||||
#
|
||||
# # Job should be marked as completed even if callback failed
|
||||
# assert run.status == "completed", f"Expected status 'completed', got {run.status}"
|
||||
@pytest.mark.parametrize(
|
||||
"llm_config",
|
||||
TESTED_LLM_CONFIGS,
|
||||
ids=[c.model for c in TESTED_LLM_CONFIGS],
|
||||
)
|
||||
def test_async_greeting_with_callback_url(
|
||||
disable_e2b_api_key: Any,
|
||||
client: Letta,
|
||||
agent_state: AgentState,
|
||||
llm_config: LLMConfig,
|
||||
) -> None:
|
||||
"""
|
||||
Tests sending a message as an asynchronous job with callback URL functionality.
|
||||
Validates that callbacks are properly sent with correct payload structure.
|
||||
"""
|
||||
client.agents.modify(agent_id=agent_state.id, llm_config=llm_config)
|
||||
|
||||
with callback_server() as server:
|
||||
# Create async job with callback URL
|
||||
run = client.agents.messages.create_async(
|
||||
agent_id=agent_state.id,
|
||||
messages=USER_MESSAGE_FORCE_REPLY,
|
||||
callback_url=server.url,
|
||||
)
|
||||
|
||||
# Wait for job completion
|
||||
run = wait_for_run_completion(client, run.id)
|
||||
|
||||
# Validate job completed successfully
|
||||
result = run.metadata.get("result")
|
||||
assert result is not None, "Run metadata missing 'result' key"
|
||||
messages = result["messages"]
|
||||
assert_tool_response_dict_messages(messages)
|
||||
|
||||
# Validate callback was received
|
||||
assert server.wait_for_callback(timeout=15), "Callback was not received within timeout"
|
||||
assert len(server.received_callbacks) == 1, f"Expected 1 callback, got {len(server.received_callbacks)}"
|
||||
|
||||
# Validate callback payload structure
|
||||
callback = server.received_callbacks[0]
|
||||
callback_data = callback["data"]
|
||||
|
||||
# Check required fields
|
||||
assert "job_id" in callback_data, "Callback missing 'job_id' field"
|
||||
assert "status" in callback_data, "Callback missing 'status' field"
|
||||
assert "completed_at" in callback_data, "Callback missing 'completed_at' field"
|
||||
assert "metadata" in callback_data, "Callback missing 'metadata' field"
|
||||
|
||||
# Validate field values
|
||||
assert callback_data["job_id"] == run.id, f"Job ID mismatch: {callback_data['job_id']} != {run.id}"
|
||||
assert callback_data["status"] == "completed", f"Expected status 'completed', got {callback_data['status']}"
|
||||
assert callback_data["completed_at"] is not None, "completed_at should not be None"
|
||||
assert callback_data["metadata"] is not None, "metadata should not be None"
|
||||
|
||||
# Validate that callback metadata contains the result
|
||||
assert "result" in callback_data["metadata"], "Callback metadata missing 'result' field"
|
||||
callback_result = callback_data["metadata"]["result"]
|
||||
assert callback_result == result, "Callback result doesn't match job result"
|
||||
|
||||
# Validate HTTP headers
|
||||
headers = callback["headers"]
|
||||
assert headers.get("Content-Type") == "application/json", "Callback should have JSON content type"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
"llm_config",
|
||||
TESTED_LLM_CONFIGS,
|
||||
ids=[c.model for c in TESTED_LLM_CONFIGS],
|
||||
)
|
||||
def test_async_callback_failure_scenarios(
|
||||
disable_e2b_api_key: Any,
|
||||
client: Letta,
|
||||
agent_state: AgentState,
|
||||
llm_config: LLMConfig,
|
||||
) -> None:
|
||||
"""
|
||||
Tests that job completion works even when callback URLs fail.
|
||||
This ensures callback failures don't affect job processing.
|
||||
"""
|
||||
client.agents.modify(agent_id=agent_state.id, llm_config=llm_config)
|
||||
|
||||
# Test with invalid callback URL - job should still complete
|
||||
run = client.agents.messages.create_async(
|
||||
agent_id=agent_state.id,
|
||||
messages=USER_MESSAGE_FORCE_REPLY,
|
||||
callback_url="http://invalid-domain-that-does-not-exist.com/callback",
|
||||
)
|
||||
|
||||
# Wait for job completion - should work despite callback failure
|
||||
run = wait_for_run_completion(client, run.id)
|
||||
|
||||
# Validate job completed successfully
|
||||
result = run.metadata.get("result")
|
||||
assert result is not None, "Run metadata missing 'result' key"
|
||||
messages = result["messages"]
|
||||
assert_tool_response_dict_messages(messages)
|
||||
|
||||
# Job should be marked as completed even if callback failed
|
||||
assert run.status == "completed", f"Expected status 'completed', got {run.status}"
|
||||
|
||||
# Validate callback failure was properly recorded
|
||||
assert run.callback_sent_at is not None, "callback_sent_at should be set even for failed callbacks"
|
||||
assert run.callback_error is not None, "callback_error should be set to error message for failed callbacks"
|
||||
assert isinstance(run.callback_error, str), "callback_error should be error message string for failed callbacks"
|
||||
assert "Failed to dispatch callback" in run.callback_error, "callback_error should contain error details"
|
||||
assert run.id in run.callback_error, "callback_error should contain job ID"
|
||||
assert "invalid-domain-that-does-not-exist.com" in run.callback_error, "callback_error should contain failed URL"
|
||||
|
||||
|
||||
@pytest.mark.parametrize(
|
||||
|
||||
@@ -404,7 +404,7 @@ def test_send_system_message(client: LettaSDKClient, agent: AgentState):
|
||||
assert send_system_message_response, "Sending message failed"
|
||||
|
||||
|
||||
def test_function_return_limit(client: LettaSDKClient, agent: AgentState):
|
||||
def test_function_return_limit(disable_e2b_api_key, client: LettaSDKClient, agent: AgentState):
|
||||
"""Test to see if the function return limit works"""
|
||||
|
||||
def big_return():
|
||||
|
||||
Reference in New Issue
Block a user