|
|
|
|
@@ -130,13 +130,16 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
private groupIntervals: Map<string, number> = new Map();
|
|
|
|
|
private instantGroupIds: Set<string> = new Set();
|
|
|
|
|
private listeningGroupIds: Set<string> = new Set();
|
|
|
|
|
private processing = false;
|
|
|
|
|
private processing = false; // Global lock for shared mode
|
|
|
|
|
private processingKeys: Set<string> = new Set(); // Per-key locks for per-channel mode
|
|
|
|
|
|
|
|
|
|
// AskUserQuestion support: resolves when the next user message arrives
|
|
|
|
|
private pendingQuestionResolver: ((text: string) => void) | null = null;
|
|
|
|
|
|
|
|
|
|
// Persistent session: reuse a single CLI subprocess across messages
|
|
|
|
|
private persistentSession: Session | null = null;
|
|
|
|
|
// Persistent sessions: reuse CLI subprocesses across messages.
|
|
|
|
|
// In shared mode, only the "shared" key is used. In per-channel mode, each
|
|
|
|
|
// channel (and optionally heartbeat) gets its own subprocess.
|
|
|
|
|
private sessions: Map<string, Session> = new Map();
|
|
|
|
|
private currentCanUseTool: CanUseToolCallback | undefined;
|
|
|
|
|
// Stable callback wrapper so the Session options never change, but we can
|
|
|
|
|
// swap out the per-message handler before each send().
|
|
|
|
|
@@ -307,21 +310,59 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
return acted;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
// Conversation key resolution
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Return the persistent session, creating and initializing it if needed.
|
|
|
|
|
* The subprocess stays alive across messages -- only recreated on failure.
|
|
|
|
|
* Resolve the conversation key for a channel message.
|
|
|
|
|
* In shared mode returns "shared"; in per-channel mode returns the channel id.
|
|
|
|
|
*/
|
|
|
|
|
private async ensureSession(): Promise<Session> {
|
|
|
|
|
if (this.persistentSession) {
|
|
|
|
|
return this.persistentSession;
|
|
|
|
|
private resolveConversationKey(channel: string): string {
|
|
|
|
|
return this.config.conversationMode === 'per-channel' ? channel : 'shared';
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Resolve the conversation key for heartbeat/sendToAgent.
|
|
|
|
|
*/
|
|
|
|
|
private resolveHeartbeatConversationKey(): string {
|
|
|
|
|
if (this.config.conversationMode !== 'per-channel') return 'shared';
|
|
|
|
|
|
|
|
|
|
const hb = this.config.heartbeatConversation || 'last-active';
|
|
|
|
|
if (hb === 'dedicated') return 'heartbeat';
|
|
|
|
|
if (hb === 'last-active') {
|
|
|
|
|
// Use the last channel the user messaged on
|
|
|
|
|
const target = this.store.lastMessageTarget;
|
|
|
|
|
return target ? target.channel : 'shared';
|
|
|
|
|
}
|
|
|
|
|
// Explicit channel name (e.g., "telegram")
|
|
|
|
|
return hb;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// =========================================================================
|
|
|
|
|
// Session lifecycle (per-key)
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Return the persistent session for the given conversation key,
|
|
|
|
|
* creating and initializing it if needed.
|
|
|
|
|
*/
|
|
|
|
|
private async ensureSessionForKey(key: string): Promise<Session> {
|
|
|
|
|
const existing = this.sessions.get(key);
|
|
|
|
|
if (existing) return existing;
|
|
|
|
|
|
|
|
|
|
const opts = this.baseSessionOptions(this.sessionCanUseTool);
|
|
|
|
|
let session: Session;
|
|
|
|
|
|
|
|
|
|
if (this.store.conversationId) {
|
|
|
|
|
// In per-channel mode, look up per-key conversation ID.
|
|
|
|
|
// In shared mode (key === "shared"), use the legacy single conversationId.
|
|
|
|
|
const convId = key === 'shared'
|
|
|
|
|
? this.store.conversationId
|
|
|
|
|
: this.store.getConversationId(key);
|
|
|
|
|
|
|
|
|
|
if (convId) {
|
|
|
|
|
process.env.LETTA_AGENT_ID = this.store.agentId || undefined;
|
|
|
|
|
session = resumeSession(this.store.conversationId, opts);
|
|
|
|
|
session = resumeSession(convId, opts);
|
|
|
|
|
} else if (this.store.agentId) {
|
|
|
|
|
process.env.LETTA_AGENT_ID = this.store.agentId;
|
|
|
|
|
session = createSession(this.store.agentId, opts);
|
|
|
|
|
@@ -345,21 +386,36 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Initialize eagerly so the subprocess is ready before the first send()
|
|
|
|
|
console.log('[Bot] Initializing session subprocess...');
|
|
|
|
|
console.log(`[Bot] Initializing session subprocess (key=${key})...`);
|
|
|
|
|
await session.initialize();
|
|
|
|
|
console.log('[Bot] Session subprocess ready');
|
|
|
|
|
this.persistentSession = session;
|
|
|
|
|
console.log(`[Bot] Session subprocess ready (key=${key})`);
|
|
|
|
|
this.sessions.set(key, session);
|
|
|
|
|
return session;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/** Legacy convenience: resolve key from shared/per-channel mode and delegate. */
|
|
|
|
|
private async ensureSession(): Promise<Session> {
|
|
|
|
|
return this.ensureSessionForKey('shared');
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Destroy the persistent session so the next ensureSession() spawns a fresh one.
|
|
|
|
|
* Destroy session(s). If key provided, destroys only that key.
|
|
|
|
|
* If key is undefined, destroys ALL sessions.
|
|
|
|
|
*/
|
|
|
|
|
private invalidateSession(): void {
|
|
|
|
|
if (this.persistentSession) {
|
|
|
|
|
console.log('[Bot] Invalidating persistent session');
|
|
|
|
|
this.persistentSession.close();
|
|
|
|
|
this.persistentSession = null;
|
|
|
|
|
private invalidateSession(key?: string): void {
|
|
|
|
|
if (key) {
|
|
|
|
|
const session = this.sessions.get(key);
|
|
|
|
|
if (session) {
|
|
|
|
|
console.log(`[Bot] Invalidating session (key=${key})`);
|
|
|
|
|
session.close();
|
|
|
|
|
this.sessions.delete(key);
|
|
|
|
|
}
|
|
|
|
|
} else {
|
|
|
|
|
for (const [k, session] of this.sessions) {
|
|
|
|
|
console.log(`[Bot] Invalidating session (key=${k})`);
|
|
|
|
|
session.close();
|
|
|
|
|
}
|
|
|
|
|
this.sessions.clear();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -369,7 +425,11 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
async warmSession(): Promise<void> {
|
|
|
|
|
if (!this.store.agentId && !this.store.conversationId) return;
|
|
|
|
|
try {
|
|
|
|
|
await this.ensureSession();
|
|
|
|
|
// In shared mode, warm the single session. In per-channel mode, warm nothing
|
|
|
|
|
// (sessions are created on first message per channel).
|
|
|
|
|
if (this.config.conversationMode !== 'per-channel') {
|
|
|
|
|
await this.ensureSessionForKey('shared');
|
|
|
|
|
}
|
|
|
|
|
} catch (err) {
|
|
|
|
|
console.warn('[Bot] Session pre-warm failed:', err instanceof Error ? err.message : err);
|
|
|
|
|
}
|
|
|
|
|
@@ -377,18 +437,27 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Persist conversation ID after a successful session result.
|
|
|
|
|
* Agent ID and first-run setup are handled eagerly in ensureSession().
|
|
|
|
|
* Agent ID and first-run setup are handled eagerly in ensureSessionForKey().
|
|
|
|
|
*/
|
|
|
|
|
private persistSessionState(session: Session): void {
|
|
|
|
|
// Agent ID already persisted in ensureSession() on creation.
|
|
|
|
|
private persistSessionState(session: Session, convKey?: string): void {
|
|
|
|
|
// Agent ID already persisted in ensureSessionForKey() on creation.
|
|
|
|
|
// Here we only update if the server returned a different one (shouldn't happen).
|
|
|
|
|
if (session.agentId && session.agentId !== this.store.agentId) {
|
|
|
|
|
const currentBaseUrl = process.env.LETTA_BASE_URL || 'https://api.letta.com';
|
|
|
|
|
this.store.setAgent(session.agentId, currentBaseUrl, session.conversationId || undefined);
|
|
|
|
|
console.log('[Bot] Agent ID updated:', session.agentId);
|
|
|
|
|
} else if (session.conversationId && session.conversationId !== this.store.conversationId) {
|
|
|
|
|
this.store.conversationId = session.conversationId;
|
|
|
|
|
console.log('[Bot] Conversation ID updated:', session.conversationId);
|
|
|
|
|
} else if (session.conversationId) {
|
|
|
|
|
// In per-channel mode, persist per-key. In shared mode, use legacy field.
|
|
|
|
|
if (convKey && convKey !== 'shared') {
|
|
|
|
|
const existing = this.store.getConversationId(convKey);
|
|
|
|
|
if (session.conversationId !== existing) {
|
|
|
|
|
this.store.setConversationId(convKey, session.conversationId);
|
|
|
|
|
console.log(`[Bot] Conversation ID updated (key=${convKey}):`, session.conversationId);
|
|
|
|
|
}
|
|
|
|
|
} else if (session.conversationId !== this.store.conversationId) {
|
|
|
|
|
this.store.conversationId = session.conversationId;
|
|
|
|
|
console.log('[Bot] Conversation ID updated:', session.conversationId);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -404,30 +473,35 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
*/
|
|
|
|
|
private async runSession(
|
|
|
|
|
message: SendMessage,
|
|
|
|
|
options: { retried?: boolean; canUseTool?: CanUseToolCallback } = {},
|
|
|
|
|
options: { retried?: boolean; canUseTool?: CanUseToolCallback; convKey?: string } = {},
|
|
|
|
|
): Promise<{ session: Session; stream: () => AsyncGenerator<StreamMsg> }> {
|
|
|
|
|
const { retried = false, canUseTool } = options;
|
|
|
|
|
const { retried = false, canUseTool, convKey = 'shared' } = options;
|
|
|
|
|
|
|
|
|
|
// Update the per-message callback before sending
|
|
|
|
|
this.currentCanUseTool = canUseTool;
|
|
|
|
|
|
|
|
|
|
let session = await this.ensureSession();
|
|
|
|
|
let session = await this.ensureSessionForKey(convKey);
|
|
|
|
|
|
|
|
|
|
// Resolve the conversation ID for this key (for error recovery)
|
|
|
|
|
const convId = convKey === 'shared'
|
|
|
|
|
? this.store.conversationId
|
|
|
|
|
: this.store.getConversationId(convKey);
|
|
|
|
|
|
|
|
|
|
// Send message with fallback chain
|
|
|
|
|
try {
|
|
|
|
|
await session.send(message);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
// 409 CONFLICT from orphaned approval
|
|
|
|
|
if (!retried && isApprovalConflictError(error) && this.store.agentId && this.store.conversationId) {
|
|
|
|
|
if (!retried && isApprovalConflictError(error) && this.store.agentId && convId) {
|
|
|
|
|
console.log('[Bot] CONFLICT detected - attempting orphaned approval recovery...');
|
|
|
|
|
this.invalidateSession();
|
|
|
|
|
this.invalidateSession(convKey);
|
|
|
|
|
const result = await recoverOrphanedConversationApproval(
|
|
|
|
|
this.store.agentId,
|
|
|
|
|
this.store.conversationId
|
|
|
|
|
convId
|
|
|
|
|
);
|
|
|
|
|
if (result.recovered) {
|
|
|
|
|
console.log(`[Bot] Recovery succeeded (${result.details}), retrying...`);
|
|
|
|
|
return this.runSession(message, { retried: true, canUseTool });
|
|
|
|
|
return this.runSession(message, { retried: true, canUseTool, convKey });
|
|
|
|
|
}
|
|
|
|
|
console.error(`[Bot] Orphaned approval recovery failed: ${result.details}`);
|
|
|
|
|
throw error;
|
|
|
|
|
@@ -437,28 +511,29 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
// Only retry on errors that indicate missing conversation/agent, not
|
|
|
|
|
// on auth, network, or protocol errors (which would just fail again).
|
|
|
|
|
if (this.store.agentId && isConversationMissingError(error)) {
|
|
|
|
|
console.warn('[Bot] Conversation not found, creating a new conversation...');
|
|
|
|
|
this.invalidateSession();
|
|
|
|
|
session = await this.ensureSession();
|
|
|
|
|
console.warn(`[Bot] Conversation not found (key=${convKey}), creating a new conversation...`);
|
|
|
|
|
this.invalidateSession(convKey);
|
|
|
|
|
if (convKey !== 'shared') {
|
|
|
|
|
this.store.clearConversation(convKey);
|
|
|
|
|
} else {
|
|
|
|
|
this.store.conversationId = null;
|
|
|
|
|
}
|
|
|
|
|
session = await this.ensureSessionForKey(convKey);
|
|
|
|
|
await session.send(message);
|
|
|
|
|
} else {
|
|
|
|
|
// Unknown error -- invalidate so we get a fresh subprocess next time
|
|
|
|
|
this.invalidateSession();
|
|
|
|
|
this.invalidateSession(convKey);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Persist conversation ID immediately after successful send, before streaming.
|
|
|
|
|
// If streaming disconnects/aborts before result, the next turn will still
|
|
|
|
|
// resume the correct conversation instead of forking a new one.
|
|
|
|
|
if (session.conversationId && session.conversationId !== this.store.conversationId) {
|
|
|
|
|
this.store.conversationId = session.conversationId;
|
|
|
|
|
console.log('[Bot] Saved conversation ID:', session.conversationId);
|
|
|
|
|
}
|
|
|
|
|
this.persistSessionState(session, convKey);
|
|
|
|
|
|
|
|
|
|
// Return session and a deduplicated stream generator
|
|
|
|
|
const seenToolCallIds = new Set<string>();
|
|
|
|
|
const self = this;
|
|
|
|
|
const capturedConvKey = convKey; // Capture for closure
|
|
|
|
|
|
|
|
|
|
async function* dedupedStream(): AsyncGenerator<StreamMsg> {
|
|
|
|
|
for await (const raw of session.stream()) {
|
|
|
|
|
@@ -475,7 +550,7 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
|
|
|
|
|
// Persist state on result
|
|
|
|
|
if (msg.type === 'result') {
|
|
|
|
|
self.persistSessionState(session);
|
|
|
|
|
self.persistSessionState(session, capturedConvKey);
|
|
|
|
|
break;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
@@ -490,7 +565,7 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
|
|
|
|
|
registerChannel(adapter: ChannelAdapter): void {
|
|
|
|
|
adapter.onMessage = (msg) => this.handleMessage(msg, adapter);
|
|
|
|
|
adapter.onCommand = (cmd) => this.handleCommand(cmd);
|
|
|
|
|
adapter.onCommand = (cmd) => this.handleCommand(cmd, adapter.id);
|
|
|
|
|
this.channels.set(adapter.id, adapter);
|
|
|
|
|
console.log(`Registered channel: ${adapter.name}`);
|
|
|
|
|
}
|
|
|
|
|
@@ -523,9 +598,14 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.messageQueue.push({ msg: effective, adapter });
|
|
|
|
|
if (!this.processing) {
|
|
|
|
|
this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err));
|
|
|
|
|
if (this.config.conversationMode === 'per-channel') {
|
|
|
|
|
const convKey = this.resolveConversationKey(effective.channel);
|
|
|
|
|
this.enqueueForKey(convKey, effective, adapter);
|
|
|
|
|
} else {
|
|
|
|
|
this.messageQueue.push({ msg: effective, adapter });
|
|
|
|
|
if (!this.processing) {
|
|
|
|
|
this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -533,7 +613,7 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
// Commands
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
|
|
|
|
private async handleCommand(command: string): Promise<string | null> {
|
|
|
|
|
private async handleCommand(command: string, channelId?: string): Promise<string | null> {
|
|
|
|
|
console.log(`[Command] Received: /${command}`);
|
|
|
|
|
switch (command) {
|
|
|
|
|
case 'status': {
|
|
|
|
|
@@ -557,12 +637,35 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
return '⏰ Heartbeat triggered (silent mode - check server logs)';
|
|
|
|
|
}
|
|
|
|
|
case 'reset': {
|
|
|
|
|
const oldConversationId = this.store.conversationId;
|
|
|
|
|
this.store.conversationId = null;
|
|
|
|
|
const convKey = channelId ? this.resolveConversationKey(channelId) : undefined;
|
|
|
|
|
if (convKey && convKey !== 'shared') {
|
|
|
|
|
// Per-channel mode: only clear the conversation for this channel
|
|
|
|
|
this.store.clearConversation(convKey);
|
|
|
|
|
this.invalidateSession(convKey);
|
|
|
|
|
console.log(`[Command] /reset - conversation cleared for ${convKey}`);
|
|
|
|
|
// Eagerly create the new session so we can report the conversation ID
|
|
|
|
|
try {
|
|
|
|
|
const session = await this.ensureSessionForKey(convKey);
|
|
|
|
|
const newConvId = session.conversationId || '(pending)';
|
|
|
|
|
this.persistSessionState(session, convKey);
|
|
|
|
|
return `Conversation reset for this channel. New conversation: ${newConvId}\nOther channels are unaffected. (Agent memory is preserved.)`;
|
|
|
|
|
} catch {
|
|
|
|
|
return `Conversation reset for this channel. Other channels are unaffected. (Agent memory is preserved.)`;
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
// Shared mode or no channel context: clear everything
|
|
|
|
|
this.store.clearConversation();
|
|
|
|
|
this.store.resetRecoveryAttempts();
|
|
|
|
|
this.invalidateSession(); // Subprocess has old conversation baked in
|
|
|
|
|
console.log(`[Command] /reset - conversation cleared (was: ${oldConversationId})`);
|
|
|
|
|
return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)';
|
|
|
|
|
this.invalidateSession();
|
|
|
|
|
console.log('[Command] /reset - all conversations cleared');
|
|
|
|
|
try {
|
|
|
|
|
const session = await this.ensureSessionForKey('shared');
|
|
|
|
|
const newConvId = session.conversationId || '(pending)';
|
|
|
|
|
this.persistSessionState(session, 'shared');
|
|
|
|
|
return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`;
|
|
|
|
|
} catch {
|
|
|
|
|
return 'Conversation reset. Send a message to start a new conversation. (Agent memory is preserved.)';
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
default:
|
|
|
|
|
return null;
|
|
|
|
|
@@ -690,12 +793,59 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
return;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.messageQueue.push({ msg, adapter });
|
|
|
|
|
if (!this.processing) {
|
|
|
|
|
this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err));
|
|
|
|
|
if (this.config.conversationMode === 'per-channel') {
|
|
|
|
|
// Per-channel mode: messages on different channels can run in parallel.
|
|
|
|
|
// Only serialize within the same conversation key.
|
|
|
|
|
const convKey = this.resolveConversationKey(msg.channel);
|
|
|
|
|
this.enqueueForKey(convKey, msg, adapter);
|
|
|
|
|
} else {
|
|
|
|
|
// Shared mode: single global queue (existing behavior)
|
|
|
|
|
this.messageQueue.push({ msg, adapter });
|
|
|
|
|
if (!this.processing) {
|
|
|
|
|
this.processQueue().catch(err => console.error('[Queue] Fatal error in processQueue:', err));
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Enqueue a message for a specific conversation key.
|
|
|
|
|
* Messages with the same key are serialized; different keys run in parallel.
|
|
|
|
|
*/
|
|
|
|
|
private keyedQueues: Map<string, Array<{ msg: InboundMessage; adapter: ChannelAdapter }>> = new Map();
|
|
|
|
|
|
|
|
|
|
private enqueueForKey(key: string, msg: InboundMessage, adapter: ChannelAdapter): void {
|
|
|
|
|
let queue = this.keyedQueues.get(key);
|
|
|
|
|
if (!queue) {
|
|
|
|
|
queue = [];
|
|
|
|
|
this.keyedQueues.set(key, queue);
|
|
|
|
|
}
|
|
|
|
|
queue.push({ msg, adapter });
|
|
|
|
|
|
|
|
|
|
if (!this.processingKeys.has(key)) {
|
|
|
|
|
this.processKeyedQueue(key).catch(err =>
|
|
|
|
|
console.error(`[Queue] Fatal error in processKeyedQueue(${key}):`, err)
|
|
|
|
|
);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private async processKeyedQueue(key: string): Promise<void> {
|
|
|
|
|
if (this.processingKeys.has(key)) return;
|
|
|
|
|
this.processingKeys.add(key);
|
|
|
|
|
|
|
|
|
|
const queue = this.keyedQueues.get(key);
|
|
|
|
|
while (queue && queue.length > 0) {
|
|
|
|
|
const { msg, adapter } = queue.shift()!;
|
|
|
|
|
try {
|
|
|
|
|
await this.processMessage(msg, adapter);
|
|
|
|
|
} catch (error) {
|
|
|
|
|
console.error(`[Queue] Error processing message (key=${key}):`, error);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.processingKeys.delete(key);
|
|
|
|
|
this.keyedQueues.delete(key);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private async processQueue(): Promise<void> {
|
|
|
|
|
if (this.processing || this.messageQueue.length === 0) return;
|
|
|
|
|
|
|
|
|
|
@@ -815,7 +965,8 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
// Run session
|
|
|
|
|
let session: Session | null = null;
|
|
|
|
|
try {
|
|
|
|
|
const run = await this.runSession(messageToSend, { retried, canUseTool });
|
|
|
|
|
const convKey = this.resolveConversationKey(msg.channel);
|
|
|
|
|
const run = await this.runSession(messageToSend, { retried, canUseTool, convKey });
|
|
|
|
|
lap('session send');
|
|
|
|
|
session = run.session;
|
|
|
|
|
|
|
|
|
|
@@ -992,15 +1143,19 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
console.error(`[Bot] Warning: Agent returned terminal error (error=${streamMsg.error}, stopReason=${streamMsg.stopReason || 'N/A'}) with no response.`);
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
if (!retried && this.store.agentId && this.store.conversationId) {
|
|
|
|
|
const retryConvKey = this.resolveConversationKey(msg.channel);
|
|
|
|
|
const retryConvId = retryConvKey === 'shared'
|
|
|
|
|
? this.store.conversationId
|
|
|
|
|
: this.store.getConversationId(retryConvKey);
|
|
|
|
|
if (!retried && this.store.agentId && retryConvId) {
|
|
|
|
|
const reason = shouldRetryForErrorResult ? 'error result' : 'empty result';
|
|
|
|
|
console.log(`[Bot] ${reason} - attempting orphaned approval recovery...`);
|
|
|
|
|
this.invalidateSession();
|
|
|
|
|
this.invalidateSession(retryConvKey);
|
|
|
|
|
session = null;
|
|
|
|
|
clearInterval(typingInterval);
|
|
|
|
|
const convResult = await recoverOrphanedConversationApproval(
|
|
|
|
|
this.store.agentId,
|
|
|
|
|
this.store.conversationId
|
|
|
|
|
retryConvId
|
|
|
|
|
);
|
|
|
|
|
if (convResult.recovered) {
|
|
|
|
|
console.log(`[Bot] Recovery succeeded (${convResult.details}), retrying message...`);
|
|
|
|
|
@@ -1127,19 +1282,48 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
// sendToAgent - Background triggers (heartbeats, cron, webhooks)
|
|
|
|
|
// =========================================================================
|
|
|
|
|
|
|
|
|
|
/**
|
|
|
|
|
* Acquire the appropriate lock for a conversation key.
|
|
|
|
|
* In per-channel mode with a dedicated key, no lock needed (parallel OK).
|
|
|
|
|
* In per-channel mode with a channel key, wait for that key's queue.
|
|
|
|
|
* In shared mode, use the global processing flag.
|
|
|
|
|
*/
|
|
|
|
|
private async acquireLock(convKey: string): Promise<boolean> {
|
|
|
|
|
if (convKey === 'heartbeat') return false; // No lock needed
|
|
|
|
|
|
|
|
|
|
if (this.config.conversationMode === 'per-channel') {
|
|
|
|
|
while (this.processingKeys.has(convKey)) {
|
|
|
|
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
|
|
|
}
|
|
|
|
|
this.processingKeys.add(convKey);
|
|
|
|
|
} else {
|
|
|
|
|
while (this.processing) {
|
|
|
|
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
|
|
|
}
|
|
|
|
|
this.processing = true;
|
|
|
|
|
}
|
|
|
|
|
return true;
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
private releaseLock(convKey: string, acquired: boolean): void {
|
|
|
|
|
if (!acquired) return;
|
|
|
|
|
if (this.config.conversationMode === 'per-channel') {
|
|
|
|
|
this.processingKeys.delete(convKey);
|
|
|
|
|
} else {
|
|
|
|
|
this.processing = false;
|
|
|
|
|
this.processQueue();
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
async sendToAgent(
|
|
|
|
|
text: string,
|
|
|
|
|
_context?: TriggerContext
|
|
|
|
|
): Promise<string> {
|
|
|
|
|
// Serialize with message queue to prevent 409 conflicts
|
|
|
|
|
while (this.processing) {
|
|
|
|
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.processing = true;
|
|
|
|
|
const convKey = this.resolveHeartbeatConversationKey();
|
|
|
|
|
const acquired = await this.acquireLock(convKey);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
const { stream } = await this.runSession(text);
|
|
|
|
|
const { stream } = await this.runSession(text, { convKey });
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
let response = '';
|
|
|
|
|
@@ -1162,12 +1346,11 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
return response;
|
|
|
|
|
} catch (error) {
|
|
|
|
|
// Invalidate on stream errors so next call gets a fresh subprocess
|
|
|
|
|
this.invalidateSession();
|
|
|
|
|
this.invalidateSession(convKey);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
this.processing = false;
|
|
|
|
|
this.processQueue();
|
|
|
|
|
this.releaseLock(convKey, acquired);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
@@ -1179,25 +1362,20 @@ export class LettaBot implements AgentSession {
|
|
|
|
|
text: string,
|
|
|
|
|
_context?: TriggerContext
|
|
|
|
|
): AsyncGenerator<StreamMsg> {
|
|
|
|
|
// Serialize with message queue to prevent 409 conflicts
|
|
|
|
|
while (this.processing) {
|
|
|
|
|
await new Promise(resolve => setTimeout(resolve, 1000));
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
this.processing = true;
|
|
|
|
|
const convKey = this.resolveHeartbeatConversationKey();
|
|
|
|
|
const acquired = await this.acquireLock(convKey);
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
const { stream } = await this.runSession(text);
|
|
|
|
|
const { stream } = await this.runSession(text, { convKey });
|
|
|
|
|
|
|
|
|
|
try {
|
|
|
|
|
yield* stream();
|
|
|
|
|
} catch (error) {
|
|
|
|
|
this.invalidateSession();
|
|
|
|
|
this.invalidateSession(convKey);
|
|
|
|
|
throw error;
|
|
|
|
|
}
|
|
|
|
|
} finally {
|
|
|
|
|
this.processing = false;
|
|
|
|
|
this.processQueue();
|
|
|
|
|
this.releaseLock(convKey, acquired);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
|