fix(core): harden session lifecycle around init/send failures (#333)
This commit is contained in:
@@ -221,6 +221,32 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
}
|
||||
|
||||
private getSessionTimeoutMs(): number {
|
||||
const envTimeoutMs = Number(process.env.LETTA_SESSION_TIMEOUT_MS);
|
||||
if (Number.isFinite(envTimeoutMs) && envTimeoutMs > 0) {
|
||||
return envTimeoutMs;
|
||||
}
|
||||
return 60000;
|
||||
}
|
||||
|
||||
private async withSessionTimeout<T>(
|
||||
promise: Promise<T>,
|
||||
label: string,
|
||||
): Promise<T> {
|
||||
const timeoutMs = this.getSessionTimeoutMs();
|
||||
let timeoutId: ReturnType<typeof setTimeout> | undefined;
|
||||
const timeoutPromise = new Promise<T>((_, reject) => {
|
||||
timeoutId = setTimeout(() => {
|
||||
reject(new Error(`${label} timed out after ${timeoutMs}ms`));
|
||||
}, timeoutMs);
|
||||
});
|
||||
try {
|
||||
return await Promise.race([promise, timeoutPromise]);
|
||||
} finally {
|
||||
if (timeoutId) clearTimeout(timeoutId);
|
||||
}
|
||||
}
|
||||
|
||||
private baseSessionOptions(canUseTool?: CanUseToolCallback) {
|
||||
return {
|
||||
permissionMode: 'bypassPermissions' as const,
|
||||
@@ -387,10 +413,16 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
// Initialize eagerly so the subprocess is ready before the first send()
|
||||
console.log(`[Bot] Initializing session subprocess (key=${key})...`);
|
||||
await session.initialize();
|
||||
console.log(`[Bot] Session subprocess ready (key=${key})`);
|
||||
this.sessions.set(key, session);
|
||||
return session;
|
||||
try {
|
||||
await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`);
|
||||
console.log(`[Bot] Session subprocess ready (key=${key})`);
|
||||
this.sessions.set(key, session);
|
||||
return session;
|
||||
} catch (error) {
|
||||
// Close immediately so failed initialization cannot leak a subprocess.
|
||||
session.close();
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
|
||||
/** Legacy convenience: resolve key from shared/per-channel mode and delegate. */
|
||||
@@ -489,7 +521,7 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
// Send message with fallback chain
|
||||
try {
|
||||
await session.send(message);
|
||||
await this.withSessionTimeout(session.send(message), `Session send (key=${convKey})`);
|
||||
} catch (error) {
|
||||
// 409 CONFLICT from orphaned approval
|
||||
if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) {
|
||||
@@ -519,7 +551,12 @@ export class LettaBot implements AgentSession {
|
||||
this.store.conversationId = null;
|
||||
}
|
||||
session = await this.ensureSessionForKey(convKey);
|
||||
await session.send(message);
|
||||
try {
|
||||
await this.withSessionTimeout(session.send(message), `Session send retry (key=${convKey})`);
|
||||
} catch (retryError) {
|
||||
this.invalidateSession(convKey);
|
||||
throw retryError;
|
||||
}
|
||||
} else {
|
||||
// Unknown error -- invalidate so we get a fresh subprocess next time
|
||||
this.invalidateSession(convKey);
|
||||
@@ -546,11 +583,13 @@ export class LettaBot implements AgentSession {
|
||||
if (id) seenToolCallIds.add(id);
|
||||
}
|
||||
|
||||
yield msg;
|
||||
|
||||
// Persist state on result
|
||||
if (msg.type === 'result') {
|
||||
self.persistSessionState(session, capturedConvKey);
|
||||
}
|
||||
|
||||
yield msg;
|
||||
|
||||
if (msg.type === 'result') {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
@@ -19,16 +19,19 @@ describe('SDK session contract', () => {
|
||||
let originalDataDir: string | undefined;
|
||||
let originalAgentId: string | undefined;
|
||||
let originalRailwayMount: string | undefined;
|
||||
let originalSessionTimeout: string | undefined;
|
||||
|
||||
beforeEach(() => {
|
||||
dataDir = mkdtempSync(join(tmpdir(), 'lettabot-sdk-contract-'));
|
||||
originalDataDir = process.env.DATA_DIR;
|
||||
originalAgentId = process.env.LETTA_AGENT_ID;
|
||||
originalRailwayMount = process.env.RAILWAY_VOLUME_MOUNT_PATH;
|
||||
originalSessionTimeout = process.env.LETTA_SESSION_TIMEOUT_MS;
|
||||
|
||||
process.env.DATA_DIR = dataDir;
|
||||
process.env.LETTA_AGENT_ID = 'agent-contract-test';
|
||||
delete process.env.RAILWAY_VOLUME_MOUNT_PATH;
|
||||
delete process.env.LETTA_SESSION_TIMEOUT_MS;
|
||||
|
||||
vi.clearAllMocks();
|
||||
});
|
||||
@@ -43,6 +46,9 @@ describe('SDK session contract', () => {
|
||||
if (originalRailwayMount === undefined) delete process.env.RAILWAY_VOLUME_MOUNT_PATH;
|
||||
else process.env.RAILWAY_VOLUME_MOUNT_PATH = originalRailwayMount;
|
||||
|
||||
if (originalSessionTimeout === undefined) delete process.env.LETTA_SESSION_TIMEOUT_MS;
|
||||
else process.env.LETTA_SESSION_TIMEOUT_MS = originalSessionTimeout;
|
||||
|
||||
rmSync(dataDir, { recursive: true, force: true });
|
||||
});
|
||||
|
||||
@@ -80,4 +86,82 @@ describe('SDK session contract', () => {
|
||||
expect(mockSession.send).toHaveBeenNthCalledWith(2, 'second message');
|
||||
expect(mockSession.stream).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
|
||||
it('closes session if initialize times out before first send', async () => {
|
||||
process.env.LETTA_SESSION_TIMEOUT_MS = '5';
|
||||
|
||||
const mockSession = {
|
||||
initialize: vi.fn(() => new Promise<never>(() => {})),
|
||||
send: vi.fn(async (_message: unknown) => undefined),
|
||||
stream: vi.fn(() =>
|
||||
(async function* () {
|
||||
yield { type: 'result', success: true };
|
||||
})()
|
||||
),
|
||||
close: vi.fn(() => undefined),
|
||||
agentId: 'agent-contract-test',
|
||||
conversationId: 'conversation-contract-test',
|
||||
};
|
||||
|
||||
vi.mocked(createSession).mockReturnValue(mockSession as never);
|
||||
vi.mocked(resumeSession).mockReturnValue(mockSession as never);
|
||||
|
||||
const bot = new LettaBot({
|
||||
workingDir: join(dataDir, 'working'),
|
||||
allowedTools: [],
|
||||
});
|
||||
|
||||
await expect(bot.sendToAgent('will timeout')).rejects.toThrow('Session initialize (key=shared) timed out after 5ms');
|
||||
expect(mockSession.close).toHaveBeenCalledTimes(1);
|
||||
});
|
||||
|
||||
it('invalidates retry session when fallback send fails after conversation-missing error', async () => {
|
||||
const missingConversation = new Error('conversation not found');
|
||||
const retryFailure = new Error('network down');
|
||||
|
||||
const firstSession = {
|
||||
initialize: vi.fn(async () => undefined),
|
||||
send: vi.fn(async () => {
|
||||
throw missingConversation;
|
||||
}),
|
||||
stream: vi.fn(() =>
|
||||
(async function* () {
|
||||
yield { type: 'result', success: true };
|
||||
})()
|
||||
),
|
||||
close: vi.fn(() => undefined),
|
||||
agentId: 'agent-contract-test',
|
||||
conversationId: 'conversation-contract-test-1',
|
||||
};
|
||||
|
||||
const secondSession = {
|
||||
initialize: vi.fn(async () => undefined),
|
||||
send: vi.fn(async () => {
|
||||
throw retryFailure;
|
||||
}),
|
||||
stream: vi.fn(() =>
|
||||
(async function* () {
|
||||
yield { type: 'result', success: true };
|
||||
})()
|
||||
),
|
||||
close: vi.fn(() => undefined),
|
||||
agentId: 'agent-contract-test',
|
||||
conversationId: 'conversation-contract-test-2',
|
||||
};
|
||||
|
||||
vi.mocked(createSession)
|
||||
.mockReturnValueOnce(firstSession as never)
|
||||
.mockReturnValueOnce(secondSession as never);
|
||||
vi.mocked(resumeSession).mockReturnValue(firstSession as never);
|
||||
|
||||
const bot = new LettaBot({
|
||||
workingDir: join(dataDir, 'working'),
|
||||
allowedTools: [],
|
||||
});
|
||||
|
||||
await expect(bot.sendToAgent('trigger fallback')).rejects.toThrow('network down');
|
||||
expect(firstSession.close).toHaveBeenCalledTimes(1);
|
||||
expect(secondSession.close).toHaveBeenCalledTimes(1);
|
||||
expect(vi.mocked(createSession)).toHaveBeenCalledTimes(2);
|
||||
});
|
||||
});
|
||||
|
||||
Reference in New Issue
Block a user