diff --git a/letta/services/provider_trace_backends/socket.py b/letta/services/provider_trace_backends/socket.py index 1d375e57..2e2cab4a 100644 --- a/letta/services/provider_trace_backends/socket.py +++ b/letta/services/provider_trace_backends/socket.py @@ -4,6 +4,7 @@ import json import os import socket as socket_module import threading +import time from datetime import datetime, timezone from typing import Any @@ -18,7 +19,8 @@ logger = get_logger(__name__) # Bump this when making breaking changes to the record schema. # Must match ProtocolVersion in apps/crouton/main.go. # v2: Added user_id, compaction_settings (summarization), llm_config (non-summarization) -PROTOCOL_VERSION = 2 +# v3: Increased buffer to 128MB, native sidecar for deterministic startup +PROTOCOL_VERSION = 3 class SocketProviderTraceBackend(ProviderTraceBackendClient): @@ -106,17 +108,29 @@ class SocketProviderTraceBackend(ProviderTraceBackendClient): thread = threading.Thread(target=self._send_async, args=(record,), daemon=True) thread.start() - def _send_async(self, record: dict[str, Any]) -> None: + def _send_async(self, record: dict[str, Any], max_retries: int = 3) -> None: """Send record to Unix socket (runs in background thread).""" - try: - if not os.path.exists(self.socket_path): - logger.warning(f"Crouton socket not found at {self.socket_path}") - return + base_delay = 0.5 + for attempt in range(max_retries): + try: + if not os.path.exists(self.socket_path): + if attempt < max_retries - 1: + time.sleep(base_delay * (2**attempt)) + continue + logger.warning(f"Crouton socket not found at {self.socket_path} after {max_retries} attempts") + return - with socket_module.socket(socket_module.AF_UNIX, socket_module.SOCK_STREAM) as sock: - sock.settimeout(5.0) - sock.connect(self.socket_path) - payload = json.dumps(record, default=str) + "\n" - sock.sendall(payload.encode()) - except Exception as e: - logger.warning(f"Failed to send telemetry to Crouton: {e}") + with socket_module.socket(socket_module.AF_UNIX, socket_module.SOCK_STREAM) as sock: + sock.settimeout(5.0) + sock.connect(self.socket_path) + payload = json.dumps(record, default=str) + "\n" + sock.sendall(payload.encode()) + return + except BrokenPipeError: + if attempt < max_retries - 1: + time.sleep(base_delay * (2**attempt)) + continue + logger.warning(f"Failed to send telemetry to Crouton: broken pipe after {max_retries} attempts") + except Exception as e: + logger.warning(f"Failed to send telemetry to Crouton: {e}") + return diff --git a/tests/test_provider_trace_backends.py b/tests/test_provider_trace_backends.py index f1051d1c..83547c1d 100644 --- a/tests/test_provider_trace_backends.py +++ b/tests/test_provider_trace_backends.py @@ -288,8 +288,8 @@ class TestSocketProviderTraceBackend: assert captured_records[0]["error"] == "Rate limit exceeded" assert captured_records[0]["response"] is None - def test_record_includes_v2_protocol_fields(self): - """Test that v2 protocol fields are included in the socket record.""" + def test_record_includes_v3_protocol_fields(self): + """Test that v3 protocol fields are included in the socket record.""" trace = ProviderTrace( request_json={"model": "gpt-4"}, response_json={"id": "test"}, @@ -312,7 +312,7 @@ class TestSocketProviderTraceBackend: assert len(captured_records) == 1 record = captured_records[0] - assert record["protocol_version"] == 2 + assert record["protocol_version"] == 3 assert record["org_id"] == "org-456" assert record["user_id"] == "user-456" assert record["compaction_settings"] == {"mode": "sliding_window"}