diff --git a/src/queue/queueRuntime.ts b/src/queue/queueRuntime.ts index 498efc2..8a5639e 100644 --- a/src/queue/queueRuntime.ts +++ b/src/queue/queueRuntime.ts @@ -16,6 +16,10 @@ type QueueItemBase = { id: string; /** Optional client-side message correlation ID from submit payloads. */ clientMessageId?: string; + /** Optional agent scope for listener-mode attribution. */ + agentId?: string; + /** Optional conversation scope for listener-mode attribution. */ + conversationId?: string; source: QueueItemSource; enqueuedAt: number; }; @@ -55,6 +59,13 @@ export function isCoalescable(kind: QueueItemKind): boolean { return kind === "message" || kind === "task_notification"; } +function hasSameScope(a: QueueItem, b: QueueItem): boolean { + return ( + (a.agentId ?? null) === (b.agentId ?? null) && + (a.conversationId ?? null) === (b.conversationId ?? null) + ); +} + // ── Batch / callbacks ──────────────────────────────────────────── export interface DequeuedBatch { @@ -77,7 +88,11 @@ export interface QueueCallbacks { * Only fires when queue is non-empty. */ onBlocked?: (reason: QueueBlockedReason, queueLen: number) => void; - onCleared?: (reason: QueueClearedReason, clearedCount: number) => void; + onCleared?: ( + reason: QueueClearedReason, + clearedCount: number, + items: QueueItem[], + ) => void; /** * Fired when an item is dropped. * queueLen is the post-operation queue depth: @@ -226,9 +241,12 @@ export class QueueRuntime { // Drain contiguous coalescable items from head const batch: QueueItem[] = []; + const first = this.store[0]; while ( + first !== undefined && this.store.length > 0 && - isCoalescable(this.store[0]?.kind ?? "approval_result") + isCoalescable(this.store[0]?.kind ?? "approval_result") && + hasSameScope(first, this.store[0] as QueueItem) ) { const item = this.store.shift(); if (item) batch.push(item); @@ -300,10 +318,11 @@ export class QueueRuntime { /** Remove all items and fire onCleared. */ clear(reason: QueueClearedReason): void { const count = this.store.length; + const clearedItems = this.store.slice(); this.store.length = 0; this.lastEmittedBlockedReason = null; this.blockedEmittedForNonEmpty = false; - this.safeCallback("onCleared", reason, count); + this.safeCallback("onCleared", reason, count, clearedItems); } // ── Accessors ────────────────────────────────────────────────── diff --git a/src/tests/queue/queueRuntime.test.ts b/src/tests/queue/queueRuntime.test.ts index bbd29db..65114be 100644 --- a/src/tests/queue/queueRuntime.test.ts +++ b/src/tests/queue/queueRuntime.test.ts @@ -12,6 +12,20 @@ function makeMsg(text = "hello"): Omit { return { kind: "message", source: "user", content: text }; } +function makeScopedMsg(params: { + text?: string; + agentId?: string; + conversationId?: string; +}): Omit { + return { + kind: "message", + source: "user", + content: params.text ?? "hello", + agentId: params.agentId, + conversationId: params.conversationId, + }; +} + function makeTask( text = "", ): Omit< @@ -194,6 +208,32 @@ describe("dequeue coalescable items", () => { expect(b?.batchId).toMatch(/^batch-\d+$/); }); + test("does not coalesce adjacent items from different conversation scopes", () => { + const q = new QueueRuntime(); + q.enqueue( + makeScopedMsg({ + text: "a", + agentId: "agent-1", + conversationId: "default", + }), + ); + q.enqueue( + makeScopedMsg({ + text: "b", + agentId: "agent-2", + conversationId: "default", + }), + ); + + const first = q.tryDequeue(null); + expect(first?.items).toHaveLength(1); + expect((first?.items[0] as MessageQueueItem).content).toBe("a"); + + const second = q.tryDequeue(null); + expect(second?.items).toHaveLength(1); + expect((second?.items[0] as MessageQueueItem).content).toBe("b"); + }); + test("length is 0 after full dequeue", () => { const q = new QueueRuntime(); q.enqueue(makeMsg()); diff --git a/src/tests/websocket/listen-client-protocol.test.ts b/src/tests/websocket/listen-client-protocol.test.ts index 8a5d897..8881030 100644 --- a/src/tests/websocket/listen-client-protocol.test.ts +++ b/src/tests/websocket/listen-client-protocol.test.ts @@ -6,6 +6,7 @@ import type { ApprovalCreate } from "@letta-ai/letta-client/resources/agents/mes import WebSocket from "ws"; import { buildConversationMessagesCreateRequestBody } from "../../agent/message"; import { INTERRUPTED_BY_USER } from "../../constants"; +import type { MessageQueueItem } from "../../queue/queueRuntime"; import type { ControlRequest, ControlResponseBody } from "../../types/protocol"; import { __listenClientTestUtils, @@ -244,6 +245,84 @@ describe("listen-client requestApprovalOverWS", () => { }); }); +describe("listen-client conversation-scoped protocol events", () => { + test("queue lifecycle events carry agent_id and conversation_id from the queued item", () => { + const runtime = __listenClientTestUtils.createRuntime(); + const socket = new MockSocket(WebSocket.OPEN); + runtime.socket = socket as unknown as WebSocket; + + const input: Omit = { + kind: "message", + source: "user", + content: "hello", + clientMessageId: "cm-queue-1", + agentId: "agent-default", + conversationId: "default", + }; + const item = runtime.queueRuntime.enqueue(input); + expect(item).not.toBeNull(); + + runtime.queueRuntime.tryDequeue("runtime_busy"); + + const enqueued = JSON.parse(socket.sentPayloads[0] as string); + expect(enqueued.type).toBe("queue_item_enqueued"); + expect(enqueued.agent_id).toBe("agent-default"); + expect(enqueued.conversation_id).toBe("default"); + + const blocked = JSON.parse(socket.sentPayloads[1] as string); + expect(blocked.type).toBe("queue_blocked"); + expect(blocked.agent_id).toBe("agent-default"); + expect(blocked.conversation_id).toBe("default"); + }); + + test("cancel_ack includes agent_id and conversation_id", () => { + const runtime = __listenClientTestUtils.createRuntime(); + const socket = new MockSocket(WebSocket.OPEN); + runtime.activeAgentId = "agent-123"; + runtime.activeConversationId = "default"; + runtime.activeRunId = "run-123"; + + __listenClientTestUtils.emitCancelAck( + socket as unknown as WebSocket, + runtime, + { + requestId: "cancel-1", + accepted: true, + }, + ); + + const sent = JSON.parse(socket.sentPayloads[0] as string); + expect(sent.type).toBe("cancel_ack"); + expect(sent.agent_id).toBe("agent-123"); + expect(sent.conversation_id).toBe("default"); + expect(sent.run_id).toBe("run-123"); + }); + + test("queue_batch_dequeued keeps the batch scope", () => { + const runtime = __listenClientTestUtils.createRuntime(); + const socket = new MockSocket(WebSocket.OPEN); + runtime.socket = socket as unknown as WebSocket; + + const input: Omit = { + kind: "message", + source: "user", + content: "hello", + clientMessageId: "cm-queue-2", + agentId: "agent-xyz", + conversationId: "conv-xyz", + }; + + runtime.queueRuntime.enqueue(input); + + runtime.queueRuntime.tryDequeue(null); + + const dequeued = JSON.parse(socket.sentPayloads[1] as string); + expect(dequeued.type).toBe("queue_batch_dequeued"); + expect(dequeued.agent_id).toBe("agent-xyz"); + expect(dequeued.conversation_id).toBe("conv-xyz"); + }); +}); + describe("listen-client state_response control protocol", () => { test("always advertises control_response capability", () => { const runtime = __listenClientTestUtils.createRuntime(); diff --git a/src/types/protocol.ts b/src/types/protocol.ts index a27c277..eed0f0f 100644 --- a/src/types/protocol.ts +++ b/src/types/protocol.ts @@ -75,6 +75,10 @@ export interface MessageEnvelope { uuid: string; /** Monotonic per-session event sequence. Optional for backward compatibility. */ event_seq?: number; + /** Agent that triggered this event. Used with default conversation scoping. */ + agent_id?: string; + /** Conversation that triggered this event. Used for conversation-scoped filtering. */ + conversation_id?: string; } // ═══════════════════════════════════════════════════════════════ @@ -162,6 +166,8 @@ export type MessageWire = { type: "message"; session_id: string; uuid: string; + agent_id?: string; + conversation_id?: string; } & LettaStreamingResponse; // ═══════════════════════════════════════════════════════════════ @@ -422,6 +428,10 @@ export interface ControlRequest { type: "control_request"; request_id: string; request: ControlRequestBody; + /** Agent that triggered this control request. */ + agent_id?: string; + /** Conversation that triggered this control request. */ + conversation_id?: string; } // SDK → CLI request subtypes diff --git a/src/websocket/listen-client.ts b/src/websocket/listen-client.ts index a4b5e9a..1c69b8a 100644 --- a/src/websocket/listen-client.ts +++ b/src/websocket/listen-client.ts @@ -120,6 +120,8 @@ interface RunStartedMessage { batch_id: string; event_seq?: number; session_id?: string; + agent_id?: string; + conversation_id?: string; } interface RunRequestErrorMessage { @@ -132,6 +134,8 @@ interface RunRequestErrorMessage { batch_id?: string; event_seq?: number; session_id?: string; + agent_id?: string; + conversation_id?: string; } interface ModeChangeMessage { @@ -471,6 +475,35 @@ const MAX_RETRY_DURATION_MS = 5 * 60 * 1000; // 5 minutes const INITIAL_RETRY_DELAY_MS = 1000; // 1 second const MAX_RETRY_DELAY_MS = 30000; // 30 seconds +function getQueueItemScope(item?: QueueItem | null): { + agent_id?: string; + conversation_id?: string; +} { + if (!item) { + return {}; + } + return { + agent_id: item.agentId, + conversation_id: item.conversationId, + }; +} + +function getQueueItemsScope(items: QueueItem[]): { + agent_id?: string; + conversation_id?: string; +} { + const first = items[0]; + if (!first) { + return {}; + } + const sameScope = items.every( + (item) => + (item.agentId ?? null) === (first.agentId ?? null) && + (item.conversationId ?? null) === (first.conversationId ?? null), + ); + return sameScope ? getQueueItemScope(first) : {}; +} + function createRuntime(): ListenerRuntime { const bootWorkingDirectory = process.env.USER_CWD || process.cwd(); const runtime: ListenerRuntime = { @@ -523,6 +556,7 @@ function createRuntime(): ListenerRuntime { queue_len: queueLen, session_id: runtime.sessionId, uuid: `q-enq-${item.id}`, + ...getQueueItemScope(item), }); } }, @@ -536,6 +570,7 @@ function createRuntime(): ListenerRuntime { queue_len_after: batch.queueLenAfter, session_id: runtime.sessionId, uuid: `q-deq-${batch.batchId}`, + ...getQueueItemsScope(batch.items), }); } }, @@ -547,10 +582,11 @@ function createRuntime(): ListenerRuntime { queue_len: queueLen, session_id: runtime.sessionId, uuid: `q-blk-${crypto.randomUUID()}`, + ...getQueueItemScope(runtime.queueRuntime.items[0]), }); } }, - onCleared: (reason, clearedCount) => { + onCleared: (reason, clearedCount, items) => { if (runtime.socket?.readyState === WebSocket.OPEN) { emitToWS(runtime.socket, { type: "queue_cleared", @@ -558,6 +594,7 @@ function createRuntime(): ListenerRuntime { cleared_count: clearedCount, session_id: runtime.sessionId, uuid: `q-clr-${crypto.randomUUID()}`, + ...getQueueItemsScope(items), }); } }, @@ -571,6 +608,7 @@ function createRuntime(): ListenerRuntime { queue_len: queueLen, session_id: runtime.sessionId, uuid: `q-drp-${item.id}`, + ...getQueueItemScope(item), }); } }, @@ -942,6 +980,8 @@ function emitCancelAck( accepted: boolean; reason?: string; runId?: string | null; + agentId?: string | null; + conversationId?: string | null; }, ): void { emitToWS(socket, { @@ -950,6 +990,9 @@ function emitCancelAck( accepted: params.accepted, reason: params.reason, run_id: params.runId ?? runtime.activeRunId, + agent_id: params.agentId ?? runtime.activeAgentId ?? undefined, + conversation_id: + params.conversationId ?? runtime.activeConversationId ?? undefined, session_id: runtime.sessionId, uuid: `cancel-ack-${params.requestId}`, } as CancelAckMessage); @@ -1441,6 +1484,7 @@ function emitInterruptToolReturnMessage( id: `message-${crypto.randomUUID()}`, date: new Date().toISOString(), run_id: resolvedRunId, + agent_id: runtime.activeAgentId ?? undefined, tool_returns: [ { tool_call_id: toolReturn.tool_call_id, @@ -1452,6 +1496,7 @@ function emitInterruptToolReturnMessage( ], session_id: runtime.sessionId, uuid: `${uuidPrefix}-${crypto.randomUUID()}`, + conversation_id: runtime.activeConversationId ?? undefined, } as unknown as MessageWire); } } @@ -1815,6 +1860,8 @@ async function sendMessageStreamWithRetry( delay_ms: delayMs, session_id: runtime.sessionId, uuid: `retry-${crypto.randomUUID()}`, + agent_id: runtime.activeAgentId ?? undefined, + conversation_id: conversationId, } as RetryMessage); await new Promise((resolve) => setTimeout(resolve, delayMs)); @@ -1840,6 +1887,8 @@ async function sendMessageStreamWithRetry( delay_ms: delayMs, session_id: runtime.sessionId, uuid: `retry-${crypto.randomUUID()}`, + agent_id: runtime.activeAgentId ?? undefined, + conversation_id: conversationId, } as RetryMessage); await new Promise((resolve) => setTimeout(resolve, delayMs)); @@ -1982,6 +2031,8 @@ async function recoverPendingApprovals( stop_reason: "error", session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, + agent_id: agentId, + conversation_id: conversationId, }); runtime.lastStopReason = "requires_approval"; return; @@ -2031,6 +2082,8 @@ async function recoverPendingApprovals( : "auto-approved", session_id: runtime.sessionId, uuid: `auto-approval-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, } as AutoApprovalMessage); } @@ -2072,6 +2125,8 @@ async function recoverPendingApprovals( blocked_path: null, ...(diffs.length > 0 ? { diffs } : {}), }, + agent_id: agentId, + conversation_id: conversationId, }; const responseBody = await requestApprovalOverWS( @@ -2105,6 +2160,8 @@ async function recoverPendingApprovals( matched_rule: "canUseTool callback", session_id: runtime.sessionId, uuid: `auto-approval-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, } as AutoApprovalMessage); } else { decisions.push({ @@ -2496,6 +2553,8 @@ async function connectWithRetry( stop_reason: "error", session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, + agent_id: runtime.activeAgentId ?? undefined, + conversation_id: runtime.activeConversationId ?? undefined, }); } finally { runtime.pendingTurns--; @@ -2529,6 +2588,8 @@ async function connectWithRetry( stop_reason: "error", session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, + agent_id: runtime.activeAgentId ?? undefined, + conversation_id: runtime.activeConversationId ?? undefined, }); return; } @@ -2548,6 +2609,8 @@ async function connectWithRetry( content: userPayload.content, clientMessageId: userPayload.client_message_id ?? `cm-submit-${crypto.randomUUID()}`, + agentId: parsed.agentId ?? undefined, + conversationId: parsed.conversationId || "default", } as Parameters[0]); enqueuedQueueItemId = enqueuedItem?.id ?? null; // Emit blocked on state transition when turns are already queued. @@ -2850,6 +2913,8 @@ async function handleIncomingMessage( type: "run_started", runId: maybeRunId, batch_id: dequeuedBatchId, + agent_id: agentId, + conversation_id: conversationId, }); } } @@ -2864,6 +2929,8 @@ async function handleIncomingMessage( run_id: runId || errorInfo.run_id, session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, + agent_id: agentId, + conversation_id: conversationId, }); } @@ -2883,6 +2950,8 @@ async function handleIncomingMessage( session_id: runtime.sessionId, uuid: chunkWithIds.otid || chunkWithIds.id || crypto.randomUUID(), + agent_id: agentId, + conversation_id: conversationId, } as unknown as MessageWire); } } @@ -2954,6 +3023,8 @@ async function handleIncomingMessage( run_id: runId || msgRunIds[msgRunIds.length - 1] || undefined, session_id: runtime.sessionId, uuid: `recovery-${crypto.randomUUID()}`, + agent_id: agentId, + conversation_id: conversationId, } as RecoveryMessage); try { @@ -3029,6 +3100,8 @@ async function handleIncomingMessage( run_id: runId, session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, + agent_id: agentId, + conversation_id: conversationId, }); emitTurnResult(socket, runtime, { subtype: "error", @@ -3055,6 +3128,8 @@ async function handleIncomingMessage( stop_reason: "error", session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, + agent_id: agentId, + conversation_id: conversationId, }); emitTurnResult(socket, runtime, { subtype: "error", @@ -3126,6 +3201,8 @@ async function handleIncomingMessage( : "auto-approved", session_id: runtime.sessionId, uuid: `auto-approval-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, } as AutoApprovalMessage); } @@ -3166,6 +3243,8 @@ async function handleIncomingMessage( blocked_path: null, ...(diffs.length > 0 ? { diffs } : {}), }, + agent_id: agentId, + conversation_id: conversationId, }; const responseBody = await requestApprovalOverWS( @@ -3200,6 +3279,8 @@ async function handleIncomingMessage( matched_rule: "canUseTool callback", session_id: runtime.sessionId, uuid: `auto-approval-${ac.approval.toolCallId}`, + agent_id: agentId, + conversation_id: conversationId, } as AutoApprovalMessage); } else { decisions.push({ @@ -3353,6 +3434,8 @@ async function handleIncomingMessage( type: "run_request_error", error: errorPayload, batch_id: dequeuedBatchId, + agent_id: agentId, + conversation_id: conversationId, }); } @@ -3363,6 +3446,8 @@ async function handleIncomingMessage( stop_reason: "error", session_id: runtime.sessionId, uuid: `error-${crypto.randomUUID()}`, + agent_id: agentId || undefined, + conversation_id: conversationId, }); emitTurnResult(socket, runtime, { subtype: "error", @@ -3410,6 +3495,7 @@ export const __listenClientTestUtils = { buildStateResponse, handleCwdChange, emitToWS, + emitCancelAck, getConversationWorkingDirectory, rememberPendingApprovalBatchIds, resolvePendingApprovalBatchId,