#!/usr/bin/env python3
"""
Local git HTTP sidecar for Letta memfs on self-hosted deployments.

Implements the git smart HTTP protocol via git-http-backend, storing bare repos
under ~/.letta/memfs/repository/{org_id}/{agent_id}/repo.git/ -- the exact
structure that Letta's LocalStorageBackend reads from.

The Letta server proxies /v1/git/* requests here when LETTA_MEMFS_SERVICE_URL
is set. letta-code pushes -> Letta server proxies -> here -> bare repo ->
LocalStorageBackend reads.

Run as a systemd user service. Listens on 127.0.0.1:8285.
"""

import os
import re
import subprocess
import sys
from http.server import BaseHTTPRequestHandler, ThreadingHTTPServer
from pathlib import Path

# Bind loopback only: this service performs no authentication, so exposing it
# on all interfaces would grant anyone on the network push access to the memfs
# repos. (Matches the documented "Listens on 127.0.0.1:8285" contract; the
# previous value "0.0.0.0" contradicted it.)
LISTEN_HOST = "127.0.0.1"
LISTEN_PORT = 8285
REPO_BASE = Path.home() / ".letta" / "memfs" / "repository"

# Matches /git/{agent_id}/state.git{/optional git suffix}
_PATH_RE = re.compile(r"^/git/([^/]+)/state\.git(/.*)?$")


def _repo_path(org_id: str, agent_id: str) -> Path:
    """Return the canonical bare-repo path for an (org, agent) pair."""
    return REPO_BASE / org_id / agent_id / "repo.git"


def _git_config(repo: Path, key: str, value: str) -> None:
    """Best-effort `git config --file <repo>/config key value`; errors ignored."""
    subprocess.run(
        ["git", "config", "--file", str(repo / "config"), key, value],
        capture_output=True,
    )


def _fix_repo_config(repo: Path) -> None:
    """Ensure repo is pushable (Corykidios fix-repo-configs pattern)."""
    _git_config(repo, "core.bare", "true")
    _git_config(repo, "receive.denyCurrentBranch", "ignore")
    _git_config(repo, "http.receivepack", "true")


def _ensure_repo(repo: Path, agent_id: str) -> Path:
    """Return a usable repo for *agent_id*, creating one if necessary.

    Resolution order (Corykidios pattern):
    1. the expected path *repo*, if it exists;
    2. an existing ``{org}/{agent_id}/repo.git`` under any other org dir;
    3. a freshly created bare repo at the expected path.
    """
    if repo.exists():
        _fix_repo_config(repo)
        return repo

    # Search all org directories for an existing repo belonging to this agent.
    if REPO_BASE.exists():
        for org_dir in REPO_BASE.iterdir():
            if not org_dir.is_dir():
                continue
            candidate = org_dir / agent_id / "repo.git"
            if candidate.exists():
                print(f"[memfs] Found existing repo at {candidate}", file=sys.stderr, flush=True)
                _fix_repo_config(candidate)
                return candidate

    # Create a new bare repo at the expected path.
    repo.mkdir(parents=True, exist_ok=True)
    subprocess.run(["git", "init", "--bare", str(repo)], check=True, capture_output=True)
    with open(repo / "config", "a") as f:
        f.write("\n[http]\n\treceivepack = true\n")
    print(f"[memfs] Created new repo at {repo}", file=sys.stderr, flush=True)
    return repo


def _read_chunked(rfile) -> bytes:
    """Read and decode an HTTP chunked-transfer-encoded body from *rfile*.

    Stops at the terminal zero-length chunk. If the stream ends prematurely
    (truncated request), returns whatever was decoded so far instead of
    spinning forever -- readline() returns b"" at EOF, which the previous
    implementation treated as a blank line and `continue`d on indefinitely.
    Chunk-size lines may carry extensions after ';' (RFC 9112 §7.1); these
    are ignored rather than crashing int(..., 16).
    """
    body = bytearray()
    while True:
        line = rfile.readline()
        if not line:  # EOF: truncated request
            break
        line = line.strip()
        if not line:  # stray blank line between chunks
            continue
        size = int(line.split(b";", 1)[0], 16)
        if size == 0:
            rfile.readline()  # trailing CRLF after terminal chunk
            break
        body.extend(rfile.read(size))
        rfile.readline()  # trailing CRLF after chunk data
    return bytes(body)


def _parse_cgi_response(raw: bytes):
    """Split raw CGI output into ``(status, headers, body)``.

    Returns ``(None, [], b"")`` when no header/body separator is found.
    A ``Status:`` pseudo-header sets the HTTP status (default 200) and is
    not forwarded; all other headers are returned as (name, value) pairs.
    """
    sep = b"\r\n\r\n"
    header_end = raw.find(sep)
    if header_end == -1:
        sep = b"\n\n"
        header_end = raw.find(sep)
        if header_end == -1:
            return None, [], b""

    header_block = raw[:header_end].decode("utf-8", errors="replace")
    body = raw[header_end + len(sep):]

    status = 200
    headers = []
    for line in header_block.splitlines():
        if ":" not in line:
            continue
        k, _, v = line.partition(":")
        k, v = k.strip(), v.strip()
        if k.lower() == "status":
            # "Status: 404 Not Found" -> 404.  Catch IndexError too: an empty
            # value would make v.split()[0] raise, which ValueError alone missed.
            try:
                status = int(v.split()[0])
            except (ValueError, IndexError):
                pass
        else:
            headers.append((k, v))
    return status, headers, body


class GitHandler(BaseHTTPRequestHandler):
    """Bridges HTTP requests to `git http-backend` (CGI) for memfs repos."""

    def log_message(self, fmt, *args):
        print(f"[memfs] {self.address_string()} {fmt % args}", file=sys.stderr, flush=True)

    def _handle(self):
        raw_path = self.path.split("?")[0]
        m = _PATH_RE.match(raw_path)
        if not m:
            self.send_error(404, "Not a valid memfs path")
            return

        agent_id = m.group(1)
        git_suffix = m.group(2) or "/"
        org_id = self.headers.get("X-Organization-Id") or "default"
        query = self.path.split("?", 1)[1] if "?" in self.path else ""

        repo = _repo_path(org_id, agent_id)
        repo = _ensure_repo(repo, agent_id)

        # Read request body -- handle both Content-Length and chunked encodings.
        te = self.headers.get("Transfer-Encoding", "")
        cl = self.headers.get("Content-Length")
        if cl:
            body = self.rfile.read(int(cl))
        elif "chunked" in te.lower():
            body = _read_chunked(self.rfile)
        else:
            body = b""

        # Standard CGI environment for git-http-backend.
        env = {
            **os.environ,
            "GIT_PROJECT_ROOT": str(repo.parent),
            "PATH_INFO": "/repo.git" + git_suffix,
            "REQUEST_METHOD": self.command,
            "QUERY_STRING": query,
            "CONTENT_TYPE": self.headers.get("Content-Type", ""),
            "CONTENT_LENGTH": str(len(body)),
            "GIT_HTTP_EXPORT_ALL": "1",
        }

        result = subprocess.run(
            ["git", "http-backend"],
            input=body,
            capture_output=True,
            env=env,
        )

        if result.returncode != 0:
            print(f"[memfs] git-http-backend error: {result.stderr.decode()}", file=sys.stderr)
            self.send_error(500, "git-http-backend failed")
            return

        status, headers, body_out = _parse_cgi_response(result.stdout)
        if status is None:
            self.send_error(502, "Invalid CGI response")
            return

        self.send_response(status)
        for k, v in headers:
            # The whole body is buffered and we emit our own Content-Length,
            # so drop any framing headers from the CGI output to avoid
            # duplicate/conflicting framing.
            if k.lower() in ("content-length", "transfer-encoding"):
                continue
            self.send_header(k, v)
        self.send_header("Content-Length", str(len(body_out)))
        self.end_headers()
        self.wfile.write(body_out)

    def do_GET(self):
        self._handle()

    def do_POST(self):
        self._handle()


if __name__ == "__main__":
    REPO_BASE.mkdir(parents=True, exist_ok=True)
    print(f"[memfs] Starting on http://{LISTEN_HOST}:{LISTEN_PORT}", flush=True)
    print(f"[memfs] Repo base: {REPO_BASE}", flush=True)
    ThreadingHTTPServer((LISTEN_HOST, LISTEN_PORT), GitHandler).serve_forever()
REPO_BASE.mkdir(parents=True, exist_ok=True) + print(f"[memfs] Starting on http://{LISTEN_HOST}:{LISTEN_PORT}", flush=True) + print(f"[memfs] Repo base: {REPO_BASE}", flush=True) + ThreadingHTTPServer((LISTEN_HOST, LISTEN_PORT), GitHandler).serve_forever() diff --git a/package-lock.json b/package-lock.json index 3012d2d..aa40a11 100644 --- a/package-lock.json +++ b/package-lock.json @@ -13,7 +13,7 @@ "@clack/prompts": "^0.11.0", "@hapi/boom": "^10.0.1", "@letta-ai/letta-client": "^1.7.12", - "@letta-ai/letta-code-sdk": "^0.1.11", + "@letta-ai/letta-code-sdk": "^0.1.14", "@types/express": "^5.0.6", "@types/node": "^25.0.10", "@types/node-schedule": "^2.1.8", @@ -1376,9 +1376,9 @@ "license": "Apache-2.0" }, "node_modules/@letta-ai/letta-code": { - "version": "0.18.2", - "resolved": "https://registry.npmjs.org/@letta-ai/letta-code/-/letta-code-0.18.2.tgz", - "integrity": "sha512-HzNqMjBUiAq5IyZ8DSSWBHq/ahkd4RRYfO/V9eXMBZRTRpLb7Dae2hwvicE+aRSLmJqMdxpH6WI7+ZHKlFsILQ==", + "version": "0.19.5", + "resolved": "https://registry.npmjs.org/@letta-ai/letta-code/-/letta-code-0.19.5.tgz", + "integrity": "sha512-INEDS79dkzJoQyL3IJRof+HNob3GZXgAge/JdJRFaVfJhU/o/6aTPcPWpQwxygE5ExIDSUlL85OlZ3CcBv0TyA==", "hasInstallScript": true, "license": "Apache-2.0", "dependencies": { @@ -1387,6 +1387,7 @@ "highlight.js": "^11.11.1", "ink-link": "^5.0.0", "lowlight": "^3.3.0", + "node-pty": "^1.1.0", "open": "^10.2.0", "sharp": "^0.34.5", "ws": "^8.19.0" @@ -1402,12 +1403,12 @@ } }, "node_modules/@letta-ai/letta-code-sdk": { - "version": "0.1.11", - "resolved": "https://registry.npmjs.org/@letta-ai/letta-code-sdk/-/letta-code-sdk-0.1.11.tgz", - "integrity": "sha512-P1ueLWQuCnERizrvU3fZ9/rrMAJSIT+2j2/xxptqxMOKUuUrDmvAix1/eyDXqAwZkBVGImyqLGm4zqwNVNA7Dg==", + "version": "0.1.14", + "resolved": "https://registry.npmjs.org/@letta-ai/letta-code-sdk/-/letta-code-sdk-0.1.14.tgz", + "integrity": "sha512-rSMp7kYwRZ4PAe3jET+PETFesuYCbeodEp6Qf7a5rLu97epqs+zNegSR+UUgq6c9+c5eqbuo+BsRThTKiSNJkA==", "license": 
"Apache-2.0", "dependencies": { - "@letta-ai/letta-code": "0.18.2" + "@letta-ai/letta-code": "0.19.5" } }, "node_modules/@letta-ai/letta-code/node_modules/balanced-match": { @@ -5669,9 +5670,9 @@ } }, "node_modules/ink/node_modules/type-fest": { - "version": "5.4.4", - "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-5.4.4.tgz", - "integrity": "sha512-JnTrzGu+zPV3aXIUhnyWJj4z/wigMsdYajGLIYakqyOW1nPllzXEJee0QQbHj+CTIQtXGlAjuK0UY+2xTyjVAw==", + "version": "5.5.0", + "resolved": "https://registry.npmjs.org/type-fest/-/type-fest-5.5.0.tgz", + "integrity": "sha512-PlBfpQwiUvGViBNX84Yxwjsdhd1TUlXr6zjX7eoirtCPIr08NAmxwa+fcYBTeRQxHo9YC9wwF3m9i700sHma8g==", "license": "(MIT OR CC0-1.0)", "peer": true, "dependencies": { @@ -7696,8 +7697,7 @@ "version": "7.1.1", "resolved": "https://registry.npmjs.org/node-addon-api/-/node-addon-api-7.1.1.tgz", "integrity": "sha512-5m3bsyrjFWE1xf7nz7YXdN4udnVtXK6/Yfgn5qnahL6bCkf2yKt4k3nuTKAtT4r3IG8JNR2ncsIMdZuAzJjHQQ==", - "license": "MIT", - "optional": true + "license": "MIT" }, "node_modules/node-domexception": { "version": "1.0.0", @@ -7839,6 +7839,16 @@ "url": "https://github.com/sponsors/isaacs" } }, + "node_modules/node-pty": { + "version": "1.1.0", + "resolved": "https://registry.npmjs.org/node-pty/-/node-pty-1.1.0.tgz", + "integrity": "sha512-20JqtutY6JPXTUnL0ij1uad7Qe1baT46lyolh2sSENDd4sTzKZ4nmAFkeAARDKwmlLjPx6XKRlwRUxwjOy+lUg==", + "hasInstallScript": true, + "license": "MIT", + "dependencies": { + "node-addon-api": "^7.1.0" + } + }, "node_modules/node-schedule": { "version": "2.1.1", "resolved": "https://registry.npmjs.org/node-schedule/-/node-schedule-2.1.1.tgz", diff --git a/package.json b/package.json index 802797f..f7f1a71 100644 --- a/package.json +++ b/package.json @@ -70,7 +70,7 @@ "@clack/prompts": "^0.11.0", "@hapi/boom": "^10.0.1", "@letta-ai/letta-client": "^1.7.12", - "@letta-ai/letta-code-sdk": "^0.1.11", + "@letta-ai/letta-code-sdk": "^0.1.14", "@types/express": "^5.0.6", "@types/node": "^25.0.10", 
"@types/node-schedule": "^2.1.8", diff --git a/src/channels/factory.ts b/src/channels/factory.ts index a609ba2..26ef235 100644 --- a/src/channels/factory.ts +++ b/src/channels/factory.ts @@ -124,6 +124,8 @@ const SHARED_CHANNEL_BUILDERS: SharedChannelBuilder[] = [ groups: discord.groups, agentName: agentConfig.name, ignoreBotReactions: discord.ignoreBotReactions, + ttsUrl: discord.ttsUrl, + ttsVoice: discord.ttsVoice, }); }, }, diff --git a/src/config/types.ts b/src/config/types.ts index 0eec8bf..fc5d0f1 100644 --- a/src/config/types.ts +++ b/src/config/types.ts @@ -410,6 +410,8 @@ export interface DiscordConfig { listeningGroups?: string[]; // @deprecated Use groups..mode = "listen" groups?: Record; // Per-guild/channel settings, "*" for defaults ignoreBotReactions?: boolean; // Ignore all bot reactions (default: true). Set false for multi-bot setups. + ttsUrl?: string; // TTS API endpoint (e.g. VibeVoice) + ttsVoice?: string; // TTS voice ID } export interface BlueskyConfig { diff --git a/src/core/bot.ts b/src/core/bot.ts index 24743c2..f5ea2c7 100644 --- a/src/core/bot.ts +++ b/src/core/bot.ts @@ -16,7 +16,7 @@ import { formatApiErrorForUser } from './errors.js'; import { formatToolCallDisplay, formatReasoningDisplay, formatQuestionsForChannel } from './display.js'; import type { AgentSession } from './interfaces.js'; import { Store } from './store.js'; -import { getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel, isRecoverableConversationId, recoverPendingApprovalsForAgent } from '../tools/letta-api.js'; +import { getPendingApprovals, rejectApproval, cancelRuns, cancelConversation, recoverOrphanedConversationApproval, getLatestRunError, getAgentModel, updateAgentModel, isRecoverableConversationId, recoverPendingApprovalsForAgent, createConversationForAgent } from '../tools/letta-api.js'; import { getAgentSkillExecutableDirs, isVoiceMemoConfigured } from 
'../skills/loader.js'; import { formatMessageEnvelope, formatGroupBatchEnvelope, type SessionContextOptions } from './formatter.js'; import type { GroupBatcher } from './group-batcher.js'; @@ -785,6 +785,44 @@ export class LettaBot implements AgentSession { return '⏰ Heartbeat triggered (silent mode - check server logs)'; } case 'reset': { + // !reset aster — cycle only Aster's conscience conversation, leave Ani's alone. + if (args?.trim().toLowerCase() === 'aster') { + const conscienceAgentId = process.env.CONSCIENCE_AGENT_ID; + if (!conscienceAgentId) { + return 'Conscience agent not configured (CONSCIENCE_AGENT_ID not set).'; + } + const newConscienceConvId = await createConversationForAgent(conscienceAgentId); + if (!newConscienceConvId) { + return 'Failed to create a new conscience conversation. Check server logs.'; + } + // Update in-memory env var so the running process uses the new conversation immediately. + process.env.CONSCIENCE_CONVERSATION_ID = newConscienceConvId; + // Persist to store (lettabot-agent.json) for reference. + this.store.setAgentField('Aster', 'conversationId', newConscienceConvId); + // Patch the systemd service file so restarts also pick up the new conversation. + const serviceFile = '/home/ani/.config/systemd/user/ani-bridge.service'; + try { + const { writeFile } = await import('node:fs/promises'); + const current = await readFile(serviceFile, 'utf-8'); + const updated = current.replace( + /^(Environment=CONSCIENCE_CONVERSATION_ID=).+$/m, + `$1${newConscienceConvId}`, + ); + await writeFile(serviceFile, updated, 'utf-8'); + // Reload systemd unit definitions (no restart — just picks up the edited file). 
+ await new Promise((resolve, reject) => { + execFile('systemctl', ['--user', 'daemon-reload'], (err) => { + if (err) reject(err); else resolve(); + }); + }); + log.info(`/reset aster - service file updated, daemon reloaded: ${newConscienceConvId}`); + } catch (svcErr) { + log.warn(`/reset aster - failed to patch service file: ${svcErr}`); + } + log.info(`/reset aster - conscience conversation cycled: ${newConscienceConvId}`); + return `Aster's conversation reset. New conversation: \`${newConscienceConvId}\`\nService file updated — restart-safe.`; + } + // Always scope the reset to the caller's conversation key so that // other channels/chats' conversations are never silently destroyed. // resolveConversationKey returns 'shared' for non-override channels, @@ -806,6 +844,40 @@ export class LettaBot implements AgentSession { const session = await this.sessionManager.ensureSessionForKey(convKey); const newConvId = session.conversationId || '(pending)'; this.sessionManager.persistSessionState(session, convKey); + + // Reset conscience conversation alongside Ani's. + // This ensures failure notifications target the new active conversation + // and conscience starts fresh rather than replaying a broken context. + const conscienceAgentId = process.env.CONSCIENCE_AGENT_ID; + if (conscienceAgentId) { + const newConscienceConvId = await createConversationForAgent(conscienceAgentId); + if (newConscienceConvId) { + process.env.CONSCIENCE_CONVERSATION_ID = newConscienceConvId; + this.store.setAgentField('Aster', 'conversationId', newConscienceConvId); + // Also patch the service file so restarts pick up the new conversation. 
+ const serviceFile = '/home/ani/.config/systemd/user/ani-bridge.service'; + try { + const { writeFile } = await import('node:fs/promises'); + const current = await readFile(serviceFile, 'utf-8'); + const updated = current.replace( + /^(Environment=CONSCIENCE_CONVERSATION_ID=).+$/m, + `$1${newConscienceConvId}`, + ); + await writeFile(serviceFile, updated, 'utf-8'); + await new Promise((resolve, reject) => { + execFile('systemctl', ['--user', 'daemon-reload'], (err) => { + if (err) reject(err); else resolve(); + }); + }); + } catch (svcErr) { + log.warn(`/reset - failed to patch conscience service var: ${svcErr}`); + } + log.info(`/reset - conscience conversation cycled: ${newConscienceConvId}`); + } else { + log.warn('/reset - Failed to cycle conscience conversation; will resume the previous one.'); + } + } + if (convKey === 'shared') { return `Conversation reset. New conversation: ${newConvId}\n(Agent memory is preserved.)`; } @@ -1602,9 +1674,29 @@ export class LettaBot implements AgentSession { const thread = subagentThreads.get(event.toolCallId); if (thread) { const status = event.isError ? 
'**Failed**' : '**Complete**'; - const preview = event.content.slice(0, 800); - adapter.sendThreadMessage(thread.rootEventId, thread.chatId, `${status}\n${preview}`) - .catch(err => log.warn('Failed to post subagent result to thread:', err)); + // Post full result to thread (chunked if very long) + const maxChunk = 16000; // well under Matrix's ~65KB body limit + const content = event.content; + if (content.length <= maxChunk) { + adapter.sendThreadMessage(thread.rootEventId, thread.chatId, `${status}\n${content}`) + .catch(err => log.warn('Failed to post subagent result to thread:', err)); + } else { + // Send status header + chunked content + const chunks: string[] = []; + for (let i = 0; i < content.length; i += maxChunk) { + chunks.push(content.slice(i, i + maxChunk)); + } + (async () => { + try { + await adapter.sendThreadMessage!(thread.rootEventId, thread.chatId, `${status} (${chunks.length} parts)`); + for (const chunk of chunks) { + await adapter.sendThreadMessage!(thread.rootEventId, thread.chatId, chunk); + } + } catch (err) { + log.warn('Failed to post subagent result to thread:', err); + } + })(); + } subagentThreads.delete(event.toolCallId); } } diff --git a/src/core/formatter.ts b/src/core/formatter.ts index 26b7c4d..ea6a20a 100644 --- a/src/core/formatter.ts +++ b/src/core/formatter.ts @@ -352,9 +352,6 @@ function buildResponseDirectives(msg: InboundMessage): string[] { lines.push(`- Prefer directives over tool calls for reactions (faster and cheaper)`); } - // voice memo (always available -- TTS config is server-side) - lines.push(`- \`Your message here\` — send a voice memo via TTS`); - // file sending (only if channel supports it) if (supportsFiles) { lines.push(`- \`\` — send a file (restricted to configured directory)`); diff --git a/src/core/store.ts b/src/core/store.ts index f964193..1670349 100644 --- a/src/core/store.ts +++ b/src/core/store.ts @@ -420,6 +420,18 @@ export class Store { this.save(); } + /** + * Update a field on any named 
agent entry in the store. + * Used to persist auxiliary agent data (e.g. Aster's conversation ID after !reset). + */ + setAgentField(agentName: string, field: string, value: string | null): void { + if (!this.data.agents[agentName]) { + this.data.agents[agentName] = { agentId: null }; + } + (this.data.agents[agentName] as unknown as Record)[field] = value; + this.save(); + } + getInfo(): AgentStore { return { ...this.agentData() }; } diff --git a/src/tools/letta-api.ts b/src/tools/letta-api.ts index 7c37f2f..32640ea 100644 --- a/src/tools/letta-api.ts +++ b/src/tools/letta-api.ts @@ -993,3 +993,18 @@ export async function disableAllToolApprovals(agentId: string): Promise return 0; } } + +/** + * Create a fresh conversation for an existing agent. + * Used by !reset to cycle Aster's conversation alongside Ani's. + */ +export async function createConversationForAgent(agentId: string): Promise { + try { + const client = getClient(); + const conversation = await client.conversations.create({ agent_id: agentId }); + return conversation.id; + } catch (e) { + log.error('Failed to create conversation for agent:', e); + return null; + } +}