feat: migrate to Letta TS SDK v1 (alpha) (#11)

This commit is contained in:
Charles Packer
2025-10-28 23:50:57 -07:00
committed by GitHub
parent 275fca942d
commit 4ca01d199d
17 changed files with 377 additions and 332 deletions

View File

@@ -1,6 +1,11 @@
// src/cli/App.tsx
import { Letta } from "@letta-ai/letta-client";
import type { MessageCreate } from "@letta-ai/letta-client/resources/agents/agents";
import type {
ApprovalCreate,
LettaMessageUnion,
} from "@letta-ai/letta-client/resources/agents/messages";
import type { LlmConfig } from "@letta-ai/letta-client/resources/models/models";
import { Box, Static } from "ink";
import { useCallback, useEffect, useMemo, useRef, useState } from "react";
import { getResumeData } from "../agent/check-approval";
@@ -105,7 +110,7 @@ export default function App({
| "ready";
continueSession?: boolean;
startupApproval?: ApprovalRequest | null;
messageHistory?: Letta.LettaMessageUnion[];
messageHistory?: LettaMessageUnion[];
tokenStreaming?: boolean;
}) {
// Whether a stream is in flight (disables input)
@@ -132,7 +137,7 @@ export default function App({
// Model selector state
const [modelSelectorOpen, setModelSelectorOpen] = useState(false);
const [llmConfig, setLlmConfig] = useState<Letta.LlmConfig | null>(null);
const [llmConfig, setLlmConfig] = useState<LlmConfig | null>(null);
// Token streaming preference (can be toggled at runtime)
const [tokenStreamingEnabled, setTokenStreamingEnabled] =
@@ -345,9 +350,9 @@ export default function App({
const { getClient } = await import("../agent/client");
const client = await getClient();
const agent = await client.agents.retrieve(agentId);
setLlmConfig(agent.llmConfig);
setLlmConfig(agent.llm_config);
} catch (error) {
console.error("Error fetching llmConfig:", error);
console.error("Error fetching llm_config:", error);
}
};
fetchConfig();
@@ -372,7 +377,7 @@ export default function App({
// Core streaming function - iterative loop that processes conversation turns
const processConversation = useCallback(
async (
initialInput: Array<Letta.MessageCreate | Letta.ApprovalCreate>,
initialInput: Array<MessageCreate | ApprovalCreate>,
): Promise<void> => {
let currentInput = initialInput;
@@ -398,7 +403,7 @@ export default function App({
refreshDerived();
// Case 1: Turn ended normally
if (stopReason === Letta.StopReasonType.EndTurn) {
if (stopReason === "end_turn") {
setStreaming(false);
return;
}
@@ -411,7 +416,7 @@ export default function App({
}
// Case 2: Requires approval
if (stopReason === Letta.StopReasonType.RequiresApproval) {
if (stopReason === "requires_approval") {
if (!approval) {
appendError(
`Unexpected null approval with stop reason: ${stopReason}`,
@@ -453,7 +458,7 @@ export default function App({
await processConversation([
{
type: "approval",
approvalRequestId: toolCallId,
approval_request_id: toolCallId,
approve: false,
reason: denyReason,
},
@@ -479,11 +484,11 @@ export default function App({
// Update buffers with tool return
onChunk(buffersRef.current, {
messageType: "tool_return_message",
message_type: "tool_return_message",
id: "dummy",
date: new Date(),
toolCallId,
toolReturn: toolResult.toolReturn,
date: new Date().toISOString(),
tool_call_id: toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
@@ -497,8 +502,8 @@ export default function App({
approvals: [
{
type: "tool",
toolCallId,
toolReturn: toolResult.toolReturn,
tool_call_id: toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
@@ -664,6 +669,61 @@ export default function App({
return { submitted: true };
}
// Special handling for /clear command - reset conversation
if (msg.trim() === "/clear") {
const cmdId = uid("cmd");
buffersRef.current.byId.set(cmdId, {
kind: "command",
id: cmdId,
input: msg,
output: "Clearing conversation...",
phase: "running",
});
buffersRef.current.order.push(cmdId);
refreshDerived();
setCommandRunning(true);
try {
const client = getClient();
await client.agents.messages.reset(agentId, {
add_default_initial_messages: false,
});
// Clear local buffers and static items
// buffersRef.current.byId.clear();
// buffersRef.current.order = [];
// buffersRef.current.tokenCount = 0;
// emittedIdsRef.current.clear();
// setStaticItems([]);
// Update command with success
buffersRef.current.byId.set(cmdId, {
kind: "command",
id: cmdId,
input: msg,
output: "Conversation cleared",
phase: "finished",
success: true,
});
buffersRef.current.order.push(cmdId);
refreshDerived();
} catch (error) {
buffersRef.current.byId.set(cmdId, {
kind: "command",
id: cmdId,
input: msg,
output: `Failed: ${error instanceof Error ? error.message : String(error)}`,
phase: "finished",
success: false,
});
refreshDerived();
} finally {
setCommandRunning(false);
}
return { submitted: true };
}
// Immediately add command to transcript with "running" phase
const cmdId = uid("cmd");
buffersRef.current.byId.set(cmdId, {
@@ -782,8 +842,8 @@ export default function App({
// Start the conversation loop
await processConversation([
{
role: Letta.MessageCreateRole.User,
content: messageContent as unknown as Letta.MessageCreate["content"],
role: "user",
content: messageContent as unknown as MessageCreate["content"],
},
]);
@@ -817,11 +877,11 @@ export default function App({
// Update buffers with tool return
onChunk(buffersRef.current, {
messageType: "tool_return_message",
message_type: "tool_return_message",
id: "dummy",
date: new Date(),
toolCallId,
toolReturn: toolResult.toolReturn,
date: new Date().toISOString(),
tool_call_id: toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
@@ -837,8 +897,8 @@ export default function App({
approvals: [
{
type: "tool",
toolCallId,
toolReturn: toolResult.toolReturn,
tool_call_id: toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
@@ -897,7 +957,7 @@ export default function App({
await processConversation([
{
type: "approval",
approvalRequestId: toolCallId,
approval_request_id: toolCallId,
approve: false,
reason: reason || "User denied the tool execution",
// TODO the above is legacy?
@@ -1031,11 +1091,11 @@ export default function App({
// Update buffers with tool return
onChunk(buffersRef.current, {
messageType: "tool_return_message",
message_type: "tool_return_message",
id: "dummy",
date: new Date(),
toolCallId,
toolReturn: toolResult.toolReturn,
date: new Date().toISOString(),
tool_call_id: toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
@@ -1052,8 +1112,8 @@ export default function App({
approvals: [
{
type: "tool",
toolCallId,
toolReturn: toolResult.toolReturn,
tool_call_id: toolCallId,
tool_return: toolResult.toolReturn,
status: toolResult.status,
stdout: toolResult.stdout,
stderr: toolResult.stderr,
@@ -1085,7 +1145,7 @@ export default function App({
await processConversation([
{
type: "approval",
approvalRequestId: toolCallId,
approval_request_id: toolCallId,
approve: false,
reason:
reason ||

View File

@@ -36,6 +36,13 @@ export const commands: Record<string, Command> = {
return "Exiting...";
},
},
"/clear": {
desc: "Clear conversation history",
handler: () => {
// Handled specially in App.tsx to access client and agent ID
return "Clearing messages...";
},
},
};
/**

View File

@@ -4,7 +4,7 @@
// - Tool calls update in-place (same toolCallId for call+return).
// - Exposes `onChunk` to feed SDK events and `toLines` to render.
import type { Letta } from "@letta-ai/letta-client";
import type { LettaStreamingChunk } from "../../agent/message";
// One line per transcript row. Tool calls evolve in-place.
// For tool call returns, merge into the tool call matching the toolCallId
@@ -194,10 +194,7 @@ function extractTextPart(v: unknown): string {
}
// Feed one SDK chunk; mutate buffers in place.
export function onChunk(
b: Buffers,
chunk: Letta.agents.LettaStreamingResponse,
) {
export function onChunk(b: Buffers, chunk: LettaStreamingChunk) {
// TODO remove once SDK v1 has proper typing for in-stream errors
// Check for streaming error objects (not typed in SDK but emitted by backend)
// These are emitted when LLM errors occur during streaming (rate limits, timeouts, etc.)
@@ -283,20 +280,28 @@ export function onChunk(
if (!id) break;
const toolCallId = chunk.toolCall?.toolCallId;
const name = chunk.toolCall?.name;
const argsText = chunk.toolCall?.arguments;
const toolCall = chunk.tool_call || (Array.isArray(chunk.tool_calls) && chunk.tool_calls.length > 0 ? chunk.tool_calls[0] : null);
const toolCallId = toolCall?.tool_call_id;
const name = toolCall?.name;
const argsText = toolCall?.arguments;
// Record correlation: toolCallId → line id (otid)
if (toolCallId) b.toolCallIdToLineId.set(toolCallId, id);
*/
let id = chunk.otid;
// console.log(`[TOOL_CALL] Received ${chunk.messageType} with otid=${id}, toolCallId=${chunk.toolCall?.toolCallId}, name=${chunk.toolCall?.name}`);
// console.log(`[TOOL_CALL] Received ${chunk.message_type} with otid=${id}, toolCallId=${chunk.tool_call?.tool_call_id}, name=${chunk.tool_call?.name}`);
const toolCallId = chunk.toolCall?.toolCallId;
const name = chunk.toolCall?.name;
const argsText = chunk.toolCall?.arguments;
// Use deprecated tool_call or new tool_calls array
const toolCall =
chunk.tool_call ||
(Array.isArray(chunk.tool_calls) && chunk.tool_calls.length > 0
? chunk.tool_calls[0]
: null);
const toolCallId = toolCall?.tool_call_id;
const name = toolCall?.name;
const argsText = toolCall?.arguments;
// ========== START BACKEND BUG WORKAROUND (Remove after OTID fix) ==========
// Bug: Backend sends same otid for reasoning and tool_call, and multiple otids for same tool_call
@@ -310,7 +315,7 @@ export function onChunk(
}
// Handle otid transition for tracking purposes
handleOtidTransition(b, chunk.otid);
handleOtidTransition(b, chunk.otid ?? undefined);
} else {
// Check if this otid is already used by a reasoning line
if (id && b.byId.has(id)) {
@@ -327,7 +332,7 @@ export function onChunk(
// This part stays after fix:
// Handle otid transition (mark previous line as finished)
// This must happen BEFORE the break, so reasoning gets finished even when tool has no otid
handleOtidTransition(b, id);
handleOtidTransition(b, id ?? undefined);
if (!id) {
// console.log(`[TOOL_CALL] No otid, breaking`);
@@ -338,21 +343,24 @@ export function onChunk(
if (toolCallId) b.toolCallIdToLineId.set(toolCallId, id);
}
// Early exit if no valid id
if (!id) break;
const desiredPhase =
chunk.messageType === "approval_request_message"
chunk.message_type === "approval_request_message"
? "ready"
: "streaming";
const line = ensure<ToolCallLine>(b, id, () => ({
kind: "tool_call",
id,
toolCallId: toolCallId,
name: name,
toolCallId: toolCallId ?? undefined,
name: name ?? undefined,
phase: desiredPhase,
}));
// If this is an approval request and the line already exists, bump phase to ready
if (
chunk.messageType === "approval_request_message" &&
chunk.message_type === "approval_request_message" &&
line.phase !== "finished"
) {
b.byId.set(id, { ...line, phase: "ready" });
@@ -372,8 +380,8 @@ export function onChunk(
case "tool_return_message": {
// Tool return is a special case
// It will have a different otid than the tool call, but we want to merge into the tool call
const toolCallId = chunk.toolCallId;
const resultText = chunk.toolReturn;
const toolCallId = chunk.tool_call_id;
const resultText = chunk.tool_return;
const status = chunk.status;
// Look up the line by toolCallId
@@ -401,17 +409,17 @@ export function onChunk(
case "usage_statistics": {
// Accumulate usage statistics from the stream
// These messages arrive after stop_reason in the stream
if (chunk.promptTokens !== undefined) {
b.usage.promptTokens += chunk.promptTokens;
if (chunk.prompt_tokens !== undefined) {
b.usage.promptTokens += chunk.prompt_tokens;
}
if (chunk.completionTokens !== undefined) {
b.usage.completionTokens += chunk.completionTokens;
if (chunk.completion_tokens !== undefined) {
b.usage.completionTokens += chunk.completion_tokens;
}
if (chunk.totalTokens !== undefined) {
b.usage.totalTokens += chunk.totalTokens;
if (chunk.total_tokens !== undefined) {
b.usage.totalTokens += chunk.total_tokens;
}
if (chunk.stepCount !== undefined) {
b.usage.stepCount += chunk.stepCount;
if (chunk.step_count !== undefined) {
b.usage.stepCount += chunk.step_count;
}
break;
}

View File

@@ -1,4 +1,8 @@
import type { Letta } from "@letta-ai/letta-client";
import type {
LettaAssistantMessageContentUnion,
LettaMessageUnion,
LettaUserMessageContentUnion,
} from "@letta-ai/letta-client/resources/agents/messages";
import type { Buffers } from "./accumulator";
// const PASTE_LINE_THRESHOLD = 5;
@@ -16,7 +20,7 @@ function clip(s: string, limit: number): string {
}
function renderAssistantContentParts(
parts: Letta.AssistantMessageContent,
parts: string | LettaAssistantMessageContentUnion[],
): string {
// AssistantContent can be a string or an array of text parts
if (typeof parts === "string") return parts;
@@ -29,7 +33,9 @@ function renderAssistantContentParts(
return out;
}
function renderUserContentParts(parts: Letta.UserMessageContent): string {
function renderUserContentParts(
parts: string | LettaUserMessageContentUnion[],
): string {
// UserContent can be a string or an array of text OR image parts
// for text parts, we clip them if they're too big (eg copy-pasted chunks)
// for image parts, we just show a placeholder
@@ -49,7 +55,7 @@ function renderUserContentParts(parts: Letta.UserMessageContent): string {
export function backfillBuffers(
buffers: Buffers,
history: Letta.LettaMessageUnion[],
history: LettaMessageUnion[],
): void {
// Clear buffers to ensure idempotency (in case this is called multiple times)
buffers.order = [];
@@ -65,7 +71,7 @@ export function backfillBuffers(
// Use otid as line ID when available (like streaming does), fall back to msg.id
const lineId = "otid" in msg && msg.otid ? msg.otid : msg.id;
switch (msg.messageType) {
switch (msg.message_type) {
// user message - content parts may include text and image parts
case "user_message": {
const exists = buffers.byId.has(lineId);
@@ -107,9 +113,16 @@ export function backfillBuffers(
// tool call message OR approval request (they're the same in history)
case "tool_call_message":
case "approval_request_message": {
if ("toolCall" in msg && msg.toolCall?.toolCallId) {
const toolCall = msg.toolCall;
const toolCallId = toolCall.toolCallId;
// Use tool_calls array (new) or fallback to tool_call (deprecated)
const toolCalls = Array.isArray(msg.tool_calls)
? msg.tool_calls
: msg.tool_call
? [msg.tool_call]
: [];
if (toolCalls.length > 0 && toolCalls[0]?.tool_call_id) {
const toolCall = toolCalls[0];
const toolCallId = toolCall.tool_call_id;
const exists = buffers.byId.has(lineId);
buffers.byId.set(lineId, {
@@ -130,7 +143,7 @@ export function backfillBuffers(
// tool return message - merge into the existing tool call line
case "tool_return_message": {
const toolCallId = msg.toolCallId;
const toolCallId = msg.tool_call_id;
if (!toolCallId) break;
// Look up the line using the mapping (like streaming does)
@@ -143,7 +156,7 @@ export function backfillBuffers(
// Update the existing line with the result
buffers.byId.set(toolCallLineId, {
...existingLine,
resultText: msg.toolReturn,
resultText: msg.tool_return,
resultOk: msg.status === "success",
phase: "finished",
});

View File

@@ -1,4 +1,7 @@
import { Letta } from "@letta-ai/letta-client";
import type { Stream } from "@letta-ai/letta-client/core/streaming";
import type { LettaStreamingResponse } from "@letta-ai/letta-client/resources/agents/messages";
import type { StopReasonType } from "@letta-ai/letta-client/resources/runs/runs";
import {
type createBuffers,
markCurrentLineAsFinished,
@@ -13,7 +16,7 @@ export type ApprovalRequest = {
};
type DrainResult = {
stopReason: Letta.StopReasonType;
stopReason: StopReasonType;
lastRunId?: string | null;
lastSeqId?: number | null;
approval?: ApprovalRequest | null; // present only if we ended due to approval
@@ -21,7 +24,7 @@ type DrainResult = {
};
export async function drainStream(
stream: AsyncIterable<Letta.LettaStreamingResponse>,
stream: Stream<LettaStreamingResponse>,
buffers: ReturnType<typeof createBuffers>,
refresh: () => void,
abortSignal?: AbortSignal,
@@ -33,29 +36,36 @@ export async function drainStream(
let toolName: string | null = null;
let toolArgs: string | null = null;
let stopReason: Letta.StopReasonType | null = null;
let stopReason: StopReasonType | null = null;
let lastRunId: string | null = null;
let lastSeqId: number | null = null;
for await (const chunk of stream) {
// console.log("chunk", chunk);
// Check if stream was aborted
if (abortSignal?.aborted) {
stopReason = "cancelled" as Letta.StopReasonType;
stopReason = "cancelled";
// Mark incomplete tool calls as cancelled to prevent stuck blinking UI
markIncompleteToolsAsCancelled(buffers);
queueMicrotask(refresh);
break;
}
// Store the runId and seqId to re-connect if stream is interrupted
if ("runId" in chunk && "seqId" in chunk && chunk.runId && chunk.seqId) {
lastRunId = chunk.runId;
lastSeqId = chunk.seqId;
// Store the run_id and seq_id to re-connect if stream is interrupted
if (
"run_id" in chunk &&
"seq_id" in chunk &&
chunk.run_id &&
chunk.seq_id
) {
lastRunId = chunk.run_id;
lastSeqId = chunk.seq_id;
}
if (chunk.messageType === "ping") continue;
if (chunk.message_type === "ping") continue;
// Need to store the approval request ID to send an approval in a new run
if (chunk.messageType === "approval_request_message") {
if (chunk.message_type === "approval_request_message") {
approvalRequestId = chunk.id;
}
@@ -63,25 +73,32 @@ export async function drainStream(
// in both the onChunk handler and here, we could refactor to instead pull the tool name
// and JSON args from the mutated lines (eg last mutated line)
if (
chunk.messageType === "tool_call_message" ||
chunk.messageType === "approval_request_message"
chunk.message_type === "tool_call_message" ||
chunk.message_type === "approval_request_message"
) {
if (chunk.toolCall?.toolCallId) {
toolCallId = chunk.toolCall.toolCallId;
// Use deprecated tool_call or new tool_calls array
const toolCall =
chunk.tool_call ||
(Array.isArray(chunk.tool_calls) && chunk.tool_calls.length > 0
? chunk.tool_calls[0]
: null);
if (toolCall?.tool_call_id) {
toolCallId = toolCall.tool_call_id;
}
if (chunk.toolCall?.name) {
if (toolCall?.name) {
if (toolName) {
// TODO would expect that we should allow stacking? I guess not?
// toolName = toolName + chunk.toolCall.name;
// toolName = toolName + toolCall.name;
} else {
toolName = chunk.toolCall.name;
toolName = toolCall.name;
}
}
if (chunk.toolCall?.arguments) {
if (toolCall?.arguments) {
if (toolArgs) {
toolArgs = toolArgs + chunk.toolCall.arguments;
toolArgs = toolArgs + toolCall.arguments;
} else {
toolArgs = chunk.toolCall.arguments;
toolArgs = toolCall.arguments;
}
}
}
@@ -89,15 +106,15 @@ export async function drainStream(
onChunk(buffers, chunk);
queueMicrotask(refresh);
if (chunk.messageType === "stop_reason") {
stopReason = chunk.stopReason;
if (chunk.message_type === "stop_reason") {
stopReason = chunk.stop_reason;
// Continue reading stream to get usage_statistics that may come after
}
}
// Stream has ended, check if we captured a stop reason
if (!stopReason) {
stopReason = Letta.StopReasonType.Error;
stopReason = "error";
}
// Mark the final line as finished now that stream has ended