feat(bluesky): fetch parent thread context for reply posts (#592)
Co-authored-by: Letta Code <noreply@letta.com>
This commit is contained in:
@@ -92,6 +92,10 @@ export class BlueskyAdapter implements ChannelAdapter {
|
|||||||
private lastRuntimeRefreshAt?: string;
|
private lastRuntimeRefreshAt?: string;
|
||||||
private lastRuntimeReloadAt?: string;
|
private lastRuntimeReloadAt?: string;
|
||||||
private readonly handleFetchCooldownMs = 5 * 60 * 1000;
|
private readonly handleFetchCooldownMs = 5 * 60 * 1000;
|
||||||
|
private threadContextCache = new Map<string, { text: string; expiresAt: number }>();
|
||||||
|
private static readonly THREAD_CACHE_TTL_MS = 60_000;
|
||||||
|
private static readonly THREAD_CACHE_MAX = 100;
|
||||||
|
private static readonly THREAD_CONTEXT_MAX_CHARS = 1000;
|
||||||
|
|
||||||
onMessage?: (msg: InboundMessage) => Promise<void>;
|
onMessage?: (msg: InboundMessage) => Promise<void>;
|
||||||
onCommand?: (command: string) => Promise<string | null>;
|
onCommand?: (command: string) => Promise<string | null>;
|
||||||
@@ -431,6 +435,17 @@ export class BlueskyAdapter implements ChannelAdapter {
|
|||||||
const did = payload.did || 'unknown';
|
const did = payload.did || 'unknown';
|
||||||
const handle = payload.did ? this.handleByDid.get(payload.did) : undefined;
|
const handle = payload.did ? this.handleByDid.get(payload.did) : undefined;
|
||||||
const { text, messageId, source, extraContext } = this.formatCommit(payload, handle);
|
const { text, messageId, source, extraContext } = this.formatCommit(payload, handle);
|
||||||
|
|
||||||
|
// Fetch thread context for reply posts
|
||||||
|
if (source?.threadParentUri) {
|
||||||
|
const threadContext = await this.fetchThreadContext(source.threadParentUri);
|
||||||
|
if (threadContext) {
|
||||||
|
extraContext['Thread context'] = `\n${threadContext}`;
|
||||||
|
delete extraContext['Thread root'];
|
||||||
|
delete extraContext['Reply parent'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
if (!text) {
|
if (!text) {
|
||||||
log.debug(`Dropping non-post Jetstream event: ${payload.commit?.collection} from ${did}`);
|
log.debug(`Dropping non-post Jetstream event: ${payload.commit?.collection} from ${did}`);
|
||||||
return;
|
return;
|
||||||
@@ -1298,6 +1313,16 @@ export class BlueskyAdapter implements ChannelAdapter {
|
|||||||
if (details.replyRefs.parentUri) source.threadParentUri = details.replyRefs.parentUri;
|
if (details.replyRefs.parentUri) source.threadParentUri = details.replyRefs.parentUri;
|
||||||
if (details.replyRefs.parentCid) source.threadParentCid = details.replyRefs.parentCid;
|
if (details.replyRefs.parentCid) source.threadParentCid = details.replyRefs.parentCid;
|
||||||
|
|
||||||
|
// Fetch thread context for reply posts
|
||||||
|
if (source.threadParentUri) {
|
||||||
|
const threadContext = await this.fetchThreadContext(source.threadParentUri);
|
||||||
|
if (threadContext) {
|
||||||
|
extraContext['Thread context'] = `\n${threadContext}`;
|
||||||
|
delete extraContext['Thread root'];
|
||||||
|
delete extraContext['Reply parent'];
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
const chatId = source.uri ?? authorDid;
|
const chatId = source.uri ?? authorDid;
|
||||||
this.lastPostByChatId.set(chatId, {
|
this.lastPostByChatId.set(chatId, {
|
||||||
uri: notification.uri,
|
uri: notification.uri,
|
||||||
@@ -1462,6 +1487,93 @@ export class BlueskyAdapter implements ChannelAdapter {
|
|||||||
return undefined;
|
return undefined;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Fetch parent thread context for a reply post. Returns a formatted string
|
||||||
|
* with the parent chain (root first), or null on failure.
|
||||||
|
*/
|
||||||
|
private async fetchThreadContext(parentUri: string): Promise<string | null> {
|
||||||
|
const depth = this.config.threadContextDepth ?? 5;
|
||||||
|
if (depth <= 0) return null;
|
||||||
|
|
||||||
|
// Check cache
|
||||||
|
const cached = this.threadContextCache.get(parentUri);
|
||||||
|
if (cached && cached.expiresAt > Date.now()) {
|
||||||
|
return cached.text;
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
try { await this.ensureSession(); } catch { /* auth optional for AppView */ }
|
||||||
|
const url = `${getAppViewUrl(this.config.appViewUrl)}/xrpc/app.bsky.feed.getPostThread`
|
||||||
|
+ `?uri=${encodeURIComponent(parentUri)}&depth=0&parentHeight=${depth}`;
|
||||||
|
const res = await fetchWithTimeout(url, {
|
||||||
|
headers: this.accessJwt ? { 'Authorization': `Bearer ${this.accessJwt}` } : undefined,
|
||||||
|
}, 5000);
|
||||||
|
if (!res.ok) return null;
|
||||||
|
|
||||||
|
const data = await res.json() as {
|
||||||
|
thread?: {
|
||||||
|
post?: { author?: { handle?: string }; record?: { text?: string } };
|
||||||
|
parent?: unknown;
|
||||||
|
};
|
||||||
|
};
|
||||||
|
|
||||||
|
// Walk parent chain to build chronological thread
|
||||||
|
const posts: { handle: string; text: string }[] = [];
|
||||||
|
let node = data.thread;
|
||||||
|
while (node && typeof node === 'object') {
|
||||||
|
const n = node as {
|
||||||
|
post?: { author?: { handle?: string }; record?: { text?: string } };
|
||||||
|
parent?: unknown;
|
||||||
|
};
|
||||||
|
if (n.post?.record?.text) {
|
||||||
|
posts.push({
|
||||||
|
handle: n.post.author?.handle || 'unknown',
|
||||||
|
text: n.post.record.text,
|
||||||
|
});
|
||||||
|
}
|
||||||
|
node = n.parent as typeof node | undefined;
|
||||||
|
}
|
||||||
|
|
||||||
|
if (posts.length === 0) return null;
|
||||||
|
|
||||||
|
// posts[] is most-recent-first (from the parent walk). Reverse to chronological.
|
||||||
|
posts.reverse();
|
||||||
|
|
||||||
|
// Format all lines, then keep as many recent posts as fit within the limit.
|
||||||
|
// The most recent parent (closest to the new reply) is the most important.
|
||||||
|
const lines = posts.map(p => `@${p.handle}: "${truncate(p.text, 200)}"`);
|
||||||
|
let result = '';
|
||||||
|
let startIdx = 0;
|
||||||
|
const joined = lines.join('\n');
|
||||||
|
if (joined.length <= BlueskyAdapter.THREAD_CONTEXT_MAX_CHARS) {
|
||||||
|
result = joined;
|
||||||
|
} else {
|
||||||
|
// Work backwards from the most recent, accumulating lines
|
||||||
|
let budget = BlueskyAdapter.THREAD_CONTEXT_MAX_CHARS - '[...earlier posts truncated]\n'.length;
|
||||||
|
for (let i = lines.length - 1; i >= 0; i--) {
|
||||||
|
if (lines[i].length + 1 > budget) { // +1 for newline
|
||||||
|
startIdx = i + 1;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
budget -= lines[i].length + 1;
|
||||||
|
}
|
||||||
|
result = '[...earlier posts truncated]\n' + lines.slice(startIdx).join('\n');
|
||||||
|
}
|
||||||
|
|
||||||
|
// Cache result
|
||||||
|
this.threadContextCache.set(parentUri, {
|
||||||
|
text: result,
|
||||||
|
expiresAt: Date.now() + BlueskyAdapter.THREAD_CACHE_TTL_MS,
|
||||||
|
});
|
||||||
|
pruneMap(this.threadContextCache, BlueskyAdapter.THREAD_CACHE_MAX);
|
||||||
|
|
||||||
|
return result;
|
||||||
|
} catch (err) {
|
||||||
|
log.warn('Failed to fetch thread context:', err instanceof Error ? err.message : err);
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private loadState(): void {
|
private loadState(): void {
|
||||||
if (!this.statePath || !this.config.agentName) return;
|
if (!this.statePath || !this.config.agentName) return;
|
||||||
if (!existsSync(this.statePath)) return;
|
if (!existsSync(this.statePath)) return;
|
||||||
|
|||||||
@@ -49,6 +49,8 @@ export interface BlueskyConfig {
|
|||||||
reasons?: string[] | string;
|
reasons?: string[] | string;
|
||||||
backfill?: boolean;
|
backfill?: boolean;
|
||||||
};
|
};
|
||||||
|
/** Max parent posts to fetch for thread context on replies (0 to disable). Default: 5. */
|
||||||
|
threadContextDepth?: number;
|
||||||
}
|
}
|
||||||
|
|
||||||
export interface JetstreamCommit {
|
export interface JetstreamCommit {
|
||||||
|
|||||||
Reference in New Issue
Block a user