feat(core): proactive approval detection via SDK bootstrapState() (#383)
This commit is contained in:
@@ -277,6 +277,9 @@ export class LettaBot implements AgentSession {
|
||||
// In shared mode, only the "shared" key is used. In per-channel mode, each
|
||||
// channel (and optionally heartbeat) gets its own subprocess.
|
||||
private sessions: Map<string, Session> = new Map();
|
||||
// Coalesces concurrent ensureSessionForKey calls for the same key so the
|
||||
// second caller waits for the first instead of creating a duplicate session.
|
||||
private sessionCreationLocks: Map<string, Promise<Session>> = new Map();
|
||||
private currentCanUseTool: CanUseToolCallback | undefined;
|
||||
private conversationOverrides: Set<string> = new Set();
|
||||
// Stable callback wrapper so the Session options never change, but we can
|
||||
@@ -805,8 +808,33 @@ export class LettaBot implements AgentSession {
|
||||
/**
|
||||
* Return the persistent session for the given conversation key,
|
||||
* creating and initializing it if needed.
|
||||
*
|
||||
* After initialization, calls bootstrapState() to detect pending approvals.
|
||||
* If an orphaned approval is found, recovers proactively before returning
|
||||
* the session -- preventing the first send() from hitting a 409 CONFLICT.
|
||||
*/
|
||||
private async ensureSessionForKey(key: string): Promise<Session> {
|
||||
private async ensureSessionForKey(key: string, bootstrapRetried = false): Promise<Session> {
  // A session that is already registered for this key is returned as-is.
  const cached = this.sessions.get(key);
  if (cached) {
    return cached;
  }

  // If a creation for this key is already in flight (e.g. warmSession is
  // running while the first message arrives), join it rather than starting
  // a second, duplicate session.
  const inFlight = this.sessionCreationLocks.get(key);
  if (inFlight) {
    return inFlight;
  }

  // Publish the creation promise before awaiting it so that concurrent
  // callers for the same key find it in the lock map.
  const creation = this._createSessionForKey(key, bootstrapRetried);
  this.sessionCreationLocks.set(key, creation);

  try {
    const session = await creation;
    return session;
  } finally {
    // Drop the lock entry whether creation succeeded or failed.
    this.sessionCreationLocks.delete(key);
  }
}
|
||||
|
||||
/** Internal session creation -- called via ensureSessionForKey's lock. */
|
||||
private async _createSessionForKey(key: string, bootstrapRetried: boolean): Promise<Session> {
|
||||
// Re-read the store file from disk so we pick up agent/conversation ID
|
||||
// changes made by other processes (e.g. after a restart or container deploy).
|
||||
// This costs one synchronous disk read per incoming message, which is fine
|
||||
@@ -814,9 +842,6 @@ export class LettaBot implements AgentSession {
|
||||
// refresh at most once per second.
|
||||
this.store.refresh();
|
||||
|
||||
const existing = this.sessions.get(key);
|
||||
if (existing) return existing;
|
||||
|
||||
const opts = this.baseSessionOptions(this.sessionCanUseTool);
|
||||
let session: Session;
|
||||
|
||||
@@ -857,13 +882,52 @@ export class LettaBot implements AgentSession {
|
||||
try {
|
||||
await this.withSessionTimeout(session.initialize(), `Session initialize (key=${key})`);
|
||||
log.info(`Session subprocess ready (key=${key})`);
|
||||
this.sessions.set(key, session);
|
||||
return session;
|
||||
} catch (error) {
|
||||
// Close immediately so failed initialization cannot leak a subprocess.
|
||||
session.close();
|
||||
throw error;
|
||||
}
|
||||
|
||||
// Proactive approval detection via bootstrapState().
// Single CLI round-trip that returns hasPendingApproval flag alongside
// session metadata. If an orphaned approval is stuck, recover now so the
// first send() doesn't hit a 409 CONFLICT.
if (!bootstrapRetried && this.store.agentId) {
  // Set once this session has been closed for recovery. From that point on
  // the session must never be cached or returned, even if recovery throws.
  let closedForRecovery = false;

  try {
    const bootstrap = await this.withSessionTimeout(
      session.bootstrapState(),
      `Session bootstrapState (key=${key})`,
    );

    if (bootstrap.hasPendingApproval) {
      const convId = bootstrap.conversationId || session.conversationId;
      log.warn(`Pending approval detected at session startup (key=${key}, conv=${convId}), recovering...`);
      session.close();
      closedForRecovery = true;

      if (convId) {
        const result = await recoverOrphanedConversationApproval(
          this.store.agentId,
          convId,
          true, /* deepScan */
        );
        if (result.recovered) {
          log.info(`Proactive approval recovery succeeded: ${result.details}`);
        } else {
          log.warn(`Proactive approval recovery did not find resolvable approvals: ${result.details}`);
        }
      }
    }
  } catch (err) {
    const reason = err instanceof Error ? err.message : err;
    if (closedForRecovery) {
      // Recovery itself failed after the session was closed. The closed
      // session is unusable, so fall through to recreation below instead of
      // caching it.
      log.warn(`Proactive approval recovery failed (key=${key}), recreating session:`, reason);
    } else {
      // bootstrapState failure is non-fatal -- the session is still usable.
      // The reactive 409 handler in runSession() will catch stuck approvals.
      log.warn(`bootstrapState check failed (key=${key}), continuing:`, reason);
    }
  }

  if (closedForRecovery) {
    // Recreate session after recovery (conversation state changed).
    // Call _createSessionForKey directly (not ensureSessionForKey) since
    // we're already inside the creation lock for this key. Passing `true`
    // skips the bootstrap check on the new session, bounding the recursion
    // to one level. A failure here propagates to the caller.
    return this._createSessionForKey(key, true);
  }
}
|
||||
|
||||
this.sessions.set(key, session);
|
||||
return session;
|
||||
}
|
||||
|
||||
/** Legacy convenience: resolve key from shared/per-channel mode and delegate. */
|
||||
@@ -1374,10 +1438,10 @@ export class LettaBot implements AgentSession {
|
||||
}
|
||||
lap('typing indicator');
|
||||
|
||||
// Pre-send approval recovery
|
||||
// Only run proactive recovery when previous failures were detected.
|
||||
// Clean-path messages skip straight to session creation (the 409 retry
|
||||
// in runSession() still catches stuck states reactively).
|
||||
// Pre-send approval recovery (secondary defense).
|
||||
// Primary detection is now in ensureSessionForKey() via bootstrapState().
|
||||
// This fallback only fires when previous failures incremented recoveryAttempts,
|
||||
// covering edge cases where a cached session encounters a new stuck approval.
|
||||
const recovery = this.store.recoveryAttempts > 0
|
||||
? await this.attemptRecovery()
|
||||
: { recovered: false, shouldReset: false };
|
||||
|
||||
@@ -714,8 +714,13 @@ export async function recoverOrphanedConversationApproval(
|
||||
// No client is going to approve them -- reject and cancel so
|
||||
// lettabot can proceed.
|
||||
const isStuckApproval = status === 'running' && stopReason === 'requires_approval';
|
||||
// Letta Cloud uses status "created" with no stop_reason for runs
|
||||
// that paused on requires_approval but haven't been resumed yet.
|
||||
// If we found unresolved approval_request_messages for this run,
|
||||
// it's stuck -- treat it the same as a running/requires_approval.
|
||||
const isCreatedWithApproval = status === 'created';
|
||||
|
||||
if (isTerminated || isAbandonedApproval || isStuckApproval) {
|
||||
if (isTerminated || isAbandonedApproval || isStuckApproval || isCreatedWithApproval) {
|
||||
log.info(`Found ${approvals.length} blocking approval(s) from ${status}/${stopReason} run ${runId}`);
|
||||
|
||||
// Send denial for each unresolved tool call
|
||||
|
||||
Reference in New Issue
Block a user