refactor: extract DisplayPipeline from processMessage stream loop (#550)
Extracts a DisplayPipeline async generator that wraps the raw SDK stream and yields clean DisplayEvent types. Refactors processMessage() to consume pipeline events instead of raw StreamMsg objects. - Locks foreground on first substantive event (reasoning/tool_call/etc), eliminating buffering delay for real-time display - Filters pre-foreground error/retry events to prevent false approval recovery - Re-throws 429 in rejectApproval to prevent rate-limit loops - Gates reasoning log on display config - 12 pipeline unit tests + updated integration tests (56 total) - Net -224 lines from bot.ts Written by Cameron ◯ Letta Code "The purpose of abstraction is not to be vague, but to create a new semantic level in which one can be absolutely precise." -- Edsger Dijkstra
This commit is contained in:
@@ -82,18 +82,9 @@ describe('stream logging levels', () => {
|
|||||||
const infoMessages = loggerSpies.info.mock.calls.map(([message]) => String(message));
|
const infoMessages = loggerSpies.info.mock.calls.map(([message]) => String(message));
|
||||||
const traceMessages = loggerSpies.trace.mock.calls.map(([message]) => String(message));
|
const traceMessages = loggerSpies.trace.mock.calls.map(([message]) => String(message));
|
||||||
|
|
||||||
expect(debugMessages.some((m) => m.includes('Buffering run-scoped pre-foreground display event'))).toBe(false);
|
// Run ID filtering now handled by DisplayPipeline; verify summary log is emitted at info level
|
||||||
expect(debugMessages.some((m) => m.includes('Deferring run-scoped pre-foreground event'))).toBe(false);
|
|
||||||
expect(debugMessages.some((m) => m.includes('Skipping non-foreground stream event'))).toBe(false);
|
|
||||||
|
|
||||||
expect(infoMessages.some((m) => m.includes('type=tool_call'))).toBe(false);
|
|
||||||
expect(infoMessages.some((m) => m.includes('type=tool_result'))).toBe(false);
|
|
||||||
|
|
||||||
expect(traceMessages.some((m) => m.includes('Buffering run-scoped pre-foreground display event'))).toBe(true);
|
|
||||||
expect(traceMessages.some((m) => m.includes('Skipping non-foreground stream event'))).toBe(true);
|
|
||||||
expect(traceMessages.some((m) => m.includes('type=tool_call'))).toBe(true);
|
|
||||||
expect(traceMessages.some((m) => m.includes('type=tool_result'))).toBe(true);
|
|
||||||
|
|
||||||
expect(infoMessages.some((m) => m.includes('Filtered') && m.includes('non-foreground event(s)'))).toBe(true);
|
expect(infoMessages.some((m) => m.includes('Filtered') && m.includes('non-foreground event(s)'))).toBe(true);
|
||||||
|
// Foreground run locking is logged at info level
|
||||||
|
expect(infoMessages.some((m) => m.includes('Foreground run locked'))).toBe(true);
|
||||||
});
|
});
|
||||||
});
|
});
|
||||||
896
src/core/bot.ts
896
src/core/bot.ts
File diff suppressed because it is too large
Load Diff
211
src/core/display-pipeline.test.ts
Normal file
211
src/core/display-pipeline.test.ts
Normal file
@@ -0,0 +1,211 @@
|
|||||||
|
import { describe, it, expect } from 'vitest';
|
||||||
|
import { createDisplayPipeline, type DisplayEvent } from './display-pipeline.js';
|
||||||
|
import type { StreamMsg } from './types.js';
|
||||||
|
|
||||||
|
/** Helper: collect all DisplayEvents from a pipeline fed with the given messages. */
|
||||||
|
async function collect(
|
||||||
|
messages: StreamMsg[],
|
||||||
|
convKey = 'test',
|
||||||
|
): Promise<DisplayEvent[]> {
|
||||||
|
async function* feed(): AsyncIterable<StreamMsg> {
|
||||||
|
for (const msg of messages) yield msg;
|
||||||
|
}
|
||||||
|
const events: DisplayEvent[] = [];
|
||||||
|
for await (const evt of createDisplayPipeline(feed(), {
|
||||||
|
convKey,
|
||||||
|
resultFingerprints: new Map(),
|
||||||
|
})) {
|
||||||
|
events.push(evt);
|
||||||
|
}
|
||||||
|
return events;
|
||||||
|
}
|
||||||
|
|
||||||
|
describe('createDisplayPipeline', () => {
|
||||||
|
it('locks foreground on first reasoning event and yields immediately', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'reasoning', content: 'thinking...', runId: 'run-1' },
|
||||||
|
{ type: 'assistant', content: 'reply', runId: 'run-1' },
|
||||||
|
{ type: 'result', success: true, result: 'reply', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const types = events.map(e => e.type);
|
||||||
|
// Reasoning should appear BEFORE text -- no buffering
|
||||||
|
expect(types[0]).toBe('reasoning');
|
||||||
|
expect(types[1]).toBe('text');
|
||||||
|
expect(types[2]).toBe('complete');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('locks foreground on first tool_call event', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' }, runId: 'run-1' },
|
||||||
|
{ type: 'assistant', content: 'done', runId: 'run-1' },
|
||||||
|
{ type: 'result', success: true, result: 'done', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
expect(events[0].type).toBe('tool_call');
|
||||||
|
expect(events[1].type).toBe('text');
|
||||||
|
expect(events[2].type).toBe('complete');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('filters pre-foreground error events to prevent false retry triggers', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'error', runId: 'run-bg', message: 'conflict waiting for approval', stopReason: 'error' },
|
||||||
|
{ type: 'result', success: false, error: 'error', runIds: ['run-main'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
// Pre-foreground error is filtered (not yielded). Only the result passes through.
|
||||||
|
const errorEvt = events.find(e => e.type === 'error');
|
||||||
|
const completeEvt = events.find(e => e.type === 'complete');
|
||||||
|
expect(errorEvt).toBeUndefined();
|
||||||
|
expect(completeEvt).toBeDefined();
|
||||||
|
if (completeEvt?.type === 'complete') {
|
||||||
|
expect(completeEvt.runIds).toContain('run-main');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('rebinds foreground on assistant event with new run ID', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'assistant', content: 'before tool ', runId: 'run-1' },
|
||||||
|
{ type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: {}, runId: 'run-1' },
|
||||||
|
{ type: 'assistant', content: 'after tool', runId: 'run-2' },
|
||||||
|
{ type: 'result', success: true, result: 'before tool after tool', runIds: ['run-2'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const textEvents = events.filter(e => e.type === 'text');
|
||||||
|
// Both assistant events should pass through (rebind on run-2)
|
||||||
|
expect(textEvents.length).toBe(2);
|
||||||
|
expect(events.find(e => e.type === 'complete')).toBeDefined();
|
||||||
|
});
|
||||||
|
|
||||||
|
it('filters non-foreground events after lock', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'reasoning', content: 'foreground thinking', runId: 'run-1' },
|
||||||
|
{ type: 'reasoning', content: 'background noise', runId: 'run-2' },
|
||||||
|
{ type: 'assistant', content: 'reply', runId: 'run-1' },
|
||||||
|
{ type: 'result', success: true, result: 'reply', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const reasoningEvents = events.filter(e => e.type === 'reasoning');
|
||||||
|
// Only foreground reasoning should appear (run-2 filtered after lock to run-1)
|
||||||
|
expect(reasoningEvents.length).toBe(1);
|
||||||
|
if (reasoningEvents[0].type === 'reasoning') {
|
||||||
|
expect(reasoningEvents[0].content).toBe('foreground thinking');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('accumulates reasoning chunks and flushes on type change', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'reasoning', content: 'part 1 ', runId: 'run-1' },
|
||||||
|
{ type: 'reasoning', content: 'part 2', runId: 'run-1' },
|
||||||
|
{ type: 'assistant', content: 'reply', runId: 'run-1' },
|
||||||
|
{ type: 'result', success: true, result: 'reply', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const reasoningEvents = events.filter(e => e.type === 'reasoning');
|
||||||
|
// Multiple reasoning chunks should be accumulated into one event
|
||||||
|
expect(reasoningEvents.length).toBe(1);
|
||||||
|
if (reasoningEvents[0].type === 'reasoning') {
|
||||||
|
expect(reasoningEvents[0].content).toBe('part 1 part 2');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('prefers streamed text over result field on divergence', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'assistant', content: 'streamed reply', runId: 'run-1' },
|
||||||
|
{ type: 'result', success: true, result: 'result field reply', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const complete = events.find(e => e.type === 'complete');
|
||||||
|
expect(complete).toBeDefined();
|
||||||
|
if (complete?.type === 'complete') {
|
||||||
|
expect(complete.text).toBe('streamed reply');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('falls back to result field when no streamed text', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'result', success: true, result: 'result only', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const complete = events.find(e => e.type === 'complete');
|
||||||
|
expect(complete).toBeDefined();
|
||||||
|
if (complete?.type === 'complete') {
|
||||||
|
expect(complete.text).toBe('result only');
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('detects stale duplicate results by run fingerprint', async () => {
|
||||||
|
const fingerprints = new Map<string, string>();
|
||||||
|
|
||||||
|
// First call -- fresh
|
||||||
|
const events1 = await (async () => {
|
||||||
|
async function* feed(): AsyncIterable<StreamMsg> {
|
||||||
|
yield { type: 'result', success: true, result: 'first', runIds: ['run-1'] };
|
||||||
|
}
|
||||||
|
const events: DisplayEvent[] = [];
|
||||||
|
for await (const evt of createDisplayPipeline(feed(), { convKey: 'test', resultFingerprints: fingerprints })) {
|
||||||
|
events.push(evt);
|
||||||
|
}
|
||||||
|
return events;
|
||||||
|
})();
|
||||||
|
|
||||||
|
// Second call with same runIds -- stale
|
||||||
|
const events2 = await (async () => {
|
||||||
|
async function* feed(): AsyncIterable<StreamMsg> {
|
||||||
|
yield { type: 'result', success: true, result: 'second', runIds: ['run-1'] };
|
||||||
|
}
|
||||||
|
const events: DisplayEvent[] = [];
|
||||||
|
for await (const evt of createDisplayPipeline(feed(), { convKey: 'test', resultFingerprints: fingerprints })) {
|
||||||
|
events.push(evt);
|
||||||
|
}
|
||||||
|
return events;
|
||||||
|
})();
|
||||||
|
|
||||||
|
const c1 = events1.find(e => e.type === 'complete');
|
||||||
|
const c2 = events2.find(e => e.type === 'complete');
|
||||||
|
expect(c1?.type === 'complete' && c1.stale).toBe(false);
|
||||||
|
expect(c2?.type === 'complete' && c2.stale).toBe(true);
|
||||||
|
});
|
||||||
|
|
||||||
|
it('marks cancelled results', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'result', success: true, result: '', stopReason: 'cancelled', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const complete = events.find(e => e.type === 'complete');
|
||||||
|
expect(complete).toBeDefined();
|
||||||
|
if (complete?.type === 'complete') {
|
||||||
|
expect(complete.cancelled).toBe(true);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
it('skips stream_event types', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'stream_event', content: 'partial delta' },
|
||||||
|
{ type: 'assistant', content: 'reply', runId: 'run-1' },
|
||||||
|
{ type: 'result', success: true, result: 'reply', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
// stream_event never reaches the output -- only text + complete
|
||||||
|
expect(events.length).toBe(2);
|
||||||
|
expect(events[0].type).toBe('text');
|
||||||
|
expect(events[1].type).toBe('complete');
|
||||||
|
});
|
||||||
|
|
||||||
|
it('yields tool_result events', async () => {
|
||||||
|
const events = await collect([
|
||||||
|
{ type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: {}, runId: 'run-1' },
|
||||||
|
{ type: 'tool_result', toolCallId: 'tc-1', content: 'ok', isError: false, runId: 'run-1' },
|
||||||
|
{ type: 'assistant', content: 'done', runId: 'run-1' },
|
||||||
|
{ type: 'result', success: true, result: 'done', runIds: ['run-1'] },
|
||||||
|
]);
|
||||||
|
|
||||||
|
const toolResult = events.find(e => e.type === 'tool_result');
|
||||||
|
expect(toolResult).toBeDefined();
|
||||||
|
if (toolResult?.type === 'tool_result') {
|
||||||
|
expect(toolResult.toolCallId).toBe('tc-1');
|
||||||
|
expect(toolResult.content).toBe('ok');
|
||||||
|
expect(toolResult.isError).toBe(false);
|
||||||
|
}
|
||||||
|
});
|
||||||
|
});
|
||||||
360
src/core/display-pipeline.ts
Normal file
360
src/core/display-pipeline.ts
Normal file
@@ -0,0 +1,360 @@
|
|||||||
|
/**
|
||||||
|
* DisplayPipeline — transforms raw SDK stream events into clean,
|
||||||
|
* high-level display events for channel delivery.
|
||||||
|
*
|
||||||
|
* Encapsulates:
|
||||||
|
* - Run ID filtering (foreground tracking, rebinding)
|
||||||
|
* - Reasoning chunk accumulation (flushed on type transitions)
|
||||||
|
* - stream_event skipping
|
||||||
|
* - Type transition tracking
|
||||||
|
* - Result text selection (streamed vs result field)
|
||||||
|
* - Stale/cancelled result classification
|
||||||
|
*/
|
||||||
|
|
||||||
|
import type { StreamMsg } from './types.js';
|
||||||
|
import { createLogger } from '../logger.js';
|
||||||
|
|
||||||
|
const log = createLogger('DisplayPipeline');
|
||||||
|
|
||||||
|
// ─── Display event types ────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
export interface ReasoningEvent {
|
||||||
|
type: 'reasoning';
|
||||||
|
/** Complete accumulated reasoning block. */
|
||||||
|
content: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ToolCallEvent {
|
||||||
|
type: 'tool_call';
|
||||||
|
name: string;
|
||||||
|
args: Record<string, unknown>;
|
||||||
|
id: string;
|
||||||
|
/** The raw StreamMsg for consumers that need extra fields. */
|
||||||
|
raw: StreamMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ToolResultEvent {
|
||||||
|
type: 'tool_result';
|
||||||
|
toolCallId: string;
|
||||||
|
content: string;
|
||||||
|
isError: boolean;
|
||||||
|
raw: StreamMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface TextEvent {
|
||||||
|
type: 'text';
|
||||||
|
/** Full accumulated assistant text for this turn. */
|
||||||
|
content: string;
|
||||||
|
/** Just this chunk's addition. */
|
||||||
|
delta: string;
|
||||||
|
/** Assistant message UUID (changes on multi-turn responses). */
|
||||||
|
uuid: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface CompleteEvent {
|
||||||
|
type: 'complete';
|
||||||
|
/** Final response text (after streamed-vs-result selection). */
|
||||||
|
text: string;
|
||||||
|
success: boolean;
|
||||||
|
error?: string;
|
||||||
|
stopReason?: string;
|
||||||
|
conversationId?: string;
|
||||||
|
runIds: string[];
|
||||||
|
durationMs?: number;
|
||||||
|
/** True if this is a stale duplicate result (same run fingerprint as last time). */
|
||||||
|
stale: boolean;
|
||||||
|
/** True if this result came from a cancelled run (should be discarded + retried). */
|
||||||
|
cancelled: boolean;
|
||||||
|
/** Whether any assistant text was accumulated during streaming. */
|
||||||
|
hadStreamedText: boolean;
|
||||||
|
/** The raw StreamMsg for consumers that need extra fields. */
|
||||||
|
raw: StreamMsg;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface ErrorEvent {
|
||||||
|
type: 'error';
|
||||||
|
message: string;
|
||||||
|
stopReason?: string;
|
||||||
|
apiError?: Record<string, unknown>;
|
||||||
|
runId?: string;
|
||||||
|
}
|
||||||
|
|
||||||
|
export interface RetryEvent {
|
||||||
|
type: 'retry';
|
||||||
|
attempt: number;
|
||||||
|
maxAttempts: number;
|
||||||
|
reason: string;
|
||||||
|
delayMs?: number;
|
||||||
|
}
|
||||||
|
|
||||||
|
export type DisplayEvent =
|
||||||
|
| ReasoningEvent
|
||||||
|
| ToolCallEvent
|
||||||
|
| ToolResultEvent
|
||||||
|
| TextEvent
|
||||||
|
| CompleteEvent
|
||||||
|
| ErrorEvent
|
||||||
|
| RetryEvent;
|
||||||
|
|
||||||
|
// ─── Run fingerprinting (stale detection) ───────────────────────────────────
|
||||||
|
|
||||||
|
function classifyResult(
|
||||||
|
convKey: string,
|
||||||
|
runIds: string[],
|
||||||
|
fingerprints: Map<string, string>,
|
||||||
|
): 'fresh' | 'stale' | 'unknown' {
|
||||||
|
if (runIds.length === 0) return 'unknown';
|
||||||
|
const fingerprint = [...new Set(runIds)].sort().join(',');
|
||||||
|
const previous = fingerprints.get(convKey);
|
||||||
|
if (previous === fingerprint) {
|
||||||
|
log.warn(`Stale duplicate result detected (key=${convKey}, runIds=${fingerprint})`);
|
||||||
|
return 'stale';
|
||||||
|
}
|
||||||
|
fingerprints.set(convKey, fingerprint);
|
||||||
|
return 'fresh';
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Helpers ────────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
function extractRunIds(msg: StreamMsg): string[] {
|
||||||
|
const ids: string[] = [];
|
||||||
|
const rawId = (msg as StreamMsg & { runId?: unknown; run_id?: unknown }).runId
|
||||||
|
?? (msg as StreamMsg & { run_id?: unknown }).run_id;
|
||||||
|
if (typeof rawId === 'string' && rawId.trim()) ids.push(rawId.trim());
|
||||||
|
|
||||||
|
const rawIds = (msg as StreamMsg & { runIds?: unknown; run_ids?: unknown }).runIds
|
||||||
|
?? (msg as StreamMsg & { run_ids?: unknown }).run_ids;
|
||||||
|
if (Array.isArray(rawIds)) {
|
||||||
|
for (const id of rawIds) {
|
||||||
|
if (typeof id === 'string' && id.trim()) ids.push(id.trim());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ids.length > 0 ? [...new Set(ids)] : [];
|
||||||
|
}
|
||||||
|
|
||||||
|
// ─── Pipeline ───────────────────────────────────────────────────────────────
|
||||||
|
|
||||||
|
export interface DisplayPipelineOptions {
|
||||||
|
/** Conversation key for stale-result detection. */
|
||||||
|
convKey: string;
|
||||||
|
/** Shared fingerprint map for stale-result detection (instance-level, not module-level). */
|
||||||
|
resultFingerprints: Map<string, string>;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Wraps an SDK stream (already deduped by session-manager) and yields
|
||||||
|
* clean DisplayEvents. All run-ID filtering, reasoning accumulation,
|
||||||
|
* and result classification happens inside.
|
||||||
|
*/
|
||||||
|
export async function* createDisplayPipeline(
|
||||||
|
stream: AsyncIterable<StreamMsg>,
|
||||||
|
opts: DisplayPipelineOptions,
|
||||||
|
): AsyncGenerator<DisplayEvent> {
|
||||||
|
const { convKey, resultFingerprints } = opts;
|
||||||
|
|
||||||
|
// ── Foreground run tracking ──
|
||||||
|
let foregroundRunId: string | null = null;
|
||||||
|
let foregroundSource: string | null = null;
|
||||||
|
|
||||||
|
// ── Reasoning accumulation ──
|
||||||
|
let reasoningBuffer = '';
|
||||||
|
|
||||||
|
// ── Assistant text accumulation ──
|
||||||
|
let assistantText = '';
|
||||||
|
let lastAssistantUuid: string | null = null;
|
||||||
|
let lastSemanticType: string | null = null;
|
||||||
|
|
||||||
|
// ── All run IDs seen (for result) ──
|
||||||
|
const allRunIds = new Set<string>();
|
||||||
|
|
||||||
|
// ── Stats ──
|
||||||
|
let filteredCount = 0;
|
||||||
|
|
||||||
|
// ── Helpers ──
|
||||||
|
function* flushReasoning(): Generator<DisplayEvent> {
|
||||||
|
if (reasoningBuffer.trim()) {
|
||||||
|
yield { type: 'reasoning', content: reasoningBuffer };
|
||||||
|
reasoningBuffer = '';
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Main loop ──
|
||||||
|
for await (const msg of stream) {
|
||||||
|
const eventRunIds = extractRunIds(msg);
|
||||||
|
for (const id of eventRunIds) allRunIds.add(id);
|
||||||
|
|
||||||
|
// Skip stream_event (low-level deltas, not semantic)
|
||||||
|
if (msg.type === 'stream_event') continue;
|
||||||
|
|
||||||
|
log.trace(`raw: type=${msg.type} runIds=${eventRunIds.join(',') || 'none'} fg=${foregroundRunId || 'unlocked'}`);
|
||||||
|
|
||||||
|
// ── Run ID filtering ──
|
||||||
|
// Lock types: substantive events that prove this run is the foreground turn.
|
||||||
|
// Error/retry are excluded -- they're transient signals that could come
|
||||||
|
// from a failed run before the real foreground starts.
|
||||||
|
const isLockType = msg.type === 'reasoning' || msg.type === 'tool_call'
|
||||||
|
|| msg.type === 'tool_result' || msg.type === 'assistant' || msg.type === 'result';
|
||||||
|
|
||||||
|
if (foregroundRunId === null && eventRunIds.length > 0 && isLockType) {
|
||||||
|
// Lock foreground on the first substantive event with a run ID.
|
||||||
|
// Background Tasks use separate sessions and cannot produce events in
|
||||||
|
// this stream, so the first run-scoped event is always from the current
|
||||||
|
// turn's run. This eliminates buffering delay -- reasoning and tool calls
|
||||||
|
// display immediately instead of waiting for the first assistant event.
|
||||||
|
foregroundRunId = eventRunIds[0];
|
||||||
|
foregroundSource = msg.type;
|
||||||
|
log.info(`Foreground run locked: ${foregroundRunId} (source=${foregroundSource})`);
|
||||||
|
// Fall through to type transitions and dispatch for immediate processing.
|
||||||
|
} else if (foregroundRunId === null && eventRunIds.length > 0 && !isLockType) {
|
||||||
|
// Pre-foreground error/retry events are filtered. If passed through,
|
||||||
|
// they set lastErrorDetail in the consumer and can spuriously trigger
|
||||||
|
// approval recovery or suppress legitimate retries.
|
||||||
|
filteredCount++;
|
||||||
|
continue;
|
||||||
|
} else if (foregroundRunId && eventRunIds.length > 0 && !eventRunIds.includes(foregroundRunId)) {
|
||||||
|
// Event from a different run. Rebind on assistant events only
|
||||||
|
// (background Tasks don't produce assistant events in the foreground stream).
|
||||||
|
if (msg.type === 'assistant') {
|
||||||
|
const newRunId = eventRunIds[0];
|
||||||
|
log.info(`Foreground run rebind: ${foregroundRunId} -> ${newRunId}`);
|
||||||
|
foregroundRunId = newRunId;
|
||||||
|
foregroundSource = 'assistant';
|
||||||
|
} else {
|
||||||
|
filteredCount++;
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// ── Type transitions ──
|
||||||
|
// (stream_event is already `continue`d above, so all events here are semantic.)
|
||||||
|
if (lastSemanticType && lastSemanticType !== msg.type) {
|
||||||
|
if (lastSemanticType === 'reasoning') {
|
||||||
|
yield* flushReasoning();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
lastSemanticType = msg.type;
|
||||||
|
|
||||||
|
// ── Dispatch by type ──
|
||||||
|
switch (msg.type) {
|
||||||
|
case 'reasoning': {
|
||||||
|
reasoningBuffer += msg.content || '';
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'tool_call': {
|
||||||
|
yield {
|
||||||
|
type: 'tool_call',
|
||||||
|
name: msg.toolName || 'unknown',
|
||||||
|
args: (msg.toolInput && typeof msg.toolInput === 'object' ? msg.toolInput : {}) as Record<string, unknown>,
|
||||||
|
id: msg.toolCallId || '',
|
||||||
|
raw: msg,
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'tool_result': {
|
||||||
|
yield {
|
||||||
|
type: 'tool_result',
|
||||||
|
toolCallId: msg.toolCallId || '',
|
||||||
|
content: typeof (msg as any).content === 'string'
|
||||||
|
? (msg as any).content
|
||||||
|
: typeof (msg as any).result === 'string'
|
||||||
|
? (msg as any).result
|
||||||
|
: '',
|
||||||
|
isError: !!msg.isError,
|
||||||
|
raw: msg,
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'assistant': {
|
||||||
|
const delta = msg.content || '';
|
||||||
|
const uuid = msg.uuid || '';
|
||||||
|
lastAssistantUuid = uuid || lastAssistantUuid;
|
||||||
|
|
||||||
|
assistantText += delta;
|
||||||
|
yield {
|
||||||
|
type: 'text',
|
||||||
|
content: assistantText,
|
||||||
|
delta,
|
||||||
|
uuid: lastAssistantUuid || '',
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'result': {
|
||||||
|
// Flush any remaining reasoning
|
||||||
|
yield* flushReasoning();
|
||||||
|
|
||||||
|
const resultText = typeof msg.result === 'string' ? msg.result : '';
|
||||||
|
const streamedTrimmed = assistantText.trim();
|
||||||
|
const resultTrimmed = resultText.trim();
|
||||||
|
const runIds = extractRunIds(msg);
|
||||||
|
|
||||||
|
// Result text selection: prefer streamed text over result field
|
||||||
|
let finalText = assistantText;
|
||||||
|
if (streamedTrimmed.length > 0 && resultTrimmed !== streamedTrimmed) {
|
||||||
|
// Diverged — prefer streamed (avoid n-1 desync)
|
||||||
|
log.warn(`Result diverges from streamed (resultLen=${resultText.length}, streamLen=${assistantText.length}), preferring streamed`);
|
||||||
|
} else if (streamedTrimmed.length === 0 && msg.success !== false && !msg.error) {
|
||||||
|
// No streamed text — use result as fallback
|
||||||
|
finalText = resultText;
|
||||||
|
}
|
||||||
|
|
||||||
|
// Classify
|
||||||
|
const cancelled = (msg as any).stopReason === 'cancelled';
|
||||||
|
const staleState = classifyResult(convKey, runIds.length > 0 ? runIds : [...allRunIds], resultFingerprints);
|
||||||
|
const stale = staleState === 'stale';
|
||||||
|
|
||||||
|
if (filteredCount > 0) {
|
||||||
|
log.info(`Filtered ${filteredCount} non-foreground event(s) (key=${convKey})`);
|
||||||
|
}
|
||||||
|
|
||||||
|
yield {
|
||||||
|
type: 'complete',
|
||||||
|
text: finalText,
|
||||||
|
success: msg.success !== false,
|
||||||
|
error: typeof msg.error === 'string' ? msg.error : undefined,
|
||||||
|
stopReason: typeof (msg as any).stopReason === 'string' ? (msg as any).stopReason : undefined,
|
||||||
|
conversationId: typeof (msg as any).conversationId === 'string' ? (msg as any).conversationId : undefined,
|
||||||
|
runIds: runIds.length > 0 ? runIds : [...allRunIds],
|
||||||
|
durationMs: typeof (msg as any).durationMs === 'number' ? (msg as any).durationMs : undefined,
|
||||||
|
stale,
|
||||||
|
cancelled,
|
||||||
|
hadStreamedText: streamedTrimmed.length > 0,
|
||||||
|
raw: msg,
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'error': {
|
||||||
|
yield {
|
||||||
|
type: 'error',
|
||||||
|
message: (msg as any).message || 'unknown',
|
||||||
|
stopReason: (msg as any).stopReason,
|
||||||
|
apiError: (msg as any).apiError,
|
||||||
|
runId: (msg as any).runId,
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
case 'retry': {
|
||||||
|
yield {
|
||||||
|
type: 'retry',
|
||||||
|
attempt: (msg as any).attempt ?? 0,
|
||||||
|
maxAttempts: (msg as any).maxAttempts ?? 0,
|
||||||
|
reason: (msg as any).reason || 'unknown',
|
||||||
|
delayMs: (msg as any).delayMs,
|
||||||
|
};
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
|
||||||
|
default:
|
||||||
|
// Unhandled event types — skip
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
// Flush any trailing reasoning that wasn't followed by a type change
|
||||||
|
yield* flushReasoning();
|
||||||
|
}
|
||||||
@@ -154,6 +154,61 @@ describe('result divergence guard', () => {
|
|||||||
expect(sentTexts.some(text => text.includes('repeated CLI command failures'))).toBe(true);
|
expect(sentTexts.some(text => text.includes('repeated CLI command failures'))).toBe(true);
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('stops consuming stream and avoids retry after explicit tool-loop abort', async () => {
|
||||||
|
const bot = new LettaBot({
|
||||||
|
workingDir: workDir,
|
||||||
|
allowedTools: [],
|
||||||
|
maxToolCalls: 1,
|
||||||
|
});
|
||||||
|
|
||||||
|
const adapter = {
|
||||||
|
id: 'mock',
|
||||||
|
name: 'Mock',
|
||||||
|
start: vi.fn(async () => {}),
|
||||||
|
stop: vi.fn(async () => {}),
|
||||||
|
isRunning: vi.fn(() => true),
|
||||||
|
sendMessage: vi.fn(async (_msg: OutboundMessage) => ({ messageId: 'msg-1' })),
|
||||||
|
editMessage: vi.fn(async () => {}),
|
||||||
|
sendTypingIndicator: vi.fn(async () => {}),
|
||||||
|
stopTypingIndicator: vi.fn(async () => {}),
|
||||||
|
supportsEditing: vi.fn(() => false),
|
||||||
|
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
|
||||||
|
};
|
||||||
|
|
||||||
|
const runSession = vi.fn();
|
||||||
|
runSession.mockResolvedValueOnce({
|
||||||
|
session: { abort: vi.fn(async () => {}) },
|
||||||
|
stream: async function* () {
|
||||||
|
yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' } };
|
||||||
|
// These trailing events should be ignored because the run was already aborted.
|
||||||
|
yield { type: 'assistant', content: 'late assistant text' };
|
||||||
|
yield { type: 'result', success: false, error: 'error', stopReason: 'cancelled', result: '' };
|
||||||
|
},
|
||||||
|
});
|
||||||
|
runSession.mockResolvedValueOnce({
|
||||||
|
session: { abort: vi.fn(async () => {}) },
|
||||||
|
stream: async function* () {
|
||||||
|
yield { type: 'assistant', content: 'retried response' };
|
||||||
|
yield { type: 'result', success: true, result: 'retried response' };
|
||||||
|
},
|
||||||
|
});
|
||||||
|
(bot as any).sessionManager.runSession = runSession;
|
||||||
|
|
||||||
|
const msg: InboundMessage = {
|
||||||
|
channel: 'discord',
|
||||||
|
chatId: 'chat-1',
|
||||||
|
userId: 'user-1',
|
||||||
|
text: 'hello',
|
||||||
|
timestamp: new Date(),
|
||||||
|
};
|
||||||
|
|
||||||
|
await (bot as any).processMessage(msg, adapter);
|
||||||
|
|
||||||
|
expect(runSession).toHaveBeenCalledTimes(1);
|
||||||
|
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
|
||||||
|
expect(sentTexts).toEqual(['(Agent got stuck in a tool loop and was stopped. Try sending your message again.)']);
|
||||||
|
});
|
||||||
|
|
||||||
it('does not deliver reasoning text from error results as the response', async () => {
|
it('does not deliver reasoning text from error results as the response', async () => {
|
||||||
const bot = new LettaBot({
|
const bot = new LettaBot({
|
||||||
workingDir: workDir,
|
workingDir: workDir,
|
||||||
@@ -255,7 +310,7 @@ describe('result divergence guard', () => {
|
|||||||
expect(sentTexts).toEqual(['Before tool. ', 'After tool.']);
|
expect(sentTexts).toEqual(['Before tool. ', 'After tool.']);
|
||||||
});
|
});
|
||||||
|
|
||||||
it('buffers pre-foreground run-scoped display events and drops non-foreground buffers', async () => {
|
it('locks foreground on first event with run ID and displays immediately', async () => {
|
||||||
const bot = new LettaBot({
|
const bot = new LettaBot({
|
||||||
workingDir: workDir,
|
workingDir: workDir,
|
||||||
allowedTools: [],
|
allowedTools: [],
|
||||||
@@ -276,11 +331,14 @@ describe('result divergence guard', () => {
|
|||||||
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
|
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
|
||||||
};
|
};
|
||||||
|
|
||||||
|
// Reasoning and tool_call arrive before any assistant event. The pipeline
|
||||||
|
// locks foreground on the first event with a run ID (the reasoning event)
|
||||||
|
// and processes everything immediately -- no buffering.
|
||||||
(bot as any).sessionManager.runSession = vi.fn(async () => ({
|
(bot as any).sessionManager.runSession = vi.fn(async () => ({
|
||||||
session: { abort: vi.fn(async () => {}) },
|
session: { abort: vi.fn(async () => {}) },
|
||||||
stream: async function* () {
|
stream: async function* () {
|
||||||
yield { type: 'reasoning', content: 'background-thinking', runId: 'run-bg' };
|
yield { type: 'reasoning', content: 'pre-tool thinking', runId: 'run-tool' };
|
||||||
yield { type: 'tool_call', toolCallId: 'tc-bg', toolName: 'Bash', toolInput: { command: 'echo leak' }, runId: 'run-bg' };
|
yield { type: 'tool_call', toolCallId: 'tc-1', toolName: 'Bash', toolInput: { command: 'echo hi' }, runId: 'run-tool' };
|
||||||
yield { type: 'assistant', content: 'main reply', runId: 'run-main' };
|
yield { type: 'assistant', content: 'main reply', runId: 'run-main' };
|
||||||
yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] };
|
yield { type: 'result', success: true, result: 'main reply', runIds: ['run-main'] };
|
||||||
},
|
},
|
||||||
@@ -297,7 +355,9 @@ describe('result divergence guard', () => {
|
|||||||
await (bot as any).processMessage(msg, adapter);
|
await (bot as any).processMessage(msg, adapter);
|
||||||
|
|
||||||
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
|
const sentTexts = adapter.sendMessage.mock.calls.map(([payload]) => payload.text);
|
||||||
expect(sentTexts).toEqual(['main reply']);
|
// Reasoning display + tool call display + main reply -- all immediate, no buffering
|
||||||
|
expect(sentTexts.length).toBe(3);
|
||||||
|
expect(sentTexts[2]).toBe('main reply');
|
||||||
});
|
});
|
||||||
|
|
||||||
it('retries once when a competing result arrives before any foreground terminal result', async () => {
|
it('retries once when a competing result arrives before any foreground terminal result', async () => {
|
||||||
|
|||||||
@@ -1100,6 +1100,81 @@ describe('SDK session contract', () => {
|
|||||||
expect(sentTexts).toContain('after retry');
|
expect(sentTexts).toContain('after retry');
|
||||||
});
|
});
|
||||||
|
|
||||||
|
it('filters pre-foreground errors so they do not trigger false approval recovery', async () => {
|
||||||
|
const bot = new LettaBot({
|
||||||
|
workingDir: join(dataDir, 'working'),
|
||||||
|
allowedTools: [],
|
||||||
|
});
|
||||||
|
|
||||||
|
let runCall = 0;
|
||||||
|
(bot as any).sessionManager.runSession = vi.fn(async () => ({
|
||||||
|
session: { abort: vi.fn(async () => undefined) },
|
||||||
|
stream: async function* () {
|
||||||
|
if (runCall++ === 0) {
|
||||||
|
// Pre-foreground error is filtered by the pipeline -- it never
|
||||||
|
// reaches processMessage, so lastErrorDetail stays null and
|
||||||
|
// isApprovalConflict cannot fire.
|
||||||
|
yield {
|
||||||
|
type: 'error',
|
||||||
|
runId: 'run-bg',
|
||||||
|
message: 'CONFLICT: Cannot send a new message: waiting for approval',
|
||||||
|
stopReason: 'error',
|
||||||
|
};
|
||||||
|
yield { type: 'result', success: false, error: 'error', conversationId: 'conv-approval', runIds: ['run-main'] };
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// Retry succeeds
|
||||||
|
yield { type: 'assistant', content: 'after retry' };
|
||||||
|
yield { type: 'result', success: true, result: 'after retry', conversationId: 'conv-approval', runIds: ['run-main-2'] };
|
||||||
|
},
|
||||||
|
}));
|
||||||
|
|
||||||
|
vi.mocked(recoverOrphanedConversationApproval).mockResolvedValueOnce({
|
||||||
|
recovered: false,
|
||||||
|
details: 'No unresolved approval requests found',
|
||||||
|
});
|
||||||
|
|
||||||
|
const adapter = {
|
||||||
|
id: 'mock',
|
||||||
|
name: 'Mock',
|
||||||
|
start: vi.fn(async () => {}),
|
||||||
|
stop: vi.fn(async () => {}),
|
||||||
|
isRunning: vi.fn(() => true),
|
||||||
|
sendMessage: vi.fn(async (_payload: unknown) => ({ messageId: 'msg-1' })),
|
||||||
|
editMessage: vi.fn(async () => {}),
|
||||||
|
sendTypingIndicator: vi.fn(async () => {}),
|
||||||
|
stopTypingIndicator: vi.fn(async () => {}),
|
||||||
|
supportsEditing: vi.fn(() => false),
|
||||||
|
sendFile: vi.fn(async () => ({ messageId: 'file-1' })),
|
||||||
|
};
|
||||||
|
|
||||||
|
const msg = {
|
||||||
|
channel: 'discord',
|
||||||
|
chatId: 'chat-1',
|
||||||
|
userId: 'user-1',
|
||||||
|
text: 'hello',
|
||||||
|
timestamp: new Date(),
|
||||||
|
};
|
||||||
|
|
||||||
|
await (bot as any).processMessage(msg, adapter);
|
||||||
|
|
||||||
|
// The pre-foreground error is filtered, so lastErrorDetail is null.
|
||||||
|
// The result (success=false, nothing delivered) triggers shouldRetryForErrorResult,
|
||||||
|
// NOT isApprovalConflict. The retry goes through the error-result path with
|
||||||
|
// orphaned approval recovery, then retries and succeeds.
|
||||||
|
expect((bot as any).sessionManager.runSession).toHaveBeenCalledTimes(2);
|
||||||
|
// Approval recovery should have been attempted via the error-result path
|
||||||
|
expect(recoverOrphanedConversationApproval).toHaveBeenCalledWith(
|
||||||
|
'agent-contract-test',
|
||||||
|
'conv-approval',
|
||||||
|
);
|
||||||
|
const sentTexts = adapter.sendMessage.mock.calls.map((call) => {
|
||||||
|
const payload = call[0] as { text?: string };
|
||||||
|
return payload.text;
|
||||||
|
});
|
||||||
|
expect(sentTexts).toContain('after retry');
|
||||||
|
});
|
||||||
|
|
||||||
it('uses agent-level recovery for default conversation alias on terminal approval conflict', async () => {
|
it('uses agent-level recovery for default conversation alias on terminal approval conflict', async () => {
|
||||||
const bot = new LettaBot({
|
const bot = new LettaBot({
|
||||||
workingDir: join(dataDir, 'working'),
|
workingDir: join(dataDir, 'working'),
|
||||||
|
|||||||
@@ -542,6 +542,12 @@ export async function rejectApproval(
|
|||||||
log.warn(`Approval already resolved for tool call ${approval.toolCallId}`);
|
log.warn(`Approval already resolved for tool call ${approval.toolCallId}`);
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
// Re-throw rate limit errors so callers can bail out early instead of
|
||||||
|
// hammering the API in a tight loop.
|
||||||
|
if (err?.status === 429) {
|
||||||
|
log.error('Failed to reject approval:', e);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
log.error('Failed to reject approval:', e);
|
log.error('Failed to reject approval:', e);
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user