From 022dafadbc92413310b29397678b7120d4615744 Mon Sep 17 00:00:00 2001 From: jnjpng Date: Mon, 16 Mar 2026 07:55:10 -0700 Subject: [PATCH] fix: add sendMessageStream boundary debug metadata (#1403) Co-authored-by: Letta Code --- src/agent/message.ts | 63 ++++++++++++++++++++++++++++++++++++++------ 1 file changed, 55 insertions(+), 8 deletions(-) diff --git a/src/agent/message.ts b/src/agent/message.ts index 1c966f5..a0e7150 100644 --- a/src/agent/message.ts +++ b/src/agent/message.ts @@ -14,6 +14,7 @@ import { captureToolExecutionContext, waitForToolsetReady, } from "../tools/manager"; +import { debugLog, debugWarn } from "../utils/debug"; import { isTimingsEnabled } from "../utils/timing"; import { type ApprovalNormalizationOptions, @@ -24,6 +25,7 @@ import { buildClientSkillsPayload } from "./clientSkills"; const streamRequestStartTimes = new WeakMap(); const streamToolContextIds = new WeakMap(); + export type StreamRequestContext = { conversationId: string; resolvedConversationId: string; @@ -164,16 +166,61 @@ export async function sendMessageStream( extraHeaders["X-Experimental-OpenAI-Responses-Websocket"] = "true"; } - const stream = await client.conversations.messages.create( + const messageSummary = messages + .map((item) => { + if (item.type === "approval") { + return `approval:${item.approvals?.length ?? 0}`; + } + if (item.type !== "message") { + return `unknown:${item.type}`; + } + const content = item.content; + if (typeof content === "string") { + return `message:str:${content.length}`; + } + return `message:parts:${content.length}`; + }) + .join(","); + + debugLog( + "send-message-stream", + "request_start conversation_id=%s agent_id=%s messages=%s stream_tokens=%s background=%s max_retries=%s", resolvedConversationId, - requestBody, - { - ...requestOptions, - headers: { - ...((requestOptions.headers as Record) ?? {}), - ...extraHeaders, + opts.agentId ?? "none", + messageSummary || "(empty)", + opts.streamTokens ?? true, + opts.background ?? true, + requestOptions.maxRetries ?? "default", + ); + + let stream: Stream; + try { + stream = await client.conversations.messages.create( + resolvedConversationId, + requestBody, + { + ...requestOptions, + headers: { + ...((requestOptions.headers as Record) ?? {}), + ...extraHeaders, + }, }, - }, + ); + } catch (error) { + debugWarn( + "send-message-stream", + "request_error conversation_id=%s status=%s error=%s", + resolvedConversationId, + (error as { status?: number })?.status ?? "none", + error instanceof Error ? error.message : String(error), + ); + throw error; + } + + debugLog( + "send-message-stream", + "request_ok conversation_id=%s", + resolvedConversationId, ); if (requestStartTime !== undefined) {