perf: reuse SDK session subprocess across messages (#289)
This commit is contained in:
162
src/core/bot.ts
162
src/core/bot.ts
@@ -130,6 +130,18 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
// AskUserQuestion support: resolves when the next user message arrives
|
||||
private pendingQuestionResolver: ((text: string) => void) | null = null;
|
||||
|
||||
// Persistent session: reuse a single CLI subprocess across messages
|
||||
private persistentSession: Session | null = null;
|
||||
private currentCanUseTool: CanUseToolCallback | undefined;
|
||||
/**
 * Fixed tool-permission callback that is handed to the session options, so
 * the options object never has to change between messages. Each call is
 * forwarded to whatever handler `currentCanUseTool` holds at that moment;
 * when no per-message handler is set, the tool call is allowed.
 */
private readonly sessionCanUseTool: CanUseToolCallback = async (toolName, toolInput) => {
  // No per-message handler installed: default to allowing the tool.
  if (!this.currentCanUseTool) {
    return { behavior: 'allow' as const };
  }
  return this.currentCanUseTool(toolName, toolInput);
};
|
||||
|
||||
constructor(config: BotConfig) {
|
||||
this.config = config;
|
||||
@@ -292,51 +304,79 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
|
||||
/**
 * Startup of the persistent session that is currently in flight, if any.
 * Lets concurrent ensureSession() callers share a single subprocess instead
 * of each spawning their own.
 */
private sessionStartup: Promise<Session> | null = null;

/**
 * Return the persistent session, creating and initializing it if needed.
 * The subprocess stays alive across messages -- only recreated on failure.
 *
 * Priority: conversationId → agentId (new conversation) → createAgent
 *
 * Calls that arrive while a startup is still running (for example the
 * fire-and-forget warmSession() racing the first inbound message) await the
 * same startup promise, so only one subprocess is spawned.
 *
 * @returns The initialized session, which is also cached in `persistentSession`.
 * @throws Re-throws any error from agent creation or `session.initialize()`;
 *         nothing is cached in that case, so the next call starts over.
 */
private async ensureSession(): Promise<Session> {
  if (this.persistentSession) {
    return this.persistentSession;
  }

  if (!this.sessionStartup) {
    // Clear the marker once startup settles (success or failure) so a later
    // call after invalidateSession() can spawn a fresh subprocess.
    this.sessionStartup = this.startPersistentSession().finally(() => {
      this.sessionStartup = null;
    });
  }
  return this.sessionStartup;
}

/**
 * Spawn a session subprocess, initialize it, and cache it as the persistent
 * session. If initialization fails, the half-started session is closed
 * before the error is re-thrown.
 */
private async startPersistentSession(): Promise<Session> {
  const opts = this.baseSessionOptions(this.sessionCanUseTool);
  let session: Session;

  if (this.store.conversationId) {
    // process.env coerces every assigned value to a string, so assigning
    // `undefined` would store the literal text 'undefined'. Delete instead.
    if (this.store.agentId) {
      process.env.LETTA_AGENT_ID = this.store.agentId;
    } else {
      delete process.env.LETTA_AGENT_ID;
    }
    session = resumeSession(this.store.conversationId, opts);
  } else if (this.store.agentId) {
    process.env.LETTA_AGENT_ID = this.store.agentId;
    // Create a new conversation instead of resuming the default.
    // This handles the case where the default conversation was deleted
    // or never created (e.g., after migrations).
    session = createSession(this.store.agentId, opts);
  } else {
    // Create new agent -- persist immediately so we don't orphan it on later failures
    console.log('[Bot] Creating new agent');
    const newAgentId = await createAgent({
      systemPrompt: SYSTEM_PROMPT,
      memory: loadMemoryBlocks(this.config.agentName),
    });
    const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
    this.store.setAgent(newAgentId, currentBaseUrl);
    console.log('[Bot] Saved new agent ID:', newAgentId);

    // First-run setup: name (best effort) and skills
    if (this.config.agentName) {
      updateAgentName(newAgentId, this.config.agentName).catch(() => {});
    }
    installSkillsToAgent(newAgentId, this.config.skills);

    session = createSession(newAgentId, opts);
  }

  // Initialize eagerly so the subprocess is ready before the first send()
  console.log('[Bot] Initializing session subprocess...');
  try {
    await session.initialize();
  } catch (error) {
    // The session is never cached on failure, so nobody else would close it.
    try {
      session.close();
    } catch {
      // Already failing; surface the initialization error, not the close error.
    }
    throw error;
  }
  console.log('[Bot] Session subprocess ready');

  this.persistentSession = session;
  return session;
}
|
||||
/**
 * Tear down the persistent session, if one exists, so that the next
 * ensureSession() call has to spawn a fresh subprocess. Does nothing when
 * no session is currently cached.
 */
private invalidateSession(): void {
  const active = this.persistentSession;
  if (!active) {
    return;
  }

  console.log('[Bot] Invalidating persistent session');
  active.close();
  this.persistentSession = null;
}
|
||||
/**
 * Start the session subprocess ahead of the first message. Intended to be
 * called at startup once config/agent state is loaded.
 *
 * Skipped when the store holds neither an agent ID nor a conversation ID.
 * Failures are logged as a warning and never thrown to the caller.
 */
async warmSession(): Promise<void> {
  const hasTarget = Boolean(this.store.agentId || this.store.conversationId);
  if (hasTarget) {
    try {
      await this.ensureSession();
    } catch (err) {
      const detail = err instanceof Error ? err.message : err;
      console.warn('[Bot] Session pre-warm failed:', detail);
    }
  }
}
|
||||
|
||||
/**
|
||||
* Persist conversation ID after a successful session result.
|
||||
* Agent ID and first-run setup are handled eagerly in getSession().
|
||||
* Agent ID and first-run setup are handled eagerly in ensureSession().
|
||||
*/
|
||||
private persistSessionState(session: Session): void {
|
||||
// Agent ID already persisted in getSession() on creation.
|
||||
// Agent ID already persisted in ensureSession() on creation.
|
||||
// Here we only update if the server returned a different one (shouldn't happen).
|
||||
if (session.agentId && session.agentId !== this.store.agentId) {
|
||||
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
|
||||
@@ -352,13 +392,11 @@ export class LettaBot implements AgentSession {
|
||||
* Send a message and return a deduplicated stream.
|
||||
*
|
||||
* Handles:
|
||||
* - Session creation with fallback chain
|
||||
* - Persistent session reuse (subprocess stays alive across messages)
|
||||
* - CONFLICT recovery from orphaned approvals (retry once)
|
||||
* - Conversation-not-found fallback (create new conversation)
|
||||
* - Tool call deduplication
|
||||
* - Session persistence after result
|
||||
*
|
||||
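* Caller is responsible for consuming the stream. The session is persistent and reused across messages, so callers must not close it; on stream errors they call invalidateSession() instead.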
|
||||
*/
|
||||
private async runSession(
|
||||
message: SendMessage,
|
||||
@@ -366,7 +404,10 @@ export class LettaBot implements AgentSession {
|
||||
): Promise<{ session: Session; stream: () => AsyncGenerator<StreamMsg> }> {
|
||||
const { retried = false, canUseTool } = options;
|
||||
|
||||
let session = await this.getSession(canUseTool);
|
||||
// Update the per-message callback before sending
|
||||
this.currentCanUseTool = canUseTool;
|
||||
|
||||
let session = await this.ensureSession();
|
||||
|
||||
// Send message with fallback chain
|
||||
try {
|
||||
@@ -375,7 +416,7 @@ export class LettaBot implements AgentSession {
|
||||
// 409 CONFLICT from orphaned approval
|
||||
if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) {
|
||||
console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...');
|
||||
session.close();
|
||||
this.invalidateSession();
|
||||
const result = await recoverOrphanedConversationApproval(
|
||||
this.store.agentId,
|
||||
this.store.conversationId
|
||||
@@ -393,10 +434,12 @@ export class LettaBot implements AgentSession {
|
||||
// on auth, network, or protocol errors (which would just fail again).
|
||||
if (this.store.agentId && isConversationMissingError(error)) {
|
||||
console.warn('[Bot] Conversation not found, creating a new conversation...');
|
||||
session.close();
|
||||
session = createSession(this.store.agentId, this.baseSessionOptions(canUseTool));
|
||||
this.invalidateSession();
|
||||
session = await this.ensureSession();
|
||||
await session.send(message);
|
||||
} else {
|
||||
// Unknown error -- invalidate so we get a fresh subprocess next time
|
||||
this.invalidateSession();
|
||||
throw error;
|
||||
}
|
||||
}
|
||||
@@ -513,6 +556,7 @@ export class LettaBot implements AgentSession {
|
||||
const oldConversationId = this.store.conversationId;
|
||||
this.store.conversationId = null;
|
||||
this.store.resetRecoveryAttempts();
|
||||
this.invalidateSession(); // Subprocess has old conversation baked in
|
||||
console.log(`[Command] /reset - conversation cleared (was: ${oldConversationId})`);
|
||||
return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)';
|
||||
}
|
||||
@@ -672,6 +716,11 @@ export class LettaBot implements AgentSession {
|
||||
|
||||
private async processMessage(msg: InboundMessage, adapter: ChannelAdapter, retried = false): Promise<void> {
|
||||
// Track timing and last target
|
||||
const debugTiming = !!process.env.LETTABOT_DEBUG_TIMING;
|
||||
const t0 = debugTiming ? performance.now() : 0;
|
||||
const lap = (label: string) => {
|
||||
if (debugTiming) console.log(`[Timing] ${label}: ${(performance.now() - t0).toFixed(0)}ms`);
|
||||
};
|
||||
this.lastUserMessageTime = new Date();
|
||||
|
||||
// Skip heartbeat target update for listening mode (don't redirect heartbeats)
|
||||
@@ -684,13 +733,20 @@ export class LettaBot implements AgentSession {
|
||||
};
|
||||
}
|
||||
|
||||
// Skip typing indicator for listening mode
|
||||
// Fire-and-forget typing indicator so session creation starts immediately
|
||||
if (!msg.isListeningMode) {
|
||||
await adapter.sendTypingIndicator(msg.chatId);
|
||||
adapter.sendTypingIndicator(msg.chatId).catch(() => {});
|
||||
}
|
||||
lap('typing indicator');
|
||||
|
||||
// Pre-send approval recovery
|
||||
const recovery = await this.attemptRecovery();
|
||||
// Only run proactive recovery when previous failures were detected.
|
||||
// Clean-path messages skip straight to session creation (the 409 retry
|
||||
// in runSession() still catches stuck states reactively).
|
||||
const recovery = this.store.recoveryAttempts > 0
|
||||
? await this.attemptRecovery()
|
||||
: { recovered: false, shouldReset: false };
|
||||
lap('recovery check');
|
||||
if (recovery.shouldReset) {
|
||||
if (!msg.isListeningMode) {
|
||||
await adapter.sendMessage({
|
||||
@@ -714,6 +770,7 @@ export class LettaBot implements AgentSession {
|
||||
? formatGroupBatchEnvelope(msg.batchedMessages, {}, msg.isListeningMode)
|
||||
: formatMessageEnvelope(msg, {}, sessionContext);
|
||||
const messageToSend = await buildMultimodalMessage(formattedText, msg);
|
||||
lap('format message');
|
||||
|
||||
// Build AskUserQuestion-aware canUseTool callback with channel context.
|
||||
// In bypassPermissions mode, this callback is only invoked for interactive
|
||||
@@ -754,11 +811,12 @@ export class LettaBot implements AgentSession {
|
||||
let session: Session | null = null;
|
||||
try {
|
||||
const run = await this.runSession(messageToSend, { retried, canUseTool });
|
||||
lap('session send');
|
||||
session = run.session;
|
||||
|
||||
// Stream response with delivery
|
||||
let response = '';
|
||||
let lastUpdate = Date.now();
|
||||
let lastUpdate = 0; // Start at 0 so the first streaming edit fires immediately
|
||||
let messageId: string | null = null;
|
||||
let lastMsgType: string | null = null;
|
||||
let lastAssistantUuid: string | null = null;
|
||||
@@ -807,7 +865,9 @@ export class LettaBot implements AgentSession {
|
||||
}, 4000);
|
||||
|
||||
try {
|
||||
let firstChunkLogged = false;
|
||||
for await (const streamMsg of run.stream()) {
|
||||
if (!firstChunkLogged) { lap('first stream chunk'); firstChunkLogged = true; }
|
||||
receivedAnyData = true;
|
||||
msgTypeCounts[streamMsg.type] = (msgTypeCounts[streamMsg.type] || 0) + 1;
|
||||
|
||||
@@ -930,7 +990,7 @@ export class LettaBot implements AgentSession {
|
||||
if (!retried && this.store.agentId && this.store.conversationId) {
|
||||
const reason = shouldRetryForErrorResult ? 'error result' : 'empty result';
|
||||
console.log(`[Bot] ${reason} - attempting orphaned approval recovery...`);
|
||||
session.close();
|
||||
this.invalidateSession();
|
||||
session = null;
|
||||
clearInterval(typingInterval);
|
||||
const convResult = await recoverOrphanedConversationApproval(
|
||||
@@ -965,6 +1025,7 @@ export class LettaBot implements AgentSession {
|
||||
clearInterval(typingInterval);
|
||||
adapter.stopTypingIndicator?.(msg.chatId)?.catch(() => {});
|
||||
}
|
||||
lap('stream complete');
|
||||
|
||||
// Handle no-reply marker
|
||||
if (response.trim() === '<no-reply/>') {
|
||||
@@ -992,6 +1053,7 @@ export class LettaBot implements AgentSession {
|
||||
return;
|
||||
}
|
||||
|
||||
lap('directives done');
|
||||
// Send final response
|
||||
if (response.trim()) {
|
||||
const prefixedFinal = this.prefixResponse(response);
|
||||
@@ -1015,6 +1077,7 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
}
|
||||
|
||||
lap('message delivered');
|
||||
// Handle no response
|
||||
if (!sentAnyMessage) {
|
||||
if (!receivedAnyData) {
|
||||
@@ -1051,7 +1114,7 @@ export class LettaBot implements AgentSession {
|
||||
console.error('[Bot] Failed to send error message to channel:', sendError);
|
||||
}
|
||||
} finally {
|
||||
session?.close();
|
||||
// Session stays alive for reuse -- only invalidated on errors
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1071,7 +1134,7 @@ export class LettaBot implements AgentSession {
|
||||
this.processing = true;
|
||||
|
||||
try {
|
||||
const { session, stream } = await this.runSession(text);
|
||||
const { stream } = await this.runSession(text);
|
||||
|
||||
try {
|
||||
let response = '';
|
||||
@@ -1092,8 +1155,10 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
}
|
||||
return response;
|
||||
} finally {
|
||||
session.close();
|
||||
} catch (error) {
|
||||
// Invalidate on stream errors so next call gets a fresh subprocess
|
||||
this.invalidateSession();
|
||||
throw error;
|
||||
}
|
||||
} finally {
|
||||
this.processing = false;
|
||||
@@ -1117,12 +1182,13 @@ export class LettaBot implements AgentSession {
|
||||
this.processing = true;
|
||||
|
||||
try {
|
||||
const { session, stream } = await this.runSession(text);
|
||||
const { stream } = await this.runSession(text);
|
||||
|
||||
try {
|
||||
yield* stream();
|
||||
} finally {
|
||||
session.close();
|
||||
} catch (error) {
|
||||
this.invalidateSession();
|
||||
throw error;
|
||||
}
|
||||
} finally {
|
||||
this.processing = false;
|
||||
|
||||
@@ -585,6 +585,9 @@ async function main() {
|
||||
services.groupBatchers.push(batcher);
|
||||
}
|
||||
|
||||
// Pre-warm the SDK session subprocess so the first message doesn't pay startup cost
|
||||
bot.warmSession().catch(() => {});
|
||||
|
||||
// Per-agent cron
|
||||
if (agentConfig.features?.cron ?? globalConfig.cronEnabled) {
|
||||
const cronService = new CronService(bot);
|
||||
|
||||
Reference in New Issue
Block a user