From 039707387ea73c99854f60be77f48ca9efd89da6 Mon Sep 17 00:00:00 2001 From: Cameron Date: Fri, 13 Mar 2026 14:17:33 -0700 Subject: [PATCH] feat(bluesky): fetch parent thread context for reply posts (#592) Co-authored-by: Letta Code --- src/channels/bluesky/adapter.ts | 112 ++++++++++++++++++++++++++++++++ src/channels/bluesky/types.ts | 2 + 2 files changed, 114 insertions(+) diff --git a/src/channels/bluesky/adapter.ts b/src/channels/bluesky/adapter.ts index ad00f2e..0c5b3d5 100644 --- a/src/channels/bluesky/adapter.ts +++ b/src/channels/bluesky/adapter.ts @@ -92,6 +92,10 @@ export class BlueskyAdapter implements ChannelAdapter { private lastRuntimeRefreshAt?: string; private lastRuntimeReloadAt?: string; private readonly handleFetchCooldownMs = 5 * 60 * 1000; + private threadContextCache = new Map(); + private static readonly THREAD_CACHE_TTL_MS = 60_000; + private static readonly THREAD_CACHE_MAX = 100; + private static readonly THREAD_CONTEXT_MAX_CHARS = 1000; onMessage?: (msg: InboundMessage) => Promise; onCommand?: (command: string) => Promise; @@ -431,6 +435,17 @@ export class BlueskyAdapter implements ChannelAdapter { const did = payload.did || 'unknown'; const handle = payload.did ? this.handleByDid.get(payload.did) : undefined; const { text, messageId, source, extraContext } = this.formatCommit(payload, handle); + + // Fetch thread context for reply posts + if (source?.threadParentUri) { + const threadContext = await this.fetchThreadContext(source.threadParentUri); + if (threadContext) { + extraContext['Thread context'] = `\n${threadContext}`; + delete extraContext['Thread root']; + delete extraContext['Reply parent']; + } + } + if (!text) { log.debug(`Dropping non-post Jetstream event: ${payload.commit?.collection} from ${did}`); return; @@ -1298,6 +1313,16 @@ export class BlueskyAdapter implements ChannelAdapter { if (details.replyRefs.parentUri) source.threadParentUri = details.replyRefs.parentUri; if (details.replyRefs.parentCid) source.threadParentCid = details.replyRefs.parentCid; + // Fetch thread context for reply posts + if (source.threadParentUri) { + const threadContext = await this.fetchThreadContext(source.threadParentUri); + if (threadContext) { + extraContext['Thread context'] = `\n${threadContext}`; + delete extraContext['Thread root']; + delete extraContext['Reply parent']; + } + } + const chatId = source.uri ?? authorDid; this.lastPostByChatId.set(chatId, { uri: notification.uri, @@ -1462,6 +1487,93 @@ export class BlueskyAdapter implements ChannelAdapter { return undefined; } + /** + * Fetch parent thread context for a reply post. Returns a formatted string + * with the parent chain (root first), or null on failure. + */ + private async fetchThreadContext(parentUri: string): Promise { + const depth = this.config.threadContextDepth ?? 5; + if (depth <= 0) return null; + + // Check cache + const cached = this.threadContextCache.get(parentUri); + if (cached && cached.expiresAt > Date.now()) { + return cached.text; + } + + try { + try { await this.ensureSession(); } catch { /* auth optional for AppView */ } + const url = `${getAppViewUrl(this.config.appViewUrl)}/xrpc/app.bsky.feed.getPostThread` + + `?uri=${encodeURIComponent(parentUri)}&depth=0&parentHeight=${depth}`; + const res = await fetchWithTimeout(url, { + headers: this.accessJwt ? { 'Authorization': `Bearer ${this.accessJwt}` } : undefined, + }, 5000); + if (!res.ok) return null; + + const data = await res.json() as { + thread?: { + post?: { author?: { handle?: string }; record?: { text?: string } }; + parent?: unknown; + }; + }; + + // Walk parent chain to build chronological thread + const posts: { handle: string; text: string }[] = []; + let node = data.thread; + while (node && typeof node === 'object') { + const n = node as { + post?: { author?: { handle?: string }; record?: { text?: string } }; + parent?: unknown; + }; + if (n.post?.record?.text) { + posts.push({ + handle: n.post.author?.handle || 'unknown', + text: n.post.record.text, + }); + } + node = n.parent as typeof node | undefined; + } + + if (posts.length === 0) return null; + + // posts[] is most-recent-first (from the parent walk). Reverse to chronological. + posts.reverse(); + + // Format all lines, then keep as many recent posts as fit within the limit. + // The most recent parent (closest to the new reply) is the most important. + const lines = posts.map(p => `@${p.handle}: "${truncate(p.text, 200)}"`); + let result = ''; + let startIdx = 0; + const joined = lines.join('\n'); + if (joined.length <= BlueskyAdapter.THREAD_CONTEXT_MAX_CHARS) { + result = joined; + } else { + // Work backwards from the most recent, accumulating lines + let budget = BlueskyAdapter.THREAD_CONTEXT_MAX_CHARS - '[...earlier posts truncated]\n'.length; + for (let i = lines.length - 1; i >= 0; i--) { + if (lines[i].length + 1 > budget) { // +1 for newline + startIdx = i + 1; + break; + } + budget -= lines[i].length + 1; + } + result = '[...earlier posts truncated]\n' + lines.slice(startIdx).join('\n'); + } + + // Cache result + this.threadContextCache.set(parentUri, { + text: result, + expiresAt: Date.now() + BlueskyAdapter.THREAD_CACHE_TTL_MS, + }); + pruneMap(this.threadContextCache, BlueskyAdapter.THREAD_CACHE_MAX); + + return result; + } catch (err) { + log.warn('Failed to fetch thread context:', err instanceof Error ? err.message : err); + return null; + } + } + private loadState(): void { if (!this.statePath || !this.config.agentName) return; if (!existsSync(this.statePath)) return; diff --git a/src/channels/bluesky/types.ts b/src/channels/bluesky/types.ts index a45bcee..9690b6c 100644 --- a/src/channels/bluesky/types.ts +++ b/src/channels/bluesky/types.ts @@ -49,6 +49,8 @@ export interface BlueskyConfig { reasons?: string[] | string; backfill?: boolean; }; + /** Max parent posts to fetch for thread context on replies (0 to disable). Default: 5. */ + threadContextDepth?: number; } export interface JetstreamCommit {