diff --git a/README.md b/README.md
index 19332a4..93f6840 100644
--- a/README.md
+++ b/README.md
@@ -1,6 +1,6 @@
# LettaBot
-Your personal AI assistant that remembers everything across **Telegram, Slack, Discord, WhatsApp, and Signal**. Powered by the [Letta Code SDK](https://github.com/letta-ai/letta-code-sdk).
+Your personal AI assistant that remembers everything across **Telegram, Slack, Discord, WhatsApp, and Signal** β plus Bluesky Jetstream feed ingestion. Powered by the [Letta Code SDK](https://github.com/letta-ai/letta-code-sdk).
@@ -16,6 +16,7 @@ Your personal AI assistant that remembers everything across **Telegram, Slack, D
## Features
- **Multi-Channel** - Chat seamlessly across Telegram, Slack, Discord, WhatsApp, and Signal
+- **Feed Ingestion** - Read-only Bluesky Jetstream stream for selected DID(s)
- **Unified Memory** - Single agent remembers everything from all channels
- **Persistent Memory** - Agent remembers conversations across sessions (days/weeks/months)
- **Local Tool Execution** - Agent can read files, search code, run commands on your machine
@@ -236,6 +237,7 @@ agents:
| Discord | [Setup Guide](docs/discord-setup.md) | Discord bot + Message Content Intent |
| WhatsApp | [Setup Guide](docs/whatsapp-setup.md) | Phone with WhatsApp |
| Signal | [Setup Guide](docs/signal-setup.md) | signal-cli + phone number |
+| Bluesky (read-only) | [Setup Guide](docs/bluesky-setup.md) | Jetstream WebSocket + DID filter |
At least one channel is required. Telegram is the easiest to start with.
diff --git a/bluesky-jetstream.json b/bluesky-jetstream.json
new file mode 100644
index 0000000..c1145af
--- /dev/null
+++ b/bluesky-jetstream.json
@@ -0,0 +1,19 @@
+{
+ "version": 1,
+ "updatedAt": "2026-03-10T19:26:51.496Z",
+ "agents": {
+ "LettaBot": {
+ "cursor": 1773168967870655,
+ "wantedDids": [
+ "did:plc:gfrmhdmjvxn2sjedzboeudef"
+ ],
+ "wantedCollections": [
+ "app.bsky.feed.post"
+ ],
+ "auth": {
+ "did": "did:plc:gfrmhdmjvxn2sjedzboeudef",
+ "handle": "cameron.stream"
+ }
+ }
+ }
+}
\ No newline at end of file
diff --git a/docs/bluesky-setup.md b/docs/bluesky-setup.md
new file mode 100644
index 0000000..2b037a0
--- /dev/null
+++ b/docs/bluesky-setup.md
@@ -0,0 +1,184 @@
+# Bluesky Jetstream Setup
+
+LettaBot can ingest Bluesky events using the Jetstream WebSocket feed. This channel is read-only by default, with optional reply posting if you provide a Bluesky app password.
+
+## Overview
+
+- Jetstream provides a firehose of ATProto commit events.
+- You filter by DID(s) and optionally by collection.
+- Events are delivered to the agent in listening mode by default (read-only).
+- If enabled, the bot can auto-reply to posts using the ATProto XRPC API.
+
+## Configuration (lettabot.yaml)
+
+```yaml
+channels:
+ bluesky:
+ enabled: true
+ # autoReply: true # Enable auto-replies (default: false / read-only)
+ wantedDids: ["did:plc:..."]
+ # lists:
+ # "at://did:plc:.../app.bsky.graph.list/xyz": { mode: listen }
+ # wantedCollections: ["app.bsky.feed.post"]
+ # notifications:
+ # enabled: true
+ # intervalSec: 60
+ # reasons: ["mention", "reply", "quote"]
+ # handle: you.bsky.social
+ # appPassword: xxxx-xxxx-xxxx-xxxx
+ # serviceUrl: https://bsky.social
+ # appViewUrl: https://public.api.bsky.app
+```
+
+### Conversation routing
+
+If you want Bluesky to keep its own conversation history while other channels stay shared, add a per-channel override:
+
+```yaml
+conversations:
+ mode: shared
+ perChannel: ["bluesky"]
+```
+
+### Filters (how Jetstream is narrowed)
+
+- `wantedDids`: list of DID(s) to include. Multiple entries are ORed.
+- `wantedCollections`: list of collections to include. Multiple entries are ORed.
+- Both filters are ANDed together.
+ - Example: wantedDids=[A] + wantedCollections=[app.bsky.feed.post] => only posts by DID A.
+
+If you omit `wantedCollections`, you'll see all collections for the included DIDs (posts, likes, reposts, follows, blocks, etc.).
+
+If there are no `wantedDids` (after list expansion), Jetstream does not connect. Notifications polling can still run if auth is configured.
+
+### Manual posting (skill/CLI)
+
+Bluesky is read-only by default. To post, reply, like, or repost, use the CLI:
+
+```bash
+lettabot-bluesky post --text "Hello" --agent
+lettabot-bluesky post --reply-to at://did:plc:.../app.bsky.feed.post/... --text "Reply" --agent
+lettabot-bluesky like at://did:plc:.../app.bsky.feed.post/... --agent
+lettabot-bluesky repost at://did:plc:.../app.bsky.feed.post/... --agent
+```
+
+Posts over 300 characters require `--threaded` to explicitly split into a reply thread.
+
+If there are **no** `wantedDids` (after list expansion), Jetstream does **not** connect. Notifications polling can still run if auth is configured.
+
+### Mentions
+
+Jetstream does not provide mention notifications. Mentions are surfaced via the Notifications API (see below). `mention-only` mode only triggers replies for mention notifications.
+
+## Notifications (mentions, replies, likes, etc.)
+
+Jetstream does not include notifications. To get mentions/replies like the Bluesky app, enable polling via the Notifications API:
+
+```yaml
+channels:
+ bluesky:
+ notifications:
+ enabled: true
+ intervalSec: 60
+ reasons: ["mention", "reply", "quote"]
+```
+
+If you supply posting credentials (`handle` + `appPassword`) and do not explicitly disable notifications, polling is enabled with defaults (60s, reasons: mention/reply/quote). Notifications polling works even if `wantedDids` is empty.
+
+Notification reasons include (non-exhaustive): `like`, `repost`, `follow`, `mention`, `reply`, `quote`, `starterpack-joined`, `verified`, `unverified`, `like-via-repost`, `repost-via-repost`, `subscribed-post`.
+
+Only `mention`, `reply`, and `quote` are considered "actionable" for reply behavior (based on your `groups` mode). Other reasons are always listening-only.
+
+## Runtime Kill Switch (per agent)
+
+Disable or re-enable Bluesky without restarting the server:
+
+```bash
+lettabot bluesky disable --agent MyAgent
+lettabot bluesky enable --agent MyAgent
+```
+
+Refresh list expansions on the running server:
+
+```bash
+lettabot bluesky refresh-lists --agent MyAgent
+```
+
+Kill switch state is stored in `bluesky-runtime.json` (per agent) under the data directory and polled by the running server.
+
+When you use `bluesky add-did`, `bluesky add-list`, or `bluesky set-default`, the CLI also triggers a runtime config reload so the running server updates Jetstream subscriptions without restart.
+
+## Per-DID Modes (using `groups` syntax)
+
+Bluesky uses the same `groups` pattern as other channels, where `"*"` is the default:
+
+```yaml
+channels:
+ bluesky:
+ enabled: true
+ wantedDids: ["did:plc:author1"]
+ groups:
+ "*": { mode: listen }
+ "did:plc:author1": { mode: open }
+ "did:plc:author2": { mode: listen }
+ "did:plc:spammy": { mode: disabled }
+```
+
+Mode mapping:
+- `open` -> reply to posts for that DID
+- `listen` -> listening-only
+- `mention-only` -> reply only for mention notifications
+- `disabled` -> ignore events from that DID
+
+Default behavior:
+- If `"*"` is set, it is used as the default for any DID without an explicit override.
+- If `"*"` is not set, default is `listen`.
+
+## Lists
+
+You can target a Bluesky list by URI and assign a mode. On startup, the list is expanded to member DIDs and added to the stream filter.
+
+```yaml
+channels:
+ bluesky:
+ lists:
+ "at://did:plc:.../app.bsky.graph.list/xyz": { mode: listen }
+```
+
+If a DID appears in both `groups` and a list, the explicit `groups` mode wins.
+
+List expansion uses the AppView API (default: `https://public.api.bsky.app`). Set `appViewUrl` if you need a different AppView (e.g., for private lists).
+
+## Reply Posting (optional)
+
+To allow replies, set posting credentials and choose a default mode that allows replies (`open` or `mention-only`):
+
+```yaml
+channels:
+ bluesky:
+ groups:
+ "*": { mode: open }
+ handle: you.bsky.social
+ appPassword: xxxx-xxxx-xxxx-xxxx
+```
+
+Notes:
+- You must use a Bluesky app password (Settings -> App Passwords).
+- Replies are posted only for `app.bsky.feed.post` events.
+- Replies go to the latest post from the DID currently being processed.
+- Posts are capped to 300 characters.
+
+## Embeds (summary output)
+
+Post embeds are summarized in a compact form, for example:
+- `Embed: 2 image(s) (alt: ...)`
+- `Embed: link "Title" https://...`
+- `Embed: record at://...`
+
+## Troubleshooting
+
+### No messages appearing
+- Ensure `wantedDids` contains DID values (e.g. `did:plc:...`), not handles.
+- Confirm `wantedCollections` isn't filtering out posts (omit it to see all collections).
+- Check logs for the warning about missing `wantedDids` (firehose may be too noisy).
+- Verify the Jetstream URL is reachable.
diff --git a/lettabot.example.yaml b/lettabot.example.yaml
index 7d1712f..cad479d 100644
--- a/lettabot.example.yaml
+++ b/lettabot.example.yaml
@@ -1,6 +1,6 @@
# LettaBot Configuration
# Copy this to lettabot.yaml and fill in your values.
-#
+#
# Server modes:
# - 'api': Use Letta API (api.letta.com) with API key
# - 'docker': Use a Docker/custom Letta server
@@ -61,6 +61,24 @@ agents:
# whatsapp:
# enabled: true
# selfChat: false
+ # bluesky:
+ # enabled: true
+ # wantedDids: ["did:plc:..."]
+ # # groups:
+ # # "*": { mode: listen } # listen = observe only; open = observe + auto-reply; mention-only = auto-reply to @mentions only
+ # # "did:plc:...": { mode: open }
+ # # lists:
+ # # "at://did:plc:.../app.bsky.graph.list/xyz": { mode: listen }
+ # # wantedCollections: ["app.bsky.feed.post"]
+ # # notifications:
+ # # enabled: true
+ # # intervalSec: 60
+ # # reasons: ["mention", "reply", "quote"] # also: like, repost, follow
+ # # backfill: false # process initial backlog on startup
+ # # jetstreamUrl: wss://jetstream2.us-east.bsky.network/subscribe
+ # # handle: you.bsky.social
+ # # appPassword: xxxx-xxxx-xxxx-xxxx
+ # # serviceUrl: https://bsky.social
# BYOK Providers (optional, api mode only)
# These will be synced to Letta API on startup
@@ -93,7 +111,7 @@ features:
# display:
# showToolCalls: false # Show tool invocations in chat (e.g. "Using tool: Read (file_path: ...)")
# showReasoning: false # Show agent reasoning/thinking in chat
- # reasoningMaxChars: 0 # Truncate reasoning to N chars (0 = no limit, default)
+ # reasoningMaxChars: 0 # Truncate reasoning to N chars (0 = no limit, default)
# Attachment handling (defaults to 20MB if omitted)
# attachments:
diff --git a/package-lock.json b/package-lock.json
index d05ef47..21d6e0f 100644
--- a/package-lock.json
+++ b/package-lock.json
@@ -9,6 +9,7 @@
"version": "0.2.0",
"license": "Apache-2.0",
"dependencies": {
+ "@atproto/api": "^0.19.1",
"@clack/prompts": "^0.11.0",
"@hapi/boom": "^10.0.1",
"@letta-ai/letta-client": "^1.7.12",
@@ -35,6 +36,7 @@
},
"bin": {
"lettabot": "dist/cli.js",
+ "lettabot-bluesky": "dist/channels/bluesky/cli.js",
"lettabot-channels": "dist/cli/channels.js",
"lettabot-history": "dist/cli/history.js",
"lettabot-message": "dist/cli/message.js",
@@ -91,6 +93,88 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
+ "node_modules/@atproto/api": {
+ "version": "0.19.3",
+ "resolved": "https://registry.npmjs.org/@atproto/api/-/api-0.19.3.tgz",
+ "integrity": "sha512-G8YpBpRouHdTAIagi/QQIUZOhGd1jfBQWkJy9QfxAzjjEpPvaVOSk4e1S85QzGLm/xbzVONzGkmdtiOSfP6wVg==",
+ "license": "MIT",
+ "dependencies": {
+ "@atproto/common-web": "^0.4.18",
+ "@atproto/lexicon": "^0.6.2",
+ "@atproto/syntax": "^0.5.0",
+ "@atproto/xrpc": "^0.7.7",
+ "await-lock": "^2.2.2",
+ "multiformats": "^9.9.0",
+ "tlds": "^1.234.0",
+ "zod": "^3.23.8"
+ }
+ },
+ "node_modules/@atproto/common-web": {
+ "version": "0.4.18",
+ "resolved": "https://registry.npmjs.org/@atproto/common-web/-/common-web-0.4.18.tgz",
+ "integrity": "sha512-ilImzP+9N/mtse440kN60pGrEzG7wi4xsV13nGeLrS+Zocybc/ISOpKlbZM13o+twPJ+Q7veGLw9CtGg0GAFoQ==",
+ "license": "MIT",
+ "dependencies": {
+ "@atproto/lex-data": "^0.0.13",
+ "@atproto/lex-json": "^0.0.13",
+ "@atproto/syntax": "^0.5.0",
+ "zod": "^3.23.8"
+ }
+ },
+ "node_modules/@atproto/lex-data": {
+ "version": "0.0.13",
+ "resolved": "https://registry.npmjs.org/@atproto/lex-data/-/lex-data-0.0.13.tgz",
+ "integrity": "sha512-7Z7RwZ1Y/JzBF/Tcn/I4UJ/vIGfh5zn1zjv0KX+flke2JtgFkSE8uh2hOtqgBQMNqE3zdJFM+dcSWln86hR3MQ==",
+ "license": "MIT",
+ "dependencies": {
+ "multiformats": "^9.9.0",
+ "tslib": "^2.8.1",
+ "uint8arrays": "3.0.0",
+ "unicode-segmenter": "^0.14.0"
+ }
+ },
+ "node_modules/@atproto/lex-json": {
+ "version": "0.0.13",
+ "resolved": "https://registry.npmjs.org/@atproto/lex-json/-/lex-json-0.0.13.tgz",
+ "integrity": "sha512-hwLhkKaIHulGJpt0EfXAEWdrxqM2L1tV/tvilzhMp3QxPqYgXchFnrfVmLsyFDx6P6qkH1GsX/XC2V36U0UlPQ==",
+ "license": "MIT",
+ "dependencies": {
+ "@atproto/lex-data": "^0.0.13",
+ "tslib": "^2.8.1"
+ }
+ },
+ "node_modules/@atproto/lexicon": {
+ "version": "0.6.2",
+ "resolved": "https://registry.npmjs.org/@atproto/lexicon/-/lexicon-0.6.2.tgz",
+ "integrity": "sha512-p3Ly6hinVZW0ETuAXZMeUGwuMm3g8HvQMQ41yyEE6AL0hAkfeKFaZKos6BdBrr6CjkpbrDZqE8M+5+QOceysMw==",
+ "license": "MIT",
+ "dependencies": {
+ "@atproto/common-web": "^0.4.18",
+ "@atproto/syntax": "^0.5.0",
+ "iso-datestring-validator": "^2.2.2",
+ "multiformats": "^9.9.0",
+ "zod": "^3.23.8"
+ }
+ },
+ "node_modules/@atproto/syntax": {
+ "version": "0.5.0",
+ "resolved": "https://registry.npmjs.org/@atproto/syntax/-/syntax-0.5.0.tgz",
+ "integrity": "sha512-UA2DSpGdOQzUQ4gi5SH+NEJz/YR3a3Fg3y2oh+xETDSiTRmA4VhHRCojhXAVsBxUT6EnItw190C/KN+DWW90kw==",
+ "license": "MIT",
+ "dependencies": {
+ "tslib": "^2.8.1"
+ }
+ },
+ "node_modules/@atproto/xrpc": {
+ "version": "0.7.7",
+ "resolved": "https://registry.npmjs.org/@atproto/xrpc/-/xrpc-0.7.7.tgz",
+ "integrity": "sha512-K1ZyO/BU8JNtXX5dmPp7b5UrkLMMqpsIa/Lrj5D3Su+j1Xwq1m6QJ2XJ1AgjEjkI1v4Muzm7klianLE6XGxtmA==",
+ "license": "MIT",
+ "dependencies": {
+ "@atproto/lexicon": "^0.6.0",
+ "zod": "^3.23.8"
+ }
+ },
"node_modules/@borewit/text-codec": {
"version": "0.2.1",
"resolved": "https://registry.npmjs.org/@borewit/text-codec/-/text-codec-0.2.1.tgz",
@@ -2842,6 +2926,12 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
+ "node_modules/await-lock": {
+ "version": "2.2.2",
+ "resolved": "https://registry.npmjs.org/await-lock/-/await-lock-2.2.2.tgz",
+ "integrity": "sha512-aDczADvlvTGajTDjcjpJMqRkOF6Qdz3YbPZm/PyW6tKPkx2hlYBzxMhEywM/tU72HrVZjgl5VCdRuMlA7pZ8Gw==",
+ "license": "MIT"
+ },
"node_modules/axios": {
"version": "1.13.3",
"resolved": "https://registry.npmjs.org/axios/-/axios-1.13.3.tgz",
@@ -5031,6 +5121,12 @@
"integrity": "sha512-RHxMLp9lnKHGHRng9QFhRCMbYAcVpn69smSGcq3f36xjgVVWThj4qqLbTLlq7Ssj8B+fIQ1EuCEGI2lKsyQeIw==",
"license": "ISC"
},
+ "node_modules/iso-datestring-validator": {
+ "version": "2.2.2",
+ "resolved": "https://registry.npmjs.org/iso-datestring-validator/-/iso-datestring-validator-2.2.2.tgz",
+ "integrity": "sha512-yLEMkBbLZTlVQqOnQ4FiMujR6T4DEcCb1xizmvXS+OxuhwcbtynoosRzdMA69zZCShCNAbi+gJ71FxZBBXx1SA==",
+ "license": "MIT"
+ },
"node_modules/jackspeak": {
"version": "3.4.3",
"resolved": "https://registry.npmjs.org/jackspeak/-/jackspeak-3.4.3.tgz",
@@ -6257,6 +6353,12 @@
"integrity": "sha512-6FlzubTLZG3J2a/NVCAleEhjzq5oxgHyaCU9yYXvcLsvoVaHJq/s5xXI6/XXP6tz7R9xAOtHnSO/tXtF3WRTlA==",
"license": "MIT"
},
+ "node_modules/multiformats": {
+ "version": "9.9.0",
+ "resolved": "https://registry.npmjs.org/multiformats/-/multiformats-9.9.0.tgz",
+ "integrity": "sha512-HoMUjhH9T8DDBNT+6xzkrd9ga/XiBI4xLr58LJACwK6G3HTOPeMz4nB4KJs33L2BelrIJa7P0VuNaVF3hMYfjg==",
+ "license": "(Apache-2.0 AND MIT)"
+ },
"node_modules/music-metadata": {
"version": "11.11.0",
"resolved": "https://registry.npmjs.org/music-metadata/-/music-metadata-11.11.0.tgz",
@@ -8489,6 +8591,15 @@
"node": ">=14.0.0"
}
},
+ "node_modules/tlds": {
+ "version": "1.261.0",
+ "resolved": "https://registry.npmjs.org/tlds/-/tlds-1.261.0.tgz",
+ "integrity": "sha512-QXqwfEl9ddlGBaRFXIvNKK6OhipSiLXuRuLJX5DErz0o0Q0rYxulWLdFryTkV5PkdZct5iMInwYEGe/eR++1AA==",
+ "license": "MIT",
+ "bin": {
+ "tlds": "bin.js"
+ }
+ },
"node_modules/toidentifier": {
"version": "1.0.1",
"resolved": "https://registry.npmjs.org/toidentifier/-/toidentifier-1.0.1.tgz",
@@ -8545,8 +8656,7 @@
"version": "2.8.1",
"resolved": "https://registry.npmjs.org/tslib/-/tslib-2.8.1.tgz",
"integrity": "sha512-oJFu94HQb+KVduSUQL7wnpmqnfmLsOA/nAh6b6EH0wCEoK0/mPeXU6c3wKDV83MkOuHPRHtSXKKU99IBazS/2w==",
- "license": "0BSD",
- "optional": true
+ "license": "0BSD"
},
"node_modules/tsscmp": {
"version": "1.0.6",
@@ -8629,6 +8739,15 @@
"url": "https://github.com/sponsors/sindresorhus"
}
},
+ "node_modules/uint8arrays": {
+ "version": "3.0.0",
+ "resolved": "https://registry.npmjs.org/uint8arrays/-/uint8arrays-3.0.0.tgz",
+ "integrity": "sha512-HRCx0q6O9Bfbp+HHSfQQKD7wU70+lydKVt4EghkdOvlK/NlrF90z+eXV34mUd48rNvVJXwkrMSPpCATkct8fJA==",
+ "license": "MIT",
+ "dependencies": {
+ "multiformats": "^9.4.2"
+ }
+ },
"node_modules/undici": {
"version": "6.21.3",
"resolved": "https://registry.npmjs.org/undici/-/undici-6.21.3.tgz",
@@ -8645,6 +8764,12 @@
"integrity": "sha512-Zz+aZWSj8LE6zoxD+xrjh4VfkIG8Ya6LvYkZqtUQGJPZjYl53ypCaUwWqo7eI0x66KBGeRo+mlBEkMSeSZ38Nw==",
"license": "MIT"
},
+ "node_modules/unicode-segmenter": {
+ "version": "0.14.5",
+ "resolved": "https://registry.npmjs.org/unicode-segmenter/-/unicode-segmenter-0.14.5.tgz",
+ "integrity": "sha512-jHGmj2LUuqDcX3hqY12Ql+uhUTn8huuxNZGq7GvtF6bSybzH3aFgedYu/KTzQStEgt1Ra2F3HxadNXsNjb3m3g==",
+ "license": "MIT"
+ },
"node_modules/unified": {
"version": "11.0.5",
"resolved": "https://registry.npmjs.org/unified/-/unified-11.0.5.tgz",
@@ -9757,6 +9882,15 @@
"license": "MIT",
"peer": true
},
+ "node_modules/zod": {
+ "version": "3.25.76",
+ "resolved": "https://registry.npmjs.org/zod/-/zod-3.25.76.tgz",
+ "integrity": "sha512-gzUt/qt81nXsFGKIFcC3YnfEAx5NkunCfnDlvuBSSFS02bcXu4Lmea0AFIUwbLWxWPx3d9p8S5QoaujKcNQxcQ==",
+ "license": "MIT",
+ "funding": {
+ "url": "https://github.com/sponsors/colinhacks"
+ }
+ },
"node_modules/zwitch": {
"version": "2.0.4",
"resolved": "https://registry.npmjs.org/zwitch/-/zwitch-2.0.4.tgz",
diff --git a/package.json b/package.json
index 62d5009..54b70a1 100644
--- a/package.json
+++ b/package.json
@@ -7,6 +7,7 @@
"lettabot": "./dist/cli.js",
"lettabot-schedule": "./dist/cron/cli.js",
"lettabot-message": "./dist/cli/message.js",
+ "lettabot-bluesky": "./dist/channels/bluesky/cli.js",
"lettabot-react": "./dist/cli/react.js",
"lettabot-history": "./dist/cli/history.js",
"lettabot-channels": "./dist/cli/channels.js"
@@ -64,6 +65,7 @@
"patches/"
],
"dependencies": {
+ "@atproto/api": "^0.19.1",
"@clack/prompts": "^0.11.0",
"@hapi/boom": "^10.0.1",
"@letta-ai/letta-client": "^1.7.12",
diff --git a/skills/bluesky/SKILL.md b/skills/bluesky/SKILL.md
new file mode 100644
index 0000000..bd49b14
--- /dev/null
+++ b/skills/bluesky/SKILL.md
@@ -0,0 +1,87 @@
+---
+name: bluesky
+description: Post, reply, like, and repost on Bluesky using the lettabot-bluesky CLI. Read-only by default; explicit actions required.
+metadata: |
+ {
+ "clawdbot": {
+ "emoji": "π¦",
+ "primaryEnv": "BLUESKY_HANDLE"
+ }
+ }
+---
+
+# Bluesky
+
+Bluesky is **read-only by default** in Lettabot. To post, reply, like, or repost you must use the `lettabot-bluesky` CLI.
+
+## Availability
+
+When this skill is enabled, it ships a skill-local `lettabot-bluesky` shim,
+so the command is available to agent subprocesses without separate npm install.
+
+Both entrypoints are supported and equivalent:
+
+```bash
+lettabot-bluesky ...
+lettabot bluesky ...
+```
+
+The shim prefers project-local entrypoints (`./dist/cli.js` or `./src/cli.ts`) before falling back to an installed `lettabot` binary on PATH.
+
+## Quick Reference
+
+```bash
+lettabot-bluesky post --text "Hello" --agent
+lettabot-bluesky post --reply-to at://did:plc:.../app.bsky.feed.post/... --text "Reply" --agent
+lettabot-bluesky post --text "Long..." --threaded --agent
+lettabot-bluesky post --text "Check this out" --image data/outbound/photo.jpg --alt "Alt text" --agent
+lettabot-bluesky post --text "Gallery" --image data/outbound/a.jpg --alt "First" --image data/outbound/b.jpg --alt "Second" --agent
+lettabot-bluesky like at://did:plc:.../app.bsky.feed.post/... --agent
+lettabot-bluesky repost at://did:plc:.../app.bsky.feed.post/... --agent
+lettabot-bluesky repost at://did:plc:.../app.bsky.feed.post/... --text "Quote" --agent [--threaded]
+```
+
+## Read Commands (public API)
+
+```bash
+lettabot-bluesky profile --agent
+lettabot-bluesky thread --agent
+lettabot-bluesky author-feed --limit 25 --cursor --agent
+lettabot-bluesky list-feed --limit 25 --cursor --agent
+lettabot-bluesky resolve --agent
+lettabot-bluesky followers --limit 25 --agent
+lettabot-bluesky follows --limit 25 --agent
+lettabot-bluesky lists --limit 25 --agent
+lettabot-bluesky actor-feeds --limit 25 --agent
+```
+
+## AuthβRequired Reads (uses app password)
+
+```bash
+lettabot-bluesky search --query "memory agents" --limit 25 --cursor --agent
+lettabot-bluesky timeline --limit 25 --cursor --agent
+lettabot-bluesky notifications --limit 25 --cursor --reasons mention,reply --agent
+```
+
+## Moderation (Mute / Block)
+
+```bash
+lettabot-bluesky mute --agent
+lettabot-bluesky unmute --agent
+lettabot-bluesky block --agent
+lettabot-bluesky unblock --agent
+lettabot-bluesky blocks --limit 50 --cursor --agent
+lettabot-bluesky mutes --limit 50 --cursor --agent
+```
+
+Notes:
+- `unblock` requires the **block record URI** (returned by the `block` command).
+- Pagination: many commands support `--cursor` (use the `cursor` field from the previous response).
+
+## Notes
+
+- Posts are capped at 300 characters unless you pass `--threaded`.
+- `--threaded` splits text into a reply thread (explicit optβin).
+- Replies and quotes require the target `at://` URI (included in incoming Bluesky messages).
+- The CLI uses the Bluesky app password from your `lettabot.yaml` for the selected agent.
+- **Images**: up to 4 per post; supported formats: JPEG, PNG, GIF, WebP. Use `--image ` (up to 4Γ) and `--alt ` after each image for alt text. `--alt` applies to the immediately preceding `--image`. Images must be inside the configured `sendFileDir` (default: `data/outbound`). Images attach to the first post only when `--threaded`.
diff --git a/skills/bluesky/lettabot-bluesky b/skills/bluesky/lettabot-bluesky
new file mode 100755
index 0000000..62e6b78
--- /dev/null
+++ b/skills/bluesky/lettabot-bluesky
@@ -0,0 +1,23 @@
+#!/usr/bin/env bash
+# Skill-local shim so `lettabot-bluesky` is available to agent subprocesses
+# whenever the Bluesky skill is installed.
+
+set -euo pipefail
+
+# Prefer project-local entrypoints so development worktrees run their current code.
+if [[ -f "./dist/cli.js" ]]; then
+ exec node "./dist/cli.js" bluesky "$@"
+fi
+
+if [[ -f "./src/cli.ts" ]] && command -v npx >/dev/null 2>&1; then
+ exec npx tsx "./src/cli.ts" bluesky "$@"
+fi
+
+# Fallback to an installed base CLI.
+if command -v lettabot >/dev/null 2>&1; then
+ exec lettabot bluesky "$@"
+fi
+
+echo "Error: unable to resolve a Bluesky CLI entrypoint." >&2
+echo "Expected one of: ./dist/cli.js, ./src/cli.ts (via npx tsx), or lettabot on PATH." >&2
+exit 1
\ No newline at end of file
diff --git a/src/channels/bluesky.test.ts b/src/channels/bluesky.test.ts
new file mode 100644
index 0000000..f5abe58
--- /dev/null
+++ b/src/channels/bluesky.test.ts
@@ -0,0 +1,397 @@
+import { describe, expect, it, vi, beforeEach, afterEach } from 'vitest';
+import { mkdirSync, writeFileSync, readFileSync, rmSync } from 'fs';
+import { join } from 'path';
+import { tmpdir } from 'os';
+import { BlueskyAdapter } from './bluesky.js';
+import { splitPostText } from './bluesky/utils.js';
+
+vi.mock('../config/io.js', () => ({
+ loadConfig: vi.fn(),
+}));
+
+const listUri = 'at://did:plc:tester/app.bsky.graph.list/abcd';
+
+function makeAdapter(overrides: Partial[0]> = {}) {
+ return new BlueskyAdapter({
+ enabled: true,
+ agentName: 'TestAgent',
+ groups: { '*': { mode: 'listen' } },
+ ...overrides,
+ });
+}
+
+describe('BlueskyAdapter', () => {
+ const originalFetch = globalThis.fetch;
+
+ beforeEach(() => {
+ globalThis.fetch = vi.fn();
+ });
+
+ afterEach(() => {
+ globalThis.fetch = originalFetch;
+ vi.restoreAllMocks();
+ });
+
+ it('uses groups wildcard and explicit overrides when resolving mode', () => {
+ const adapter = makeAdapter({
+ groups: {
+ '*': { mode: 'open' },
+ 'did:plc:explicit': { mode: 'disabled' },
+ },
+ });
+
+ const getDidMode = (adapter as any).getDidMode.bind(adapter);
+ expect(getDidMode('did:plc:explicit')).toBe('disabled');
+ expect(getDidMode('did:plc:other')).toBe('open');
+ });
+
+ it('expands list DIDs and respects explicit group overrides', async () => {
+ (globalThis.fetch as any).mockResolvedValue({
+ ok: true,
+ json: async () => ({
+ items: [
+ { subject: { did: 'did:plc:one' } },
+ { subject: { did: 'did:plc:two' } },
+ ],
+ }),
+ text: async () => '',
+ });
+
+ const adapter = makeAdapter({
+ lists: {
+ [listUri]: { mode: 'open' },
+ },
+ groups: {
+ '*': { mode: 'listen' },
+ 'did:plc:two': { mode: 'disabled' },
+ },
+ appViewUrl: 'https://public.api.bsky.app',
+ });
+
+ await (adapter as any).expandLists();
+
+ const listModes = (adapter as any).listModes as Record;
+ expect(listModes['did:plc:one']).toBe('open');
+ expect(listModes['did:plc:two']).toBeUndefined();
+ });
+
+ it('mention-only replies only on mention notifications', async () => {
+ const adapter = makeAdapter({
+ groups: { '*': { mode: 'mention-only' } },
+ });
+
+ const messages: any[] = [];
+ adapter.onMessage = async (msg) => {
+ messages.push(msg);
+ };
+
+ const notificationBase = {
+ uri: 'at://did:plc:author/app.bsky.feed.post/aaa',
+ cid: 'cid1',
+ author: { did: 'did:plc:author', handle: 'author.bsky.social' },
+ record: {
+ $type: 'app.bsky.feed.post',
+ text: 'Hello',
+ createdAt: new Date().toISOString(),
+ },
+ indexedAt: new Date().toISOString(),
+ };
+
+ await (adapter as any).processNotification({
+ ...notificationBase,
+ reason: 'mention',
+ });
+
+ await (adapter as any).processNotification({
+ ...notificationBase,
+ reason: 'reply',
+ });
+
+ expect(messages).toHaveLength(2);
+ expect(messages[0].isListeningMode).toBe(false);
+ expect(messages[1].isListeningMode).toBe(true);
+ });
+
+ it('uses post uri as chatId and defaults notification reply root to the post itself', async () => {
+ const adapter = makeAdapter();
+
+ const notification = {
+ uri: 'at://did:plc:author/app.bsky.feed.post/abc',
+ cid: 'cid-post',
+ author: { did: 'did:plc:author', handle: 'author.bsky.social' },
+ reason: 'reply',
+ record: {
+ $type: 'app.bsky.feed.post',
+ text: 'Hello',
+ createdAt: new Date().toISOString(),
+ },
+ indexedAt: new Date().toISOString(),
+ };
+
+ const messages: any[] = [];
+ adapter.onMessage = async (msg) => {
+ messages.push(msg);
+ };
+
+ await (adapter as any).processNotification(notification);
+
+ expect(messages[0].chatId).toBe(notification.uri);
+
+ const lastPostByChatId = (adapter as any).lastPostByChatId as Map;
+ const entry = lastPostByChatId.get(notification.uri);
+ expect(entry?.rootUri).toBe(notification.uri);
+ expect(entry?.rootCid).toBe(notification.cid);
+ });
+
+ it('deduplicates Jetstream delivery after notifications', async () => {
+ const adapter = makeAdapter();
+
+ const messages: any[] = [];
+ adapter.onMessage = async (msg) => {
+ messages.push(msg);
+ };
+
+ const cid = 'cid-dup';
+ const notification = {
+ uri: 'at://did:plc:author/app.bsky.feed.post/dup',
+ cid,
+ author: { did: 'did:plc:author', handle: 'author.bsky.social' },
+ reason: 'mention',
+ record: {
+ $type: 'app.bsky.feed.post',
+ text: 'Hello',
+ createdAt: new Date().toISOString(),
+ },
+ indexedAt: new Date().toISOString(),
+ };
+
+ await (adapter as any).processNotification(notification);
+
+ const event = {
+ data: JSON.stringify({
+ did: 'did:plc:author',
+ time_us: Date.now() * 1000,
+ identity: { handle: 'author.bsky.social' },
+ commit: {
+ collection: 'app.bsky.feed.post',
+ rkey: 'dup',
+ cid,
+ record: {
+ $type: 'app.bsky.feed.post',
+ text: 'Hello',
+ createdAt: new Date().toISOString(),
+ },
+ },
+ }),
+ };
+
+ await (adapter as any).handleMessageEvent(event);
+
+ expect(messages).toHaveLength(1);
+ });
+
+ it('excludes disabled DIDs from wantedDids', () => {
+ const adapter = makeAdapter({
+ wantedDids: ['did:plc:disabled'],
+ groups: {
+ '*': { mode: 'listen' },
+ 'did:plc:disabled': { mode: 'disabled' },
+ },
+ });
+
+ const wanted = (adapter as any).getWantedDids();
+ expect(wanted).toEqual([]);
+ });
+
+ it('splits long replies into multiple posts', () => {
+ const text = Array.from({ length: 120 }, () => 'word').join(' ');
+ const chunks = splitPostText(text);
+ expect(chunks.length).toBeGreaterThan(1);
+ const segmenter = new Intl.Segmenter();
+ const graphemeCount = (s: string) => [...segmenter.segment(s)].length;
+ expect(chunks.every(chunk => graphemeCount(chunk) <= 300)).toBe(true);
+ const total = chunks.reduce((sum, chunk) => sum + graphemeCount(chunk), 0);
+ expect(total).toBeGreaterThan(300);
+ });
+
+ it('non-post Jetstream events are dropped without calling onMessage', async () => {
+ const adapter = makeAdapter({ wantedDids: ['did:plc:author'] });
+ const messages: any[] = [];
+ adapter.onMessage = async (msg) => { messages.push(msg); };
+
+ const likeEvent = {
+ data: JSON.stringify({
+ did: 'did:plc:author',
+ time_us: Date.now() * 1000,
+ commit: {
+ operation: 'create',
+ collection: 'app.bsky.feed.like',
+ rkey: 'aaa',
+ cid: 'cid-like',
+ record: {
+ $type: 'app.bsky.feed.like',
+ subject: { uri: 'at://did:plc:other/app.bsky.feed.post/xyz', cid: 'cid-post' },
+ createdAt: new Date().toISOString(),
+ },
+ },
+ }),
+ };
+
+ await (adapter as any).handleMessageEvent(likeEvent);
+ expect(messages).toHaveLength(0);
+ });
+
+ it('embedLines are included in extraContext for posts with images', async () => {
+ const adapter = makeAdapter({ wantedDids: ['did:plc:author'] });
+ const messages: any[] = [];
+ adapter.onMessage = async (msg) => { messages.push(msg); };
+
+ const eventWithEmbed = {
+ data: JSON.stringify({
+ did: 'did:plc:author',
+ time_us: Date.now() * 1000,
+ commit: {
+ operation: 'create',
+ collection: 'app.bsky.feed.post',
+ rkey: 'bbb',
+ cid: 'cid-embed',
+ record: {
+ $type: 'app.bsky.feed.post',
+ text: 'Check this out',
+ createdAt: new Date().toISOString(),
+ embed: {
+ $type: 'app.bsky.embed.images',
+ images: [{ alt: 'A cat photo' }],
+ },
+ },
+ },
+ }),
+ };
+
+ await (adapter as any).handleMessageEvent(eventWithEmbed);
+ expect(messages).toHaveLength(1);
+ expect(messages[0].extraContext?.['Embeds']).toContain('1 image');
+ });
+
+ it('sendMessage throws when kill switch is active', async () => {
+ const adapter = makeAdapter();
+ (adapter as any).runtimeDisabled = true;
+
+ await expect(adapter.sendMessage({ chatId: 'some-chat', text: 'hello' }))
+ .rejects.toThrow('kill switch');
+ });
+
+ it('reloadConfig preserves handle and appPassword when new config omits them', async () => {
+ const { loadConfig } = await import('../config/io.js');
+ vi.mocked(loadConfig).mockReturnValue({
+ channels: {
+ bluesky: {
+ enabled: true,
+ groups: { '*': { mode: 'open' } },
+ // handle and appPassword intentionally absent (set via env vars)
+ },
+ },
+ } as any);
+
+ const adapter = makeAdapter({ handle: 'env@bsky.social', appPassword: 'env-pass' });
+ (adapter as any).reloadConfig();
+
+ expect((adapter as any).config.handle).toBe('env@bsky.social');
+ expect((adapter as any).config.appPassword).toBe('env-pass');
+ expect((adapter as any).config.groups?.['*']?.mode).toBe('open');
+ });
+
+ it('loadState does not override config wantedDids from persisted state', () => {
+ const tempDir = join(tmpdir(), `bluesky-test-${Date.now()}`);
+ mkdirSync(tempDir, { recursive: true });
+
+ const statePath = join(tempDir, 'bluesky-jetstream.json');
+ writeFileSync(statePath, JSON.stringify({
+ version: 1,
+ agents: {
+ TestAgent: {
+ cursor: 123456,
+ wantedDids: ['did:plc:stale-from-state'],
+ wantedCollections: ['app.bsky.feed.like'],
+ auth: { did: 'did:plc:auth', handle: 'test.bsky.social' },
+ },
+ },
+ }));
+
+ const adapter = makeAdapter({
+ wantedDids: ['did:plc:from-config'],
+ wantedCollections: ['app.bsky.feed.post'],
+ });
+ // Point adapter at our temp state file and load it
+ (adapter as any).statePath = statePath;
+ (adapter as any).loadState();
+
+ // Config values must remain authoritative -- state must not overwrite them
+ expect((adapter as any).config.wantedDids).toEqual(['did:plc:from-config']);
+ expect((adapter as any).config.wantedCollections).toEqual(['app.bsky.feed.post']);
+ // Cursor and auth should still be restored from state
+ expect((adapter as any).lastCursor).toBe(123456);
+ expect((adapter as any).sessionDid).toBe('did:plc:auth');
+
+ rmSync(tempDir, { recursive: true, force: true });
+ });
+
+ it('flushState does not persist wantedDids or wantedCollections', () => {
+ const tempDir = join(tmpdir(), `bluesky-test-${Date.now()}`);
+ mkdirSync(tempDir, { recursive: true });
+
+ const statePath = join(tempDir, 'bluesky-jetstream.json');
+ const adapter = makeAdapter({
+ wantedDids: ['did:plc:configured'],
+ wantedCollections: ['app.bsky.feed.post'],
+ });
+ (adapter as any).statePath = statePath;
+ (adapter as any).lastCursor = 999;
+ (adapter as any).sessionDid = 'did:plc:me';
+ (adapter as any).stateDirty = true;
+
+ (adapter as any).flushState();
+
+ const state = JSON.parse(readFileSync(statePath, 'utf-8'));
+ const entry = state.agents.TestAgent;
+ expect(entry.cursor).toBe(999);
+ expect(entry).not.toHaveProperty('wantedDids');
+ expect(entry).not.toHaveProperty('wantedCollections');
+
+ rmSync(tempDir, { recursive: true, force: true });
+ });
+
+ it('pauseRuntime clears pending reconnect timer', () => {
+ vi.useFakeTimers();
+ try {
+ const adapter = makeAdapter();
+ (adapter as any).running = true;
+
+ // Simulate a scheduled reconnect
+ const timerCallback = vi.fn();
+ (adapter as any).reconnectTimer = setTimeout(timerCallback, 60000);
+
+ (adapter as any).runtimeDisabled = true;
+ (adapter as any).pauseRuntime();
+
+ expect((adapter as any).reconnectTimer).toBeNull();
+ // Verify the timer was actually cleared (callback should not fire)
+ vi.advanceTimersByTime(60000);
+ expect(timerCallback).not.toHaveBeenCalled();
+ } finally {
+ vi.useRealTimers();
+ }
+ });
+
+ it('connect() refuses to proceed when runtimeDisabled is true', () => {
+ const adapter = makeAdapter({ wantedDids: ['did:plc:target'] });
+ (adapter as any).running = true;
+ (adapter as any).runtimeDisabled = true;
+ (adapter as any).ws = null;
+
+ // connect() should bail out before creating a WebSocket
+ (adapter as any).connect();
+
+ expect((adapter as any).ws).toBeNull();
+ });
+});
diff --git a/src/channels/bluesky.ts b/src/channels/bluesky.ts
new file mode 100644
index 0000000..27500e0
--- /dev/null
+++ b/src/channels/bluesky.ts
@@ -0,0 +1 @@
+export * from './bluesky/index.js';
diff --git a/src/channels/bluesky/adapter.ts b/src/channels/bluesky/adapter.ts
new file mode 100644
index 0000000..9ec4a2b
--- /dev/null
+++ b/src/channels/bluesky/adapter.ts
@@ -0,0 +1,1554 @@
+/**
+ * Bluesky Jetstream Channel Adapter (read-only by default)
+ *
+ * Uses the Jetstream WebSocket API to ingest events for selected DID(s).
+ * Messages are delivered to the agent in listening mode (no auto-replies).
+ */
+
+import { WebSocket } from 'undici';
+import { existsSync, mkdirSync, readFileSync, writeFileSync } from 'node:fs';
+import { dirname, join } from 'node:path';
+import type { ChannelAdapter } from '../types.js';
+import type { InboundMessage, OutboundFile, OutboundMessage } from '../../core/types.js';
+import { getDataDir } from '../../utils/paths.js';
+import { loadConfig } from '../../config/io.js';
+import { createLogger } from '../../logger.js';
+import type { BlueskyConfig, BlueskyInboundMessage, BlueskySource, DidMode, JetstreamEvent } from './types.js';
+import {
+ CURSOR_BACKTRACK_US,
+ DEFAULT_JETSTREAM_URL,
+ DEFAULT_NOTIFICATIONS_INTERVAL_SEC,
+ DEFAULT_NOTIFICATIONS_LIMIT,
+ DEFAULT_SERVICE_URL,
+ HANDLE_CACHE_MAX,
+ LAST_POST_CACHE_MAX,
+ SEEN_MESSAGE_IDS_MAX,
+ RECONNECT_BASE_MS,
+ RECONNECT_MAX_MS,
+ STATE_FILENAME,
+ STATE_FLUSH_INTERVAL_MS,
+ STATE_VERSION,
+} from './constants.js';
+import { extractPostDetails } from './formatter.js';
+import { AtpAgent } from '@atproto/api';
+import {
+ buildAtUri,
+ decodeJwtExp,
+ fetchWithTimeout,
+ getAppViewUrl,
+ isRecord,
+ normalizeList,
+ parseAtUri,
+ parseFacets,
+ pruneMap,
+ readString,
+ splitPostText,
+ truncate,
+ uniqueList,
+} from './utils.js';
+
+const log = createLogger('Bluesky');
+
+export class BlueskyAdapter implements ChannelAdapter {
+ readonly id = 'bluesky' as const;
+ readonly name = 'Bluesky';
+
+ private config: BlueskyConfig;
+ private ws: WebSocket | null = null;
+ private running = false;
+ private reconnectTimer: ReturnType | null = null;
+ private reconnectAttempts = 0;
+ private intentionalClose = false;
+ private lastCursor?: number;
+ private handleByDid = new Map();
+ private handleFetchInFlight = new Map>();
+ private lastHandleFetchAt = new Map();
+ private seenMessageIds = new Map();
+ private seenBaseMessageIds = new Map();
+ private lastPostByChatId = new Map();
+ private statePath?: string;
+ private stateDirty = false;
+ private stateFlushTimer: ReturnType | null = null;
+ private accessJwt?: string;
+ private refreshJwt?: string;
+ private sessionDid?: string;
+ private accessJwtExpiresAt?: number;
+ private refreshJwtExpiresAt?: number;
+ private didModes: Record = {};
+ private notificationsTimer: ReturnType | null = null;
+ private notificationsCursor?: string;
+ private notificationsInitialized = false;
+ private notificationsInFlight = false;
+ private listModes: Record = {};
+ private listRefreshInFlight = false;
+ private runtimePath?: string;
+ private runtimeTimer: ReturnType | null = null;
+ private runtimeDisabled = false;
+ private lastRuntimeRefreshAt?: string;
+ private lastRuntimeReloadAt?: string;
+ private readonly handleFetchCooldownMs = 5 * 60 * 1000;
+
+ onMessage?: (msg: InboundMessage) => Promise;
+ onCommand?: (command: string) => Promise;
+
+ private buildFormatterHints(shouldReply: boolean, didMode: DidMode) {
+ let actionsSection: string[];
+ if (shouldReply) {
+ // open or mention-only (notification mention) β bot will auto-post the reply
+ actionsSection = [
+ 'Your text response will be posted as a Bluesky reply.',
+ 'Like: `lettabot-bluesky like `',
+ 'NOTE: Bluesky does NOT support emoji reactions (no `` blocks).',
+ ];
+ } else if (didMode === 'mention-only') {
+ // mention-only but not a mention notification (reply/quote or Jetstream) β observing only
+ actionsSection = [
+ 'In mention-only mode, auto-replies are limited to @mention notifications. Your text response will NOT be auto-posted.',
+ 'Use the Bluesky skill to reply manually: `lettabot-bluesky post --reply-to --text "..."`',
+ 'Like: `lettabot-bluesky like `',
+ 'Posts over 300 chars require `--threaded` to create a reply thread.',
+ 'NOTE: Bluesky does NOT support emoji reactions (no `` blocks).',
+ ];
+ } else {
+ // listen β read-only, use CLI to act
+ actionsSection = [
+ 'This channel is read-only; your text response will NOT be posted.',
+ 'Use the Bluesky skill to reply/like/post (CLI: `lettabot-bluesky`, equivalent to `lettabot bluesky ...`).',
+ 'Reply: `lettabot-bluesky post --reply-to --text "..."`',
+ 'Like: `lettabot-bluesky like `',
+ 'Posts over 300 chars require `--threaded` to create a reply thread.',
+ 'NOTE: Bluesky does NOT support emoji reactions (no `` blocks).',
+ ];
+ }
+ return {
+ formatHint: 'Plain text only (no markdown, no tables).',
+ actionsSection,
+ };
+ }
+
+ constructor(config: BlueskyConfig) {
+ this.config = config;
+ this.loadDidModes();
+ if (config.agentName) {
+ const baseDir = getDataDir();
+ this.statePath = join(baseDir, STATE_FILENAME);
+ this.runtimePath = join(baseDir, 'bluesky-runtime.json');
+ }
+ }
+
+ async start(): Promise {
+ if (this.running) return;
+ this.running = true;
+ this.loadState();
+ this.startStateFlushTimer();
+ await this.maybeInitPostingIdentity();
+ if (!this.running) return;
+ await this.expandLists();
+ if (!this.running) return;
+ this.startRuntimeWatcher();
+ await this.checkRuntimeState();
+ if (!this.running) return;
+ if (!this.runtimeDisabled) {
+ this.startNotificationsPolling();
+ if (this.hasJetstreamTargets()) {
+ this.connect();
+ } else {
+ log.warn('Jetstream disabled (no wantedDids or list-expanded DIDs).');
+ }
+ }
+ }
+
+ async stop(): Promise {
+ this.running = false;
+ if (this.reconnectTimer) {
+ clearTimeout(this.reconnectTimer);
+ this.reconnectTimer = null;
+ }
+ if (this.stateFlushTimer) {
+ clearInterval(this.stateFlushTimer);
+ this.stateFlushTimer = null;
+ }
+ if (this.notificationsTimer) {
+ clearInterval(this.notificationsTimer);
+ this.notificationsTimer = null;
+ }
+ if (this.runtimeTimer) {
+ clearInterval(this.runtimeTimer);
+ this.runtimeTimer = null;
+ }
+ this.flushState();
+ if (this.ws) {
+ try {
+ this.ws.close();
+ } catch {
+ // ignore
+ }
+ this.ws = null;
+ }
+ }
+
+ isRunning(): boolean {
+ return this.running;
+ }
+
+ async sendMessage(_msg: OutboundMessage): Promise<{ messageId: string }> {
+ if (this.runtimeDisabled) {
+ throw new Error('Bluesky runtime disabled via kill switch.');
+ }
+
+ const target = this.lastPostByChatId.get(_msg.chatId);
+ if (!target) {
+ throw new Error('No recent post target to reply to.');
+ }
+
+ const chunks = splitPostText(_msg.text);
+ if (chunks.length === 0) {
+ throw new Error('Refusing to post empty reply.');
+ }
+
+ const rootUri = target.rootUri || target.uri;
+ const rootCid = target.rootCid || target.cid;
+ if (!rootUri || !rootCid) {
+ throw new Error('Missing reply root metadata.');
+ }
+
+ let currentTarget = {
+ uri: target.uri,
+ cid: target.cid,
+ rootUri,
+ rootCid,
+ };
+ let lastUri = '';
+ for (let i = 0; i < chunks.length; i++) {
+ const chunk = chunks[i];
+ const post = await this.createReply(chunk, currentTarget);
+ const postUri = post?.uri;
+ if (!postUri) {
+ throw new Error('Reply post returned no URI.');
+ }
+ const isLast = i === chunks.length - 1;
+ lastUri = postUri;
+ if (!isLast) {
+ const cid = post?.cid || await this.resolveRecordCid(postUri);
+ if (!cid) throw new Error('Reply post returned no CID for intermediate chunk.');
+ currentTarget = { uri: postUri, cid, rootUri, rootCid };
+ }
+ }
+ return { messageId: lastUri };
+ }
+
+ async editMessage(_chatId: string, _messageId: string, _text: string): Promise {
+ log.warn('editMessage is not supported (read-only channel).');
+ }
+
+ supportsEditing(): boolean {
+ return false;
+ }
+
+ async sendTypingIndicator(_chatId: string): Promise {
+ // No typing indicator on Bluesky
+ }
+
+ getFormatterHints() {
+ return {
+ supportsReactions: false,
+ supportsFiles: false,
+ formatHint: 'Plain text only (no markdown, no tables).',
+ };
+ }
+
+ async sendFile(_file: OutboundFile): Promise<{ messageId: string }>
+ {
+ throw new Error('sendFile is not supported.');
+ }
+
+ private connect(): void {
+ if (!this.running) return;
+ if (this.runtimeDisabled) return;
+ if (this.ws) return; // Already connected β prevent double-connections
+ if (!this.hasJetstreamTargets()) {
+ log.warn('Jetstream disabled (no wantedDids or list-expanded DIDs).');
+ return;
+ }
+
+ const url = this.buildJetstreamUrl();
+ log.info(`Connecting to Jetstream: ${url}`);
+
+ const ws = new WebSocket(url);
+ this.ws = ws;
+ let sawError = false;
+
+ ws.addEventListener('open', () => {
+ this.reconnectAttempts = 0;
+ log.info('Connected');
+ });
+
+ ws.addEventListener('message', (event) => {
+ this.handleMessageEvent(event).catch(err => {
+ log.error('Failed to process event:', err);
+ });
+ });
+
+ ws.addEventListener('error', (event) => {
+ if (sawError) return;
+ sawError = true;
+ const error = (event as { error?: unknown; message?: string }).error
+ || (event as { error?: unknown; message?: string }).message
+ || 'Unknown WebSocket error';
+ log.error('WebSocket error:', {
+ error,
+ url: this.buildJetstreamUrl(),
+ reconnectAttempts: this.reconnectAttempts,
+ });
+ // Some WebSocket errors never emit "close"; force a close to trigger reconnect.
+ if (this.ws === ws && !this.runtimeDisabled) {
+ try {
+ ws.close();
+ } catch {
+ // ignore
+ }
+ }
+ });
+
+ ws.addEventListener('close', () => {
+ if (this.ws !== ws) {
+ // Stale/orphaned connection closed β a new connection is already active, ignore
+ return;
+ }
+ this.ws = null;
+ log.warn('Disconnected');
+ if (this.intentionalClose) {
+ // reconnectJetstream() already called connect() β don't schedule another reconnect
+ this.intentionalClose = false;
+ return;
+ }
+ if (!this.runtimeDisabled) {
+ this.scheduleReconnect();
+ }
+ });
+ }
+
+ private scheduleReconnect(): void {
+ if (!this.running) return;
+ if (this.reconnectTimer) return;
+ if (!this.hasJetstreamTargets()) {
+ log.warn('Jetstream reconnect skipped (no wantedDids or list-expanded DIDs).');
+ return;
+ }
+
+ const delay = Math.min(RECONNECT_MAX_MS, RECONNECT_BASE_MS * Math.pow(2, this.reconnectAttempts));
+ this.reconnectAttempts += 1;
+
+ this.reconnectTimer = setTimeout(() => {
+ this.reconnectTimer = null;
+ this.connect();
+ }, delay);
+
+ log.info(`Reconnecting in ${delay}ms...`);
+ }
+
+ private buildJetstreamUrl(): string {
+ const base = this.config.jetstreamUrl || DEFAULT_JETSTREAM_URL;
+ const url = new URL(base);
+
+ const wantedDids = this.getWantedDids();
+ url.searchParams.delete('wantedDids');
+ for (const did of wantedDids) {
+ url.searchParams.append('wantedDids', did);
+ }
+
+ const wantedCollections = normalizeList(this.config.wantedCollections);
+ url.searchParams.delete('wantedCollections');
+ for (const collection of wantedCollections) {
+ url.searchParams.append('wantedCollections', collection);
+ }
+
+ const cursor = this.lastCursor !== undefined
+ ? Math.max(0, this.lastCursor - CURSOR_BACKTRACK_US)
+ : this.config.cursor;
+
+ if (cursor !== undefined) {
+ url.searchParams.set('cursor', String(cursor));
+ }
+
+ return url.toString();
+ }
+
+ private hasJetstreamTargets(): boolean {
+ return this.getWantedDids().length > 0;
+ }
+
+
+ private async handleMessageEvent(event: { data: unknown }): Promise {
+ const raw = typeof event.data === 'string'
+ ? event.data
+ : Buffer.from(event.data as ArrayBuffer).toString('utf-8');
+
+ let payload: JetstreamEvent;
+ try {
+ payload = JSON.parse(raw) as JetstreamEvent;
+ } catch {
+ log.warn('Received non-JSON message');
+ return;
+ }
+
+ if (typeof payload.time_us === 'number') {
+ this.lastCursor = payload.time_us;
+ this.stateDirty = true;
+ }
+
+ // Skip our own posts to prevent self-reply loops
+ if (payload.did && payload.did === this.sessionDid) {
+ return;
+ }
+
+ if (payload.did && payload.identity?.handle) {
+ this.handleByDid.set(payload.did, payload.identity.handle);
+ pruneMap(this.handleByDid, HANDLE_CACHE_MAX);
+ }
+ if (payload.did && payload.account?.handle) {
+ this.handleByDid.set(payload.did, payload.account.handle);
+ pruneMap(this.handleByDid, HANDLE_CACHE_MAX);
+ }
+
+ if (payload.did && !this.handleByDid.get(payload.did)) {
+ const resolved = await this.resolveHandleForDid(payload.did);
+ if (resolved) {
+ this.handleByDid.set(payload.did, resolved);
+ pruneMap(this.handleByDid, HANDLE_CACHE_MAX);
+ }
+ }
+
+ if (!payload.commit) {
+ return;
+ }
+
+ const did = payload.did || 'unknown';
+ const handle = payload.did ? this.handleByDid.get(payload.did) : undefined;
+ const { text, messageId, source, extraContext } = this.formatCommit(payload, handle);
+ if (!text) {
+ log.debug(`Dropping non-post Jetstream event: ${payload.commit?.collection} from ${did}`);
+ return;
+ }
+ if (messageId && (this.seenMessageIds.has(messageId) || this.seenBaseMessageIds.has(messageId))) return;
+
+ const timestamp = payload.time_us
+ ? new Date(Math.floor(payload.time_us / 1000))
+ : new Date();
+
+ const didMode = this.getDidMode(did);
+ if (didMode === 'disabled') {
+ return;
+ }
+
+ const isPost = payload.commit?.collection === 'app.bsky.feed.post';
+ const shouldReply = isPost && didMode === 'open';
+
+ const chatId = source?.uri ?? did;
+ const inbound: BlueskyInboundMessage = {
+ channel: 'bluesky',
+ chatId,
+ userId: did,
+ userHandle: handle,
+ userName: handle ? `@${handle}` : undefined,
+ messageId,
+ text,
+ timestamp,
+ messageType: 'public',
+ groupName: handle ? `@${handle}` : did,
+ isListeningMode: !shouldReply,
+ source,
+ extraContext,
+ formatterHints: this.buildFormatterHints(shouldReply, didMode),
+ };
+
+ if (payload.commit?.collection === 'app.bsky.feed.post' && source?.uri) {
+ // For standalone posts (not replies), root is the post itself.
+ // For reply posts, threadRootUri/Cid point to the conversation root.
+ this.lastPostByChatId.set(chatId, {
+ uri: source.uri,
+ cid: source.cid,
+ rootUri: source.threadRootUri ?? source.uri,
+ rootCid: source.threadRootCid ?? source.cid,
+ });
+ pruneMap(this.lastPostByChatId, LAST_POST_CACHE_MAX);
+ }
+
+ if (messageId) {
+ this.seenMessageIds.set(messageId, true);
+ pruneMap(this.seenMessageIds, SEEN_MESSAGE_IDS_MAX);
+ this.seenBaseMessageIds.set(messageId, true);
+ pruneMap(this.seenBaseMessageIds, SEEN_MESSAGE_IDS_MAX);
+ }
+ await this.onMessage?.(inbound);
+ }
+
+ private formatCommit(payload: JetstreamEvent, handle?: string): {
+ text: string;
+ messageId?: string;
+ source?: BlueskySource;
+ extraContext: Record;
+ } {
+ const commit = payload.commit || {};
+ const operation = commit.operation || 'commit';
+ const collection = commit.collection || 'unknown';
+ const uri = buildAtUri(payload.did, commit.collection, commit.rkey);
+
+ const source: BlueskySource = {
+ uri,
+ collection: commit.collection,
+ cid: commit.cid,
+ rkey: commit.rkey,
+ };
+
+ const extraContext: Record = {};
+ extraContext['Operation'] = `${operation} ${collection}`;
+ if (handle) {
+ extraContext['Handle'] = `@${handle}`;
+ }
+ if (payload.did) {
+ extraContext['DID'] = payload.did;
+ }
+ if (uri) {
+ extraContext['URI'] = uri;
+ }
+
+ const record = isRecord(commit.record) ? commit.record : undefined;
+
+ if (collection === 'app.bsky.feed.post' && record) {
+ const details = extractPostDetails(record);
+
+ if (details.createdAt) {
+ extraContext['Created'] = details.createdAt;
+ }
+ if (details.langs.length > 0) {
+ extraContext['Languages'] = details.langs.join(', ');
+ }
+ if (details.replyRefs.rootUri) {
+ extraContext['Thread root'] = details.replyRefs.rootUri;
+ }
+ if (details.replyRefs.parentUri) {
+ extraContext['Reply parent'] = details.replyRefs.parentUri;
+ }
+ if (details.embedLines.length > 0) {
+ extraContext['Embeds'] = details.embedLines.join(' | ');
+ }
+
+ if (details.replyRefs.rootUri) source.threadRootUri = details.replyRefs.rootUri;
+ if (details.replyRefs.rootCid) source.threadRootCid = details.replyRefs.rootCid;
+ if (details.replyRefs.parentUri) source.threadParentUri = details.replyRefs.parentUri;
+ if (details.replyRefs.parentCid) source.threadParentCid = details.replyRefs.parentCid;
+ return {
+ text: details.text || '',
+ extraContext,
+ messageId: commit.cid || commit.rkey,
+ source,
+ };
+ } else if ((collection === 'app.bsky.feed.like' || collection === 'app.bsky.feed.repost') && record) {
+ const subject = isRecord(record.subject) ? record.subject : undefined;
+ const subjectUri = subject ? readString(subject.uri) : undefined;
+ const subjectCid = subject ? readString(subject.cid) : undefined;
+ const createdAt = readString(record.createdAt);
+
+ if (subjectUri) {
+ extraContext['Subject'] = subjectUri;
+ }
+ if (createdAt) {
+ extraContext['Created'] = createdAt;
+ }
+
+ if (subjectUri) source.subjectUri = subjectUri;
+ if (subjectCid) source.subjectCid = subjectCid;
+ } else if ((collection === 'app.bsky.graph.follow' || collection === 'app.bsky.graph.block') && record) {
+ const subjectDid = readString(record.subject);
+ const createdAt = readString(record.createdAt);
+ if (subjectDid) {
+ extraContext['Subject DID'] = subjectDid;
+ }
+ if (createdAt) {
+ extraContext['Created'] = createdAt;
+ }
+ } else if (record) {
+ const createdAt = readString(record.createdAt);
+ if (createdAt) {
+ extraContext['Created'] = createdAt;
+ }
+ extraContext['Record'] = truncate(JSON.stringify(record));
+ }
+
+ return {
+ text: '', // No post text for non-post collections
+ extraContext,
+ messageId: commit.cid || commit.rkey,
+ source,
+ };
+ }
+
+ private getServiceUrl(): string {
+ const raw = this.config.serviceUrl || DEFAULT_SERVICE_URL;
+ return raw.replace(/\/+$/, '');
+ }
+
+ private isExpired(expiryMs?: number, skewMs = 60_000): boolean {
+ if (!expiryMs) return true;
+ return expiryMs - skewMs <= Date.now();
+ }
+
+ private async maybeInitPostingIdentity(): Promise {
+ if (!this.config.handle && !this.refreshJwt) return;
+ if (!this.config.appPassword && !this.refreshJwt) return;
+
+ try {
+ await this.ensureSession();
+ } catch (err) {
+ log.warn('Posting identity init failed:', err);
+ }
+ }
+
+ private async ensureSession(): Promise {
+ if (this.accessJwt && !this.isExpired(this.accessJwtExpiresAt)) {
+ return;
+ }
+
+ if (this.refreshJwt && !this.isExpired(this.refreshJwtExpiresAt)) {
+ try {
+ await this.refreshSessionWithRetry();
+ return;
+ } catch (err) {
+ log.warn('refreshSession failed, falling back to createSession:', err);
+ }
+ }
+
+ await this.createSessionWithRetry();
+ }
+
+ private async withRetry(fn: () => Promise, label: string, maxRetries = 3): Promise {
+ let lastError: Error | undefined;
+ for (let attempt = 0; attempt < maxRetries; attempt++) {
+ try {
+ return await fn();
+ } catch (err) {
+ lastError = err as Error;
+ if (attempt < maxRetries - 1) {
+ const delay = Math.min(5000, 1000 * Math.pow(2, attempt));
+ log.warn(`${label} failed (attempt ${attempt + 1}/${maxRetries}). Retrying in ${delay}ms.`);
+ await new Promise(resolve => setTimeout(resolve, delay));
+ }
+ }
+ }
+ throw lastError ?? new Error(`${label} failed`);
+ }
+
+ private async refreshSessionWithRetry(): Promise {
+ await this.withRetry(() => this.refreshSession(), 'refreshSession');
+ }
+
+ private async createSessionWithRetry(): Promise {
+ await this.withRetry(() => this.createSession(), 'createSession');
+ }
+
+ private async refreshSession(): Promise {
+ if (!this.refreshJwt) {
+ throw new Error('Missing refreshJwt');
+ }
+
+ const res = await fetchWithTimeout(`${this.getServiceUrl()}/xrpc/com.atproto.server.refreshSession`, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'Authorization': `Bearer ${this.refreshJwt}`,
+ },
+ });
+
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`refreshSession failed: ${detail}`);
+ }
+
+ const data = await res.json() as { accessJwt: string; refreshJwt: string; did: string; handle?: string };
+ this.applySession(data.accessJwt, data.refreshJwt, data.did, data.handle);
+ }
+
+ private async createSession(): Promise {
+ const identifier = this.config.handle;
+ const password = this.config.appPassword;
+ if (!identifier || !password) {
+ throw new Error('Missing Bluesky handle/appPassword for posting.');
+ }
+
+ const res = await fetchWithTimeout(`${this.getServiceUrl()}/xrpc/com.atproto.server.createSession`, {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ identifier, password }),
+ });
+
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`createSession failed: ${detail}`);
+ }
+
+ const data = await res.json() as { accessJwt: string; refreshJwt: string; did: string; handle?: string };
+ this.applySession(data.accessJwt, data.refreshJwt, data.did, data.handle);
+ }
+
+ private applySession(accessJwt: string, refreshJwt: string, did: string, handle?: string): void {
+ this.accessJwt = accessJwt;
+ this.refreshJwt = refreshJwt;
+ this.sessionDid = did;
+ this.accessJwtExpiresAt = decodeJwtExp(accessJwt);
+ this.refreshJwtExpiresAt = decodeJwtExp(refreshJwt);
+ if (handle) {
+ this.handleByDid.set(did, handle);
+ }
+ this.stateDirty = true;
+ }
+
+ private static readonly DID_PATTERN = /^did:[a-z]+:[a-zA-Z0-9._:-]+$/;
+
+ private loadDidModes(): void {
+ const modes: Record = {};
+ const groups = this.config.groups || {};
+ for (const [did, config] of Object.entries(groups)) {
+ if (did === '*') continue;
+ if (!BlueskyAdapter.DID_PATTERN.test(did)) {
+ log.warn(`Ignoring groups entry with invalid DID: "${did}"`);
+ continue;
+ }
+ const mode = config?.mode;
+ if (mode === 'open' || mode === 'listen' || mode === 'mention-only' || mode === 'disabled') {
+ modes[did] = mode;
+ }
+ }
+ this.didModes = modes;
+ }
+
+ private getDidMode(did: string): DidMode {
+ const explicit = this.didModes[did];
+ if (explicit) return explicit;
+
+ const listMode = this.listModes[did];
+ if (listMode) return listMode;
+
+ const wildcardMode = this.config.groups?.['*']?.mode;
+ if (wildcardMode === 'open' || wildcardMode === 'listen' || wildcardMode === 'mention-only' || wildcardMode === 'disabled') {
+ return wildcardMode;
+ }
+
+ return 'listen';
+ }
+
+ private getWantedDids(): string[] {
+ const configured = normalizeList(this.config.wantedDids);
+ const disabledDids = new Set(
+ Object.entries(this.didModes)
+ .filter(([, mode]) => mode === 'disabled')
+ .map(([did]) => did),
+ );
+ const explicitAllowed = Object.entries(this.didModes)
+ .filter(([, mode]) => mode !== 'disabled')
+ .map(([did]) => did);
+ const listAllowed = Object.entries(this.listModes)
+ .filter(([, mode]) => mode !== 'disabled')
+ .map(([did]) => did);
+ const combined = uniqueList([...configured, ...explicitAllowed, ...listAllowed]);
+ return combined.filter(did => !disabledDids.has(did));
+ }
+
+ private getNotificationsConfig(): {
+ enabled: boolean;
+ intervalMs: number;
+ limit: number;
+ priority?: boolean;
+ reasons: string[];
+ backfill: boolean;
+ } | null {
+ const config = this.config.notifications;
+ if (config?.enabled === false) return null;
+
+ const hasAuth = !!(this.config.handle && this.config.appPassword) || !!this.refreshJwt;
+ if (!config?.enabled && !hasAuth) return null;
+ if (config?.enabled && !hasAuth) {
+ log.warn('Notifications enabled but no auth configured.');
+ return null;
+ }
+
+ const intervalSec = typeof config?.intervalSec === 'number' && config.intervalSec > 0
+ ? config.intervalSec
+ : DEFAULT_NOTIFICATIONS_INTERVAL_SEC;
+ const limit = typeof config?.limit === 'number' && config.limit > 0
+ ? config.limit
+ : DEFAULT_NOTIFICATIONS_LIMIT;
+ const reasons = config?.reasons && normalizeList(config.reasons).length > 0
+ ? normalizeList(config.reasons)
+ : ['mention', 'reply', 'quote'];
+ return {
+ enabled: true,
+ intervalMs: intervalSec * 1000,
+ limit,
+ priority: config?.priority,
+ reasons,
+ backfill: config?.backfill === true,
+ };
+ }
+
+ private startNotificationsPolling(): void {
+ const config = this.getNotificationsConfig();
+ if (!config) return;
+ if (this.notificationsTimer) return;
+ this.notificationsTimer = setInterval(() => {
+ this.pollNotifications().catch(err => {
+ log.error('Notifications poll failed:', err);
+ });
+ }, config.intervalMs);
+ this.pollNotifications().catch(err => {
+ log.error('Notifications poll failed:', err);
+ });
+ log.info(`Notifications polling every ${config.intervalMs / 1000}s`);
+ }
+
+ private async pollNotifications(): Promise {
+ const config = this.getNotificationsConfig();
+ if (!config || !this.running) return;
+ if (this.notificationsInFlight) return;
+ this.notificationsInFlight = true;
+
+ try {
+ await this.ensureSession();
+ if (!this.accessJwt) return;
+
+ const params = new URLSearchParams();
+ params.set('limit', String(config.limit));
+ if (this.notificationsCursor) {
+ params.set('cursor', this.notificationsCursor);
+ }
+ if (config.priority !== undefined) {
+ params.set('priority', config.priority ? 'true' : 'false');
+ }
+ for (const reason of config.reasons) {
+ params.append('reasons', reason);
+ }
+
+ const res = await fetchWithTimeout(`${getAppViewUrl(this.config.appViewUrl)}/xrpc/app.bsky.notification.listNotifications?${params}`, {
+ headers: { Authorization: `Bearer ${this.accessJwt}` },
+ });
+
+ if (res.status === 401) {
+ this.accessJwt = undefined;
+ this.accessJwtExpiresAt = undefined;
+ await this.ensureSession();
+ return;
+ }
+
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`listNotifications failed: ${detail}`);
+ }
+
+ const data = await res.json() as {
+ cursor?: string;
+ notifications: Array<{
+ uri: string;
+ cid?: string;
+ author?: { did: string; handle?: string; displayName?: string };
+ reason: string;
+ reasonSubject?: string;
+ record?: Record;
+ indexedAt?: string;
+ isRead?: boolean;
+ }>;
+ };
+
+ const backfill = config.backfill;
+ const initializing = !this.notificationsInitialized;
+ let deferredCursor: string | undefined;
+
+ if (initializing) {
+ deferredCursor = data.cursor;
+ this.notificationsInitialized = true;
+ this.stateDirty = true;
+ if (!backfill) {
+ if (deferredCursor) {
+ this.notificationsCursor = deferredCursor;
+ this.stateDirty = true;
+ }
+ log.info('Notifications cursor initialized (skipping initial backlog).');
+ return;
+ }
+ if (!deferredCursor) {
+ log.warn('Notifications backfill enabled but API returned no cursor; may reprocess initial page.');
+ }
+ log.info('Notifications cursor initialized (backfill enabled).');
+ }
+
+ if (!initializing && data.cursor) {
+ this.notificationsCursor = data.cursor;
+ this.stateDirty = true;
+ }
+
+ const notifications = Array.isArray(data.notifications) ? data.notifications : [];
+ if (notifications.length === 0) {
+ if (initializing && deferredCursor) {
+ this.notificationsCursor = deferredCursor;
+ this.stateDirty = true;
+ }
+ return;
+ }
+
+ // Deliver oldest first
+ const ordered = [...notifications].reverse();
+ for (const notification of ordered) {
+ await this.processNotification(notification);
+ }
+
+ if (initializing && deferredCursor) {
+ this.notificationsCursor = deferredCursor;
+ this.stateDirty = true;
+ }
+ } finally {
+ this.notificationsInFlight = false;
+ }
+ }
+
+ private async expandLists(): Promise {
+ if (this.listRefreshInFlight) return;
+ const lists = this.config.lists || {};
+ const entries = Object.entries(lists).filter(([uri]) => uri && uri !== '*');
+ if (entries.length === 0) return;
+
+ this.listRefreshInFlight = true;
+ try {
+ if (!this.accessJwt && this.config.handle && this.config.appPassword) {
+ try {
+ await this.ensureSession();
+ } catch (err) {
+ log.warn('List expansion auth failed:', err);
+ }
+ }
+
+ const nextModes: Record = {};
+
+ for (const [listUri, config] of entries) {
+ const mode = config?.mode;
+ if (mode !== 'open' && mode !== 'listen' && mode !== 'mention-only' && mode !== 'disabled') {
+ continue;
+ }
+
+ const dids = await this.fetchListDids(listUri);
+ for (const did of dids) {
+ if (!did || !BlueskyAdapter.DID_PATTERN.test(did)) {
+ if (did) log.warn(`Skipping list entry with invalid DID: "${did}"`);
+ continue;
+ }
+ if (this.didModes[did]) {
+ // Explicit groups config takes precedence over list membership
+ log.debug(`List DID ${did} already explicitly configured, skipping list entry`);
+ continue;
+ }
+ if (!nextModes[did]) {
+ nextModes[did] = mode;
+ }
+ }
+ }
+
+ this.listModes = nextModes;
+ } catch (err) {
+ log.error('List expansion failed:', err);
+ } finally {
+ this.listRefreshInFlight = false;
+ }
+ }
+
+ private async fetchListDids(listUri: string): Promise {
+ const dids: string[] = [];
+ let cursor: string | undefined;
+ const limit = 100;
+ const maxPages = 50;
+ const base = getAppViewUrl(this.config.appViewUrl);
+
+ for (let page = 0; page < maxPages; page++) {
+ const params = new URLSearchParams();
+ params.set('list', listUri);
+ params.set('limit', String(limit));
+ if (cursor) params.set('cursor', cursor);
+
+ const res = await fetchWithTimeout(`${base}/xrpc/app.bsky.graph.getList?${params}`, {
+ headers: this.accessJwt ? { Authorization: `Bearer ${this.accessJwt}` } : undefined,
+ });
+
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`getList failed: ${detail}`);
+ }
+
+ const data = await res.json() as {
+ cursor?: string;
+ items?: Array<{ subject?: { did?: string } }>;
+ };
+
+ const items = Array.isArray(data.items) ? data.items : [];
+ for (const item of items) {
+ const did = item?.subject?.did;
+ if (did) dids.push(did);
+ }
+
+ if (!data.cursor) break;
+ cursor = data.cursor;
+ if (page + 1 >= maxPages) {
+ log.warn(`fetchListDids: reached maxPages (${maxPages}) for list ${listUri}, truncating`);
+ }
+ }
+
+ return uniqueList(dids);
+ }
+
+ private startRuntimeWatcher(): void {
+ if (!this.runtimePath || this.runtimeTimer) return;
+ this.runtimeTimer = setInterval(() => {
+ this.checkRuntimeState().catch(err => {
+ log.error('Runtime check failed:', err);
+ });
+ }, 5000);
+ this.checkRuntimeState().catch(err => {
+ log.error('Runtime check failed:', err);
+ });
+ }
+
+ private async checkRuntimeState(): Promise {
+ if (!this.runtimePath) return;
+ if (!existsSync(this.runtimePath)) return;
+ let raw: { agents?: Record };
+ try {
+ raw = JSON.parse(readFileSync(this.runtimePath, 'utf-8'));
+ } catch {
+ log.warn('Failed to parse runtime state file, skipping');
+ return;
+ }
+
+ const agentKey = this.config.agentName || 'default';
+ const agentState = raw.agents?.[agentKey];
+ if (!agentState) return;
+
+ if (typeof agentState.disabled === 'boolean' && agentState.disabled !== this.runtimeDisabled) {
+ this.runtimeDisabled = agentState.disabled;
+ if (this.runtimeDisabled) {
+ this.pauseRuntime();
+ } else {
+ await this.resumeRuntime();
+ }
+ }
+
+ if (agentState.refreshListsAt && agentState.refreshListsAt !== this.lastRuntimeRefreshAt) {
+ this.lastRuntimeRefreshAt = agentState.refreshListsAt;
+ await this.expandLists();
+ if (!this.runtimeDisabled) {
+ this.reconnectJetstream();
+ }
+ }
+
+ if (agentState.reloadConfigAt && agentState.reloadConfigAt !== this.lastRuntimeReloadAt) {
+ this.lastRuntimeReloadAt = agentState.reloadConfigAt;
+ this.reloadConfig();
+ await this.expandLists();
+ if (!this.runtimeDisabled) {
+ this.reconnectJetstream();
+ }
+ }
+ }
+
+ private reloadConfig(): void {
+ try {
+ const nextConfig = loadConfig();
+ let nextBluesky: BlueskyConfig | undefined;
+ if (nextConfig.agents && nextConfig.agents.length > 0) {
+ const agent = nextConfig.agents.find(a => a.name === this.config.agentName);
+ nextBluesky = agent?.channels?.bluesky as BlueskyConfig | undefined;
+ } else {
+ nextBluesky = nextConfig.channels?.bluesky as BlueskyConfig | undefined;
+ }
+
+ if (!nextBluesky) {
+ log.warn('Config reload skipped (no bluesky config found).');
+ return;
+ }
+
+ this.config = {
+ ...this.config,
+ ...nextBluesky,
+ // Preserve env-var-sourced credentials if the new config doesn't supply them
+ handle: nextBluesky.handle ?? this.config.handle,
+ appPassword: nextBluesky.appPassword ?? this.config.appPassword,
+ agentName: this.config.agentName,
+ };
+ this.loadDidModes();
+ this.listModes = {};
+ this.maybeInitPostingIdentity().catch(err => {
+ log.warn('Posting identity init failed after reload:', err);
+ });
+ log.info('Config reloaded.');
+ } catch (err) {
+ log.warn('Config reload failed:', err);
+ }
+ }
+
+ private pauseRuntime(): void {
+ if (this.reconnectTimer) {
+ clearTimeout(this.reconnectTimer);
+ this.reconnectTimer = null;
+ }
+ if (this.ws) {
+ try {
+ this.ws.close();
+ } catch {
+ // ignore
+ }
+ this.ws = null;
+ }
+ if (this.notificationsTimer) {
+ clearInterval(this.notificationsTimer);
+ this.notificationsTimer = null;
+ }
+ log.info('Runtime disabled via kill switch.');
+ }
+
+ private async resumeRuntime(): Promise {
+ await this.expandLists();
+ this.startNotificationsPolling();
+ if (this.hasJetstreamTargets()) {
+ this.connect();
+ } else {
+ log.warn('Jetstream disabled (no wantedDids or list-expanded DIDs).');
+ }
+ log.info('Runtime re-enabled via kill switch.');
+ }
+
+ private reconnectJetstream(): void {
+ if (!this.hasJetstreamTargets()) {
+ log.warn('Jetstream reconnect skipped (no wantedDids or list-expanded DIDs).');
+ return;
+ }
+ if (this.ws) {
+ // Signal the close handler not to schedule its own reconnect β we're handling it below
+ this.intentionalClose = true;
+ try {
+ this.ws.close();
+ } catch {
+ // If close() throws the WebSocket may still close on its own, so reset the
+ // flag to let the close handler schedule a reconnect if that happens.
+ this.intentionalClose = false;
+ }
+ this.ws = null;
+ }
+ if (!this.runtimeDisabled) {
+ this.connect();
+ }
+ }
+
+ /**
+ * Parse text and generate facets for links, mentions, and hashtags.
+ * Uses an authenticated AtpAgent so that @mention handles resolve to DIDs.
+ * Falls back to unauthenticated detection if the session is unavailable.
+ */
+ private async parseFacets(text: string): Promise[]> {
+ if (this.accessJwt && this.refreshJwt && this.sessionDid) {
+ const agent = new AtpAgent({ service: this.getServiceUrl() });
+ const handle = this.handleByDid.get(this.sessionDid) ?? this.sessionDid;
+ await agent.resumeSession({
+ accessJwt: this.accessJwt,
+ refreshJwt: this.refreshJwt,
+ did: this.sessionDid,
+ handle,
+ active: true,
+ });
+ return parseFacets(text, agent);
+ }
+ return parseFacets(text);
+ }
+
+ private async resolveHandleForDid(did: string): Promise {
+ if (!did || did === 'unknown') return undefined;
+ const cached = this.handleByDid.get(did);
+ if (cached) return cached;
+
+ const lastFetched = this.lastHandleFetchAt.get(did);
+ if (lastFetched && Date.now() - lastFetched < this.handleFetchCooldownMs) {
+ return undefined;
+ }
+
+ const existing = this.handleFetchInFlight.get(did);
+ if (existing) return existing;
+
+ const promise = (async () => {
+ try {
+ const url = `${getAppViewUrl(this.config.appViewUrl)}/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(did)}`;
+ const headers: Record = {};
+
+ // Use authenticated endpoint if available for complete metadata
+ if (this.accessJwt && !this.isExpired(this.accessJwtExpiresAt)) {
+ headers['Authorization'] = `Bearer ${this.accessJwt}`;
+ }
+
+ const res = await fetchWithTimeout(url, { headers });
+ if (res.ok) {
+ const data = await res.json() as { handle?: string };
+ if (data.handle && typeof data.handle === 'string') {
+ this.handleByDid.set(did, data.handle);
+ pruneMap(this.handleByDid, HANDLE_CACHE_MAX);
+ return data.handle;
+ }
+ }
+ // Failed to resolve: apply cooldown to avoid hammering on repeated misses
+ this.lastHandleFetchAt.set(did, Date.now());
+ return undefined;
+ } catch {
+ // Network error: apply cooldown before retrying
+ this.lastHandleFetchAt.set(did, Date.now());
+ return undefined;
+ } finally {
+ this.handleFetchInFlight.delete(did);
+ }
+ })();
+
+ this.handleFetchInFlight.set(did, promise);
+ return promise;
+ }
+
+ private async processNotification(notification: {
+ uri: string;
+ cid?: string;
+ author?: { did: string; handle?: string; displayName?: string };
+ reason: string;
+ reasonSubject?: string;
+ record?: Record;
+ indexedAt?: string;
+ isRead?: boolean;
+ }): Promise {
+ const authorDid = notification.author?.did || 'unknown';
+ // Skip our own notifications to prevent self-reply loops
+ if (authorDid === this.sessionDid) return;
+
+ let authorHandle = notification.author?.handle;
+ if (authorDid && authorHandle) {
+ this.handleByDid.set(authorDid, authorHandle);
+ pruneMap(this.handleByDid, HANDLE_CACHE_MAX);
+ }
+ if (authorDid && !authorHandle) {
+ authorHandle = await this.resolveHandleForDid(authorDid);
+ if (authorHandle) {
+ this.handleByDid.set(authorDid, authorHandle);
+ pruneMap(this.handleByDid, HANDLE_CACHE_MAX);
+ }
+ }
+ const record = isRecord(notification.record) ? notification.record : undefined;
+ const recordType = record ? readString(record.$type) : undefined;
+ const timestamp = notification.indexedAt ? new Date(notification.indexedAt) : new Date();
+
+ const source: BlueskySource = {
+ uri: notification.uri,
+ cid: notification.cid,
+ };
+
+ const extraContext: Record = {};
+ extraContext['Operation'] = `notification ${notification.reason}`;
+ if (notification.reason) extraContext['NotificationReason'] = notification.reason;
+ if (authorHandle) {
+ extraContext['Handle'] = `@${authorHandle}`;
+ }
+ if (authorDid) {
+ extraContext['DID'] = authorDid;
+ }
+ if (notification.reasonSubject) {
+ extraContext['Subject'] = notification.reasonSubject;
+ }
+ if (notification.uri) {
+ extraContext['URI'] = notification.uri;
+ }
+
+ let postText = '';
+ if (recordType === 'app.bsky.feed.post' && record) {
+ const details = extractPostDetails(record);
+ postText = details.text || '';
+ if (details.createdAt) {
+ extraContext['Created'] = details.createdAt;
+ }
+ if (details.langs.length > 0) {
+ extraContext['Languages'] = details.langs.join(', ');
+ }
+ if (details.replyRefs.rootUri) {
+ extraContext['Thread root'] = details.replyRefs.rootUri;
+ }
+ if (details.replyRefs.parentUri) {
+ extraContext['Reply parent'] = details.replyRefs.parentUri;
+ }
+ if (details.embedLines.length > 0) {
+ extraContext['Embeds'] = details.embedLines.join(' | ');
+ }
+
+ if (details.replyRefs.rootUri) source.threadRootUri = details.replyRefs.rootUri;
+ if (details.replyRefs.rootCid) source.threadRootCid = details.replyRefs.rootCid;
+ if (details.replyRefs.parentUri) source.threadParentUri = details.replyRefs.parentUri;
+ if (details.replyRefs.parentCid) source.threadParentCid = details.replyRefs.parentCid;
+
+ const chatId = source.uri ?? authorDid;
+ this.lastPostByChatId.set(chatId, {
+ uri: notification.uri,
+ cid: notification.cid,
+ rootUri: source.threadRootUri ?? notification.uri,
+ rootCid: source.threadRootCid ?? notification.cid,
+ });
+ pruneMap(this.lastPostByChatId, LAST_POST_CACHE_MAX);
+ } else if (record) {
+ extraContext['Record'] = truncate(JSON.stringify(record));
+ }
+
+ const didMode = this.getDidMode(authorDid);
+ if (didMode === 'disabled') return;
+
+ const baseMsgId = notification.cid || notification.uri;
+ if (!baseMsgId) {
+ log.warn('Skipping notification with no cid or uri');
+ return;
+ }
+ // Cross-path dedup: if Jetstream already delivered this post (stored as bare CID), skip.
+ // This prevents double-delivery when both Jetstream and Notifications see the same post.
+ if (this.seenMessageIds.has(baseMsgId)) return;
+ // Within-notification dedup: use a reason-scoped key so the same post arriving with
+ // *different* reasons (e.g., "mention" and "reply") is delivered once per reason β
+ // each represents a distinct actionable event (mention vs. thread reply context).
+ const notificationMessageId = notification.reason ? `${notification.reason}:${baseMsgId}` : baseMsgId;
+ if (this.seenMessageIds.has(notificationMessageId)) return;
+
+ const actionable = notification.reason === 'mention'
+ || notification.reason === 'reply'
+ || notification.reason === 'quote';
+ const shouldReply = actionable
+ && recordType === 'app.bsky.feed.post'
+ && (didMode === 'open' || (didMode === 'mention-only' && notification.reason === 'mention'));
+
+ const chatId = source.uri ?? authorDid;
+ const inbound: BlueskyInboundMessage = {
+ channel: 'bluesky',
+ chatId,
+ userId: authorDid,
+ userHandle: authorHandle,
+ userName: authorHandle ? `@${authorHandle}` : undefined,
+ messageId: notification.cid || notification.uri,
+ text: postText,
+ timestamp,
+ messageType: 'public',
+ groupName: authorHandle ? `@${authorHandle}` : authorDid,
+ isListeningMode: !shouldReply,
+ source,
+ extraContext,
+ formatterHints: this.buildFormatterHints(shouldReply, didMode),
+ };
+
+ if (notificationMessageId) {
+ this.seenMessageIds.set(notificationMessageId, true);
+ pruneMap(this.seenMessageIds, SEEN_MESSAGE_IDS_MAX);
+ }
+ if (baseMsgId) {
+ this.seenBaseMessageIds.set(baseMsgId, true);
+ pruneMap(this.seenBaseMessageIds, SEEN_MESSAGE_IDS_MAX);
+ }
+ await this.onMessage?.(inbound);
+ }
+
+ private async createReply(text: string, target: { uri: string; cid?: string; rootUri?: string; rootCid?: string }, retried = false): Promise<{ uri?: string; cid?: string } | undefined> {
+ await this.ensureSession();
+ if (!this.accessJwt) throw new Error('[Bluesky] ensureSession() completed but accessJwt is not set.');
+ if (!this.sessionDid) throw new Error('[Bluesky] ensureSession() completed but sessionDid is not set.');
+
+ const rootUri = target.rootUri || target.uri;
+ const rootCid = target.rootCid || target.cid;
+ const parentUri = target.uri;
+ const parentCid = target.cid;
+
+ if (!rootUri || !rootCid || !parentUri || !parentCid) {
+ throw new Error('Missing reply root/parent metadata.');
+ }
+
+ // Parse facets for clickable links, mentions, hashtags
+ const facets = await this.parseFacets(text);
+
+ const record: Record = {
+ text,
+ createdAt: new Date().toISOString(),
+ reply: {
+ root: { uri: rootUri, cid: rootCid },
+ parent: { uri: parentUri, cid: parentCid },
+ },
+ };
+
+ // Add facets if any were detected (links, mentions, hashtags)
+ if (facets.length > 0) {
+ record.facets = facets;
+ }
+
+ const res = await fetchWithTimeout(`${this.getServiceUrl()}/xrpc/com.atproto.repo.createRecord`, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'Authorization': `Bearer ${this.accessJwt}`,
+ },
+ body: JSON.stringify({
+ repo: this.sessionDid,
+ collection: 'app.bsky.feed.post',
+ record,
+ }),
+ });
+
+ if (res.status === 401) {
+ if (retried) throw new Error('[Bluesky] createReply: still unauthorized after re-auth.');
+ this.accessJwt = undefined;
+ this.sessionDid = undefined;
+ this.accessJwtExpiresAt = undefined;
+ await this.ensureSession();
+ return this.createReply(text, target, true);
+ }
+
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`createRecord failed: ${detail}`);
+ }
+
+ const data = await res.json() as { uri?: string; cid?: string };
+ if (!data.cid && data.uri) {
+ data.cid = await this.resolveRecordCid(data.uri);
+ }
+ return data;
+ }
+
+ private async resolveRecordCid(uri: string): Promise {
+ const parsed = parseAtUri(uri);
+ if (!parsed) return undefined;
+
+ // Try PDS first (if on same server)
+ const qs = new URLSearchParams({
+ repo: parsed.did,
+ collection: parsed.collection,
+ rkey: parsed.rkey,
+ });
+ const res = await fetchWithTimeout(`${this.getServiceUrl()}/xrpc/com.atproto.repo.getRecord?${qs.toString()}`, {
+ headers: this.accessJwt ? { 'Authorization': `Bearer ${this.accessJwt}` } : undefined,
+ });
+ if (res.ok) {
+ const data = await res.json() as { cid?: string };
+ return data.cid;
+ }
+
+ // Fallback to AppView for cross-PDS records
+ try {
+ const appViewRes = await fetchWithTimeout(`${getAppViewUrl(this.config.appViewUrl)}/xrpc/app.bsky.feed.getPostThread?uri=${encodeURIComponent(uri)}`, {
+ headers: this.accessJwt ? { 'Authorization': `Bearer ${this.accessJwt}` } : undefined,
+ });
+ if (appViewRes.ok) {
+ const data = await appViewRes.json() as { thread?: { post?: { cid?: string } } };
+ return data.thread?.post?.cid;
+ }
+ } catch {
+ return undefined;
+ }
+
+ return undefined;
+ }
+
+ private loadState(): void {
+ if (!this.statePath || !this.config.agentName) return;
+ if (!existsSync(this.statePath)) return;
+ try {
+ const raw = JSON.parse(readFileSync(this.statePath, 'utf-8')) as {
+ version?: number;
+ agents?: Record;
+ };
+ const state = this.migrateState(raw);
+ const entry = state?.agents?.[this.config.agentName];
+ if (entry?.cursor !== undefined) {
+ this.lastCursor = entry.cursor;
+ }
+ // wantedDids and wantedCollections are NOT restored from state -- config is
+ // authoritative. State previously persisted these, but restoring them would
+ // silently override user edits to lettabot.yaml made while the bot was down.
+ // JWTs are not persisted; session DID and handle are non-secret and safe to store
+ if (entry?.auth?.did) {
+ this.sessionDid = entry.auth.did;
+ }
+ if (entry?.auth?.handle && entry?.auth?.did) {
+ this.handleByDid.set(entry.auth.did, entry.auth.handle);
+ }
+ if (entry?.notificationsCursor) {
+ this.notificationsCursor = entry.notificationsCursor;
+ this.notificationsInitialized = true;
+ }
+ } catch (err) {
+ log.warn('Failed to load cursor state:', err);
+ }
+ }
+
+ private migrateState(raw: { version?: number; agents?: Record } | null | undefined): {
+ version: number;
+ agents: Record;
+ } {
+ if (!raw || typeof raw !== 'object') {
+ return { version: STATE_VERSION, agents: {} };
+ }
+ // Accept any version; STATE_VERSION is written on next flush.
+ // Add version-specific migration logic here if the state shape ever changes.
+ return { version: STATE_VERSION, agents: raw.agents && typeof raw.agents === 'object' ? raw.agents : {} };
+ }
+
+ private startStateFlushTimer(): void {
+ if (!this.statePath || !this.config.agentName) return;
+ if (this.stateFlushTimer) return;
+ this.stateFlushTimer = setInterval(() => this.flushState(), STATE_FLUSH_INTERVAL_MS);
+ }
+
+ private flushState(): void {
+ if (!this.statePath || !this.config.agentName) return;
+ if (!this.stateDirty && this.lastCursor === undefined) return;
+
+ try {
+ mkdirSync(dirname(this.statePath), { recursive: true });
+ const existing = existsSync(this.statePath)
+ ? JSON.parse(readFileSync(this.statePath, 'utf-8'))
+ : {};
+ const agents = typeof existing.agents === 'object' && existing.agents
+ ? { ...existing.agents }
+ : {};
+ // Only persist non-secret session metadata; JWTs are re-acquired on startup
+ const auth = this.sessionDid
+ ? {
+ did: this.sessionDid,
+ handle: this.config.handle,
+ }
+ : undefined;
+
+ agents[this.config.agentName] = {
+ cursor: this.lastCursor,
+ auth,
+ notificationsCursor: this.notificationsCursor,
+ };
+ writeFileSync(this.statePath, JSON.stringify({
+ version: STATE_VERSION,
+ updatedAt: new Date().toISOString(),
+ agents,
+ }, null, 2), { mode: 0o600 });
+ this.stateDirty = false;
+ } catch (err) {
+ log.warn('Failed to persist cursor state:', err);
+ }
+ }
+}
diff --git a/src/channels/bluesky/cli.ts b/src/channels/bluesky/cli.ts
new file mode 100644
index 0000000..e250538
--- /dev/null
+++ b/src/channels/bluesky/cli.ts
@@ -0,0 +1,878 @@
+#!/usr/bin/env node
+/**
+ * lettabot-bluesky - Post, reply, like, or repost on Bluesky
+ *
+ * Usage:
+ * lettabot-bluesky post --text "Hello" --agent
+ * lettabot-bluesky post --text "Hello" --image data/outbound/photo.jpg --alt "Alt text" --agent
+ * lettabot-bluesky post --reply-to --text "Reply" --agent
+ * lettabot-bluesky post --text "Long..." --threaded --agent
+ * lettabot-bluesky like --agent
+ * lettabot-bluesky repost --agent
+ * lettabot-bluesky repost --text "Quote" --agent [--threaded]
+ * lettabot-bluesky profile --agent
+ * lettabot-bluesky thread --agent
+ * lettabot-bluesky author-feed --limit 25 --cursor --agent
+ * lettabot-bluesky list-feed --limit 25 --cursor --agent
+ * lettabot-bluesky search --query "..." --limit 25 --cursor --agent
+ * lettabot-bluesky notifications --limit 25 --cursor --reasons mention,reply --agent
+ * lettabot-bluesky block --agent
+ * lettabot-bluesky unblock --agent
+ * lettabot-bluesky mute --agent
+ * lettabot-bluesky unmute --agent
+ * lettabot-bluesky blocks --limit 50 --cursor --agent
+ * lettabot-bluesky mutes --limit 50 --cursor --agent
+ */
+
+import { readFileSync, statSync } from 'node:fs';
+import { resolve, extname, join } from 'node:path';
+import { loadAppConfigOrExit, normalizeAgents } from '../../config/index.js';
+import type { AgentConfig, BlueskyConfig } from '../../config/types.js';
+import { isPathAllowed } from '../../core/bot.js';
+import { createLogger } from '../../logger.js';
+import { DEFAULT_SERVICE_URL, POST_MAX_CHARS } from './constants.js';
+import { AtpAgent } from '@atproto/api';
+import { fetchWithTimeout, getAppViewUrl, parseAtUri, parseFacets, splitPostText } from './utils.js';
+
+const log = createLogger('Bluesky');
+
+// Bluesky supports JPEG, PNG, GIF, and WebP; max 976,560 bytes per image (AT Protocol lexicon limit)
+const BLUESKY_IMAGE_MIMES: Record = {
+ '.jpg': 'image/jpeg',
+ '.jpeg': 'image/jpeg',
+ '.png': 'image/png',
+ '.gif': 'image/gif',
+ '.webp': 'image/webp',
+};
+const BLUESKY_IMAGE_MAX_BYTES = 976_560;
+const BLUESKY_IMAGE_MAX_COUNT = 4;
+
+function usage(): void {
+ console.log(`\nUsage:\n lettabot-bluesky post --text "Hello" --agent \n lettabot-bluesky post --reply-to --text "Reply" --agent \n lettabot-bluesky post --text "Long..." --threaded --agent \n lettabot-bluesky like --agent \n lettabot-bluesky repost --agent \n lettabot-bluesky repost --text "Quote" --agent [--threaded]\n lettabot-bluesky profile --agent \n lettabot-bluesky thread --agent \n lettabot-bluesky author-feed --limit 25 --cursor --agent \n lettabot-bluesky list-feed --limit 25 --cursor --agent \n lettabot-bluesky search --query \"...\" --limit 25 --cursor --agent \n lettabot-bluesky notifications --limit 25 --cursor --reasons mention,reply --agent \n lettabot-bluesky block --agent \n lettabot-bluesky unblock --agent \n lettabot-bluesky mute --agent \n lettabot-bluesky unmute --agent \n lettabot-bluesky blocks --limit 50 --cursor --agent \n lettabot-bluesky mutes --limit 50 --cursor --agent \n`);
+}
+
+function resolveAgentConfig(agents: AgentConfig[], agentName?: string): AgentConfig {
+ if (agents.length === 0) {
+ throw new Error('No agents configured.');
+ }
+ if (agents.length === 1 && !agentName) {
+ return agents[0];
+ }
+ if (!agentName) {
+ throw new Error('Multiple agents configured. Use --agent .');
+ }
+ const exact = agents.find(agent => agent.name === agentName);
+ if (exact) return exact;
+ const lower = agentName.toLowerCase();
+ const found = agents.find(agent => agent.name.toLowerCase() === lower);
+ if (!found) throw new Error(`Agent not found: ${agentName}`);
+ return found;
+}
+
+function resolveBlueskyConfig(agent: AgentConfig): BlueskyConfig {
+ const config = agent.channels?.bluesky as BlueskyConfig | undefined;
+ if (!config || config.enabled === false) {
+ throw new Error(`Bluesky not configured for agent ${agent.name}.`);
+ }
+ if (!config.handle || !config.appPassword) {
+ throw new Error('Bluesky handle/app password not configured for this agent.');
+ }
+ return config;
+}
+
+
+async function createSession(serviceUrl: string, handle: string, appPassword: string): Promise<{ accessJwt: string; refreshJwt: string; handle: string; did: string }> {
+ const res = await fetchWithTimeout(`${serviceUrl}/xrpc/com.atproto.server.createSession`, {
+ method: 'POST',
+ headers: { 'Content-Type': 'application/json' },
+ body: JSON.stringify({ identifier: handle, password: appPassword }),
+ });
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`createSession failed: ${detail}`);
+ }
+ const data = await res.json() as { accessJwt: string; refreshJwt: string; handle: string; did: string };
+ return data;
+}
+
+async function fetchJson(url: string, headers?: Record): Promise {
+ const res = await fetchWithTimeout(url, { headers });
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`Request failed: ${detail}`);
+ }
+ return res.json();
+}
+
+async function postJson(url: string, body: Record, headers?: Record): Promise {
+ const res = await fetchWithTimeout(url, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ ...(headers || {}),
+ },
+ body: JSON.stringify(body),
+ });
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`Request failed: ${detail}`);
+ }
+ if (res.status === 204) return {};
+ return res.json();
+}
+
+async function getRecord(serviceUrl: string, accessJwt: string, uri: string): Promise<{ cid: string; value: Record }> {
+ const parsed = parseAtUri(uri);
+ if (!parsed) throw new Error(`Invalid at:// URI: ${uri}`);
+ const qs = new URLSearchParams({
+ repo: parsed.did,
+ collection: parsed.collection,
+ rkey: parsed.rkey,
+ });
+ const res = await fetchWithTimeout(`${serviceUrl}/xrpc/com.atproto.repo.getRecord?${qs.toString()}`, {
+ headers: { 'Authorization': `Bearer ${accessJwt}` },
+ });
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`getRecord failed: ${detail}`);
+ }
+ const data = await res.json() as { cid?: string; value?: Record };
+ if (!data.cid || !data.value) throw new Error('getRecord missing cid/value');
+ return { cid: data.cid, value: data.value };
+}
+
+async function getPostFromAppView(appViewUrl: string, accessJwt: string, uri: string): Promise<{ cid: string; value: Record }> {
+ const res = await fetchWithTimeout(`${appViewUrl}/xrpc/app.bsky.feed.getPostThread?uri=${encodeURIComponent(uri)}`, {
+ headers: { 'Authorization': `Bearer ${accessJwt}` },
+ });
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`getPostThread failed: ${detail}`);
+ }
+ const data = await res.json() as { thread?: { post?: { cid?: string; record?: Record } } };
+ const post = data.thread?.post;
+ if (!post?.cid || !post?.record) throw new Error('getPostThread missing cid/record');
+ return { cid: post.cid, value: post.record };
+}
+
+async function resolveHandleToDid(bluesky: BlueskyConfig, handle: string): Promise {
+ const appViewUrl = getAppViewUrl(bluesky.appViewUrl);
+ const url = `${appViewUrl}/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(handle)}`;
+ const data = await fetchJson(url) as { did?: string };
+ if (!data.did) throw new Error(`resolveHandle failed for ${handle}`);
+ return data.did;
+}
+
+const DID_PATTERN = /^did:[a-z]+:[a-zA-Z0-9._:-]+$/;
+
+async function resolveActorDid(bluesky: BlueskyConfig, actor: string): Promise {
+ if (actor.startsWith('did:')) {
+ if (!DID_PATTERN.test(actor)) throw new Error(`Invalid DID format: "${actor}"`);
+ return actor;
+ }
+ const did = await resolveHandleToDid(bluesky, actor);
+ if (!DID_PATTERN.test(did)) throw new Error(`Handle "${actor}" resolved to invalid DID: "${did}"`);
+ return did;
+}
+
+async function ensureCid(serviceUrl: string, appViewUrl: string, accessJwt: string, uri: string, cid?: string): Promise {
+ if (cid) return cid;
+ try {
+ const record = await getRecord(serviceUrl, accessJwt, uri);
+ return record.cid;
+ } catch (err) {
+ // Fallback to AppView if PDS lookup fails (e.g., cross-PDS records)
+ const record = await getPostFromAppView(appViewUrl, accessJwt, uri);
+ return record.cid;
+ }
+}
+
+async function createRecord(
+ serviceUrl: string,
+ accessJwt: string,
+ repo: string,
+ collection: string,
+ record: Record,
+): Promise<{ uri?: string; cid?: string }> {
+ const res = await fetchWithTimeout(`${serviceUrl}/xrpc/com.atproto.repo.createRecord`, {
+ method: 'POST',
+ headers: {
+ 'Content-Type': 'application/json',
+ 'Authorization': `Bearer ${accessJwt}`,
+ },
+ body: JSON.stringify({ repo, collection, record }),
+ });
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`createRecord failed: ${detail}`);
+ }
+ return res.json() as Promise<{ uri?: string; cid?: string }>;
+}
+
+async function uploadBlob(
+ serviceUrl: string,
+ accessJwt: string,
+ filePath: string,
+ mimeType: string,
+): Promise<{ blob: unknown }> {
+ const data = readFileSync(filePath);
+ const res = await fetchWithTimeout(`${serviceUrl}/xrpc/com.atproto.repo.uploadBlob`, {
+ method: 'POST',
+ headers: {
+ Authorization: `Bearer ${accessJwt}`,
+ 'Content-Type': mimeType,
+ },
+ body: data,
+ });
+ if (!res.ok) {
+ const detail = await res.text();
+ throw new Error(`uploadBlob failed (${mimeType}): ${detail}`);
+ }
+ const result = await res.json() as { blob?: unknown };
+ if (!result.blob) throw new Error('uploadBlob response missing blob field');
+ return { blob: result.blob };
+}
+
+type ImageFeatures = { sendFileDir?: string; sendFileMaxSize?: number } | undefined;
+type ImageEntry = { path: string; alt: string };
+
+async function validateAndUploadImages(
+ serviceUrl: string,
+ accessJwt: string,
+ entries: ImageEntry[],
+ features: ImageFeatures,
+): Promise> {
+ if (entries.length === 0) return [];
+ if (entries.length > BLUESKY_IMAGE_MAX_COUNT) {
+ throw new Error(`Too many images: max ${BLUESKY_IMAGE_MAX_COUNT}, got ${entries.length}`);
+ }
+
+ const workingDir = process.cwd();
+ const allowedDir = resolve(workingDir, features?.sendFileDir ?? join('data', 'outbound'));
+ const maxSize = Math.min(features?.sendFileMaxSize ?? 50 * 1024 * 1024, BLUESKY_IMAGE_MAX_BYTES);
+
+ const result: Array<{ image: unknown; alt: string }> = [];
+ for (const { path: imagePath, alt } of entries) {
+ const resolvedPath = resolve(workingDir, imagePath);
+
+ if (!await isPathAllowed(resolvedPath, allowedDir)) {
+ throw new Error(`Image path is outside the allowed directory (${allowedDir}): ${imagePath}`);
+ }
+
+ let size: number;
+ try {
+ size = statSync(resolvedPath).size;
+ } catch {
+ throw new Error(`Image file not found or not readable: ${imagePath}`);
+ }
+
+ if (size > maxSize) {
+ throw new Error(`Image too large (${size} bytes, max ${maxSize}): ${imagePath}`);
+ }
+
+ const imageExt = extname(resolvedPath).toLowerCase();
+ const mimeType = BLUESKY_IMAGE_MIMES[imageExt];
+ if (!mimeType) {
+ throw new Error(
+ `Unsupported image type "${imageExt}" for ${imagePath}. Supported: ${Object.keys(BLUESKY_IMAGE_MIMES).join(', ')}`,
+ );
+ }
+
+ const { blob } = await uploadBlob(serviceUrl, accessJwt, resolvedPath, mimeType);
+ result.push({ image: blob, alt });
+ }
+ return result;
+}
+
+async function handlePost(
+ bluesky: BlueskyConfig,
+ features: ImageFeatures,
+ text: string,
+ replyTo?: string,
+ threaded = false,
+ images: ImageEntry[] = [],
+): Promise {
+ const serviceUrl = (bluesky.serviceUrl || DEFAULT_SERVICE_URL).replace(/\/+$/, '');
+ const appViewUrl = getAppViewUrl(bluesky.appViewUrl);
+ const session = await createSession(serviceUrl, bluesky.handle!, bluesky.appPassword!);
+ const charCount = [...new Intl.Segmenter().segment(text)].length;
+ if (charCount > POST_MAX_CHARS && !threaded) {
+ throw new Error(`Post is ${charCount} chars. Use --threaded to split into a thread.`);
+ }
+
+ const chunks = threaded ? splitPostText(text) : [text.trim()];
+ if (!chunks[0] || !chunks[0].trim()) {
+ throw new Error('Refusing to post empty text.');
+ }
+
+ if (images.length > 0 && threaded && chunks.length > 1) {
+ log.warn('Images will only be attached to the first post in the thread.');
+ }
+
+ const imageBlobs = await validateAndUploadImages(serviceUrl, session.accessJwt, images, features);
+
+ const agent = new AtpAgent({ service: serviceUrl });
+ await agent.resumeSession({ accessJwt: session.accessJwt, refreshJwt: session.refreshJwt, did: session.did, handle: session.handle, active: true });
+
+ let rootUri: string | undefined;
+ let rootCid: string | undefined;
+ let parentUri: string | undefined;
+ let parentCid: string | undefined;
+
+ if (replyTo) {
+ let parent;
+ try {
+ parent = await getRecord(serviceUrl, session.accessJwt, replyTo);
+ } catch (err) {
+ // Fallback to AppView if PDS lookup fails (e.g., cross-PDS records)
+ parent = await getPostFromAppView(appViewUrl, session.accessJwt, replyTo);
+ }
+ parentUri = replyTo;
+ parentCid = parent.cid;
+ const reply = (parent.value.reply as { root?: { uri?: string; cid?: string } } | undefined) || undefined;
+ rootUri = reply?.root?.uri || parentUri;
+ rootCid = reply?.root?.cid || parentCid;
+ }
+
+ const createdAt = new Date().toISOString();
+ let lastUri = '';
+
+ for (let i = 0; i < chunks.length; i++) {
+ const chunk = chunks[i];
+
+ // Parse facets for clickable links, mentions, hashtags
+ const facets = await parseFacets(chunk, agent);
+
+ const record: Record = {
+ text: chunk,
+ createdAt,
+ };
+
+ // Add facets if any were detected (links, mentions, hashtags)
+ if (facets.length > 0) {
+ record.facets = facets;
+ }
+
+ if (parentUri && parentCid && rootUri && rootCid) {
+ record.reply = {
+ root: { uri: rootUri, cid: rootCid },
+ parent: { uri: parentUri, cid: parentCid },
+ };
+ }
+
+ if (i === 0 && imageBlobs.length > 0) {
+ record.embed = { $type: 'app.bsky.embed.images', images: imageBlobs };
+ }
+
+ const created = await createRecord(serviceUrl, session.accessJwt, session.did, 'app.bsky.feed.post', record);
+ if (!created.uri) throw new Error('createRecord returned no uri');
+ lastUri = created.uri;
+
+ if (i === 0 && !replyTo && chunks.length > 1) {
+ rootUri = created.uri;
+ rootCid = await ensureCid(serviceUrl, appViewUrl, session.accessJwt, created.uri, created.cid);
+ parentUri = rootUri;
+ parentCid = rootCid;
+ } else if (i < chunks.length - 1) {
+ parentUri = created.uri;
+ parentCid = await ensureCid(serviceUrl, appViewUrl, session.accessJwt, created.uri, created.cid);
+ if (!rootUri || !rootCid) {
+ rootUri = parentUri;
+ rootCid = parentCid;
+ }
+ }
+ }
+
+ console.log(`β Posted: ${lastUri}`);
+}
+
+async function handleQuote(
+ bluesky: BlueskyConfig,
+ targetUri: string,
+ text: string,
+ threaded = false,
+): Promise {
+ const serviceUrl = (bluesky.serviceUrl || DEFAULT_SERVICE_URL).replace(/\/+$/, '');
+ const appViewUrl = getAppViewUrl(bluesky.appViewUrl);
+ const session = await createSession(serviceUrl, bluesky.handle!, bluesky.appPassword!);
+ const charCount = [...new Intl.Segmenter().segment(text)].length;
+ if (charCount > POST_MAX_CHARS && !threaded) {
+ throw new Error(`Post is ${charCount} chars. Use --threaded to split into a thread.`);
+ }
+
+ let target;
+ try {
+ target = await getRecord(serviceUrl, session.accessJwt, targetUri);
+ } catch (err) {
+ // Fallback to AppView if PDS lookup fails (e.g., cross-PDS records)
+ target = await getPostFromAppView(appViewUrl, session.accessJwt, targetUri);
+ }
+ const chunks = threaded ? splitPostText(text) : [text.trim()];
+ if (!chunks[0] || !chunks[0].trim()) {
+ throw new Error('Refusing to post empty text.');
+ }
+
+ const agent = new AtpAgent({ service: serviceUrl });
+ await agent.resumeSession({ accessJwt: session.accessJwt, refreshJwt: session.refreshJwt, did: session.did, handle: session.handle, active: true });
+
+ let rootUri: string | undefined;
+ let rootCid: string | undefined;
+ let parentUri: string | undefined;
+ let parentCid: string | undefined;
+ const createdAt = new Date().toISOString();
+
+ let lastUri = '';
+ for (let i = 0; i < chunks.length; i++) {
+ const chunk = chunks[i];
+
+ // Parse facets for clickable links, mentions, hashtags
+ const facets = await parseFacets(chunk, agent);
+
+ const record: Record = {
+ text: chunk,
+ createdAt,
+ };
+
+ // Add facets if any were detected (links, mentions, hashtags)
+ if (facets.length > 0) {
+ record.facets = facets;
+ }
+
+ if (i === 0) {
+ record.embed = {
+ $type: 'app.bsky.embed.record',
+ record: {
+ uri: targetUri,
+ cid: target.cid,
+ },
+ };
+ }
+
+ if (parentUri && parentCid && rootUri && rootCid) {
+ record.reply = {
+ root: { uri: rootUri, cid: rootCid },
+ parent: { uri: parentUri, cid: parentCid },
+ };
+ }
+
+ const created = await createRecord(serviceUrl, session.accessJwt, session.did, 'app.bsky.feed.post', record);
+ if (!created.uri) throw new Error('createRecord returned no uri');
+ lastUri = created.uri;
+
+ if (i === 0 && chunks.length > 1) {
+ rootUri = created.uri;
+ rootCid = await ensureCid(serviceUrl, appViewUrl, session.accessJwt, created.uri, created.cid);
+ parentUri = rootUri;
+ parentCid = rootCid;
+ } else if (i < chunks.length - 1) {
+ parentUri = created.uri;
+ parentCid = await ensureCid(serviceUrl, appViewUrl, session.accessJwt, created.uri, created.cid);
+ if (!rootUri || !rootCid) {
+ rootUri = parentUri;
+ rootCid = parentCid;
+ }
+ }
+ }
+
+ console.log(`β Quoted: ${lastUri}`);
+}
+
+async function handleSubjectRecord(
+ bluesky: BlueskyConfig,
+ uri: string,
+ collection: 'app.bsky.feed.like' | 'app.bsky.feed.repost',
+): Promise {
+ const serviceUrl = (bluesky.serviceUrl || DEFAULT_SERVICE_URL).replace(/\/+$/, '');
+ const appViewUrl = getAppViewUrl(bluesky.appViewUrl);
+ const session = await createSession(serviceUrl, bluesky.handle!, bluesky.appPassword!);
+
+ let record;
+ try {
+ record = await getRecord(serviceUrl, session.accessJwt, uri);
+ } catch (err) {
+ // Fallback to AppView if PDS lookup fails (e.g., cross-PDS records)
+ record = await getPostFromAppView(appViewUrl, session.accessJwt, uri);
+ }
+
+ const createdAt = new Date().toISOString();
+ const res = await createRecord(serviceUrl, session.accessJwt, session.did, collection, {
+ subject: { uri, cid: record.cid },
+ createdAt,
+ });
+
+ if (!res.uri) throw new Error('createRecord returned no uri');
+ console.log(`β ${collection === 'app.bsky.feed.like' ? 'Liked' : 'Reposted'}: ${uri}`);
+}
+
+async function handleMute(bluesky: BlueskyConfig, actor: string, muted: boolean): Promise {
+ const serviceUrl = (bluesky.serviceUrl || DEFAULT_SERVICE_URL).replace(/\/+$/, '');
+ const session = await createSession(serviceUrl, bluesky.handle!, bluesky.appPassword!);
+ const did = await resolveActorDid(bluesky, actor);
+ const endpoint = muted ? 'app.bsky.graph.muteActor' : 'app.bsky.graph.unmuteActor';
+ await postJson(`${serviceUrl}/xrpc/${endpoint}`, { actor: did }, { 'Authorization': `Bearer ${session.accessJwt}` });
+ console.log(`β ${muted ? 'Muted' : 'Unmuted'}: ${did}`);
+}
+
+async function handleBlock(bluesky: BlueskyConfig, actor: string): Promise {
+ const serviceUrl = (bluesky.serviceUrl || DEFAULT_SERVICE_URL).replace(/\/+$/, '');
+ const session = await createSession(serviceUrl, bluesky.handle!, bluesky.appPassword!);
+ const did = await resolveActorDid(bluesky, actor);
+ const createdAt = new Date().toISOString();
+ const res = await createRecord(serviceUrl, session.accessJwt, session.did, 'app.bsky.graph.block', {
+ subject: did,
+ createdAt,
+ });
+ if (!res.uri) throw new Error('createRecord returned no uri');
+ console.log(`β Blocked: ${did} (${res.uri})`);
+}
+
+async function handleUnblock(bluesky: BlueskyConfig, blockUri: string): Promise {
+ const parsed = parseAtUri(blockUri);
+ if (!parsed || parsed.collection !== 'app.bsky.graph.block') {
+ throw new Error('unblock requires a block record URI (at://.../app.bsky.graph.block/...)');
+ }
+ const serviceUrl = (bluesky.serviceUrl || DEFAULT_SERVICE_URL).replace(/\/+$/, '');
+ const session = await createSession(serviceUrl, bluesky.handle!, bluesky.appPassword!);
+ await postJson(
+ `${serviceUrl}/xrpc/com.atproto.repo.deleteRecord`,
+ {
+ repo: session.did,
+ collection: parsed.collection,
+ rkey: parsed.rkey,
+ },
+ { 'Authorization': `Bearer ${session.accessJwt}` },
+ );
+ console.log(`β Unblocked: ${blockUri}`);
+}
+
+async function handleReadCommand(
+ bluesky: BlueskyConfig,
+ command: string,
+ uriArg: string,
+ query: string,
+ cursor: string,
+ limit?: number,
+ reasons?: string[],
+ priority?: boolean,
+): Promise {
+ const appViewUrl = getAppViewUrl(bluesky.appViewUrl);
+ const serviceUrl = (bluesky.serviceUrl || DEFAULT_SERVICE_URL).replace(/\/+$/, '');
+ const effectiveLimit = limit ?? 25;
+
+ if (command === 'resolve') {
+ if (!uriArg) throw new Error('Missing handle');
+ const url = `${appViewUrl}/xrpc/com.atproto.identity.resolveHandle?handle=${encodeURIComponent(uriArg)}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'profile') {
+ if (!uriArg) throw new Error('Missing actor');
+ const url = `${appViewUrl}/xrpc/app.bsky.actor.getProfile?actor=${encodeURIComponent(uriArg)}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'thread') {
+ if (!uriArg) throw new Error('Missing post URI');
+ const url = `${appViewUrl}/xrpc/app.bsky.feed.getPostThread?uri=${encodeURIComponent(uriArg)}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'author-feed') {
+ if (!uriArg) throw new Error('Missing actor');
+ const qs = new URLSearchParams();
+ qs.set('actor', uriArg);
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${appViewUrl}/xrpc/app.bsky.feed.getAuthorFeed?${qs.toString()}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'list-feed') {
+ if (!uriArg) throw new Error('Missing list URI');
+ const qs = new URLSearchParams();
+ qs.set('list', uriArg);
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${appViewUrl}/xrpc/app.bsky.feed.getListFeed?${qs.toString()}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'actor-feeds') {
+ if (!uriArg) throw new Error('Missing actor');
+ const qs = new URLSearchParams();
+ qs.set('actor', uriArg);
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${appViewUrl}/xrpc/app.bsky.feed.getActorFeeds?${qs.toString()}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'followers') {
+ if (!uriArg) throw new Error('Missing actor');
+ const qs = new URLSearchParams();
+ qs.set('actor', uriArg);
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${appViewUrl}/xrpc/app.bsky.graph.getFollowers?${qs.toString()}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'follows') {
+ if (!uriArg) throw new Error('Missing actor');
+ const qs = new URLSearchParams();
+ qs.set('actor', uriArg);
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${appViewUrl}/xrpc/app.bsky.graph.getFollows?${qs.toString()}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'lists') {
+ if (!uriArg) throw new Error('Missing actor');
+ const qs = new URLSearchParams();
+ qs.set('actor', uriArg);
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${appViewUrl}/xrpc/app.bsky.graph.getLists?${qs.toString()}`;
+ const data = await fetchJson(url);
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'search' || command === 'timeline' || command === 'notifications' || command === 'blocks' || command === 'mutes') {
+ const session = await createSession(serviceUrl, bluesky.handle!, bluesky.appPassword!);
+ if (command === 'search') {
+ if (!query) throw new Error('Missing query');
+ const qs = new URLSearchParams();
+ qs.set('q', query);
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${serviceUrl}/xrpc/app.bsky.feed.searchPosts?${qs.toString()}`;
+ const data = await fetchJson(url, { 'Authorization': `Bearer ${session.accessJwt}` });
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'timeline') {
+ const qs = new URLSearchParams();
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${serviceUrl}/xrpc/app.bsky.feed.getTimeline?${qs.toString()}`;
+ const data = await fetchJson(url, { 'Authorization': `Bearer ${session.accessJwt}` });
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'notifications') {
+ const qs = new URLSearchParams();
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ if (priority) qs.set('priority', 'true');
+ if (reasons && reasons.length > 0) {
+ for (const reason of reasons) qs.append('reasons', reason);
+ }
+ const url = `${serviceUrl}/xrpc/app.bsky.notification.listNotifications?${qs.toString()}`;
+ const data = await fetchJson(url, { 'Authorization': `Bearer ${session.accessJwt}` });
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'blocks') {
+ const qs = new URLSearchParams();
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${serviceUrl}/xrpc/app.bsky.graph.getBlocks?${qs.toString()}`;
+ const data = await fetchJson(url, { 'Authorization': `Bearer ${session.accessJwt}` });
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+
+ if (command === 'mutes') {
+ const qs = new URLSearchParams();
+ qs.set('limit', String(effectiveLimit));
+ if (cursor) qs.set('cursor', cursor);
+ const url = `${serviceUrl}/xrpc/app.bsky.graph.getMutes?${qs.toString()}`;
+ const data = await fetchJson(url, { 'Authorization': `Bearer ${session.accessJwt}` });
+ console.log(JSON.stringify(data, null, 2));
+ return;
+ }
+ }
+
+ throw new Error(`Unknown read command: ${command}`);
+}
+
+async function main(): Promise {
+ const args = process.argv.slice(2);
+ const command = args.shift();
+ if (!command || command === 'help' || command === '--help' || command === '-h') {
+ usage();
+ process.exit(command ? 0 : 1);
+ }
+
+ let agentName = '';
+ let text = '';
+ let replyTo = '';
+ let threaded = false;
+ let uriArg = '';
+ let actor = '';
+ let blockUri = '';
+ let cursor = '';
+ let query = '';
+ let limit: number | undefined;
+ let reasons: string[] | undefined;
+ let priority = false;
+ const imageEntries: ImageEntry[] = [];
+
+ for (let i = 0; i < args.length; i++) {
+ const arg = args[i];
+ const next = args[i + 1];
+
+ if (arg === '--agent' && next) {
+ agentName = next;
+ i++;
+ } else if ((arg === '--text' || arg === '-t') && next) {
+ text = next;
+ i++;
+ } else if ((arg === '--query' || arg === '-q') && next) {
+ query = next;
+ i++;
+ } else if (arg === '--reply-to' && next) {
+ replyTo = next;
+ i++;
+ } else if (arg === '--actor' && next) {
+ actor = next;
+ i++;
+ } else if (arg === '--block-uri' && next) {
+ blockUri = next;
+ i++;
+ } else if (arg === '--cursor' && next) {
+ cursor = next;
+ i++;
+ } else if (arg === '--threaded') {
+ threaded = true;
+ } else if (arg === '--limit' && next) {
+ const parsed = parseInt(next, 10);
+ if (!Number.isNaN(parsed)) limit = parsed;
+ i++;
+ } else if (arg === '--reasons' && next) {
+ reasons = next.split(',').map(v => v.trim()).filter(Boolean);
+ i++;
+ } else if (arg === '--priority') {
+ priority = true;
+ } else if ((arg === '--image' || arg === '-i') && next) {
+ imageEntries.push({ path: next, alt: '' });
+ i++;
+ } else if ((arg === '--alt' || arg === '-a') && next) {
+ if (imageEntries.length === 0) throw new Error('--alt requires a preceding --image');
+ imageEntries[imageEntries.length - 1].alt = next;
+ i++;
+ } else if (arg === '--quote' && next) {
+ uriArg = next;
+ i++;
+ } else if (!arg.startsWith('-') && !uriArg) {
+ uriArg = arg;
+ } else {
+ log.warn(`Unknown arg: ${arg}`);
+ }
+ }
+
+ const config = loadAppConfigOrExit();
+ const agents = normalizeAgents(config);
+ const agent = resolveAgentConfig(agents, agentName || undefined);
+ const bluesky = resolveBlueskyConfig(agent);
+
+ if (command === 'post') {
+ if (!text) throw new Error('Missing --text');
+ await handlePost(bluesky, agent.features, text, replyTo || undefined, threaded, imageEntries);
+ return;
+ }
+
+ if (command === 'like') {
+ if (!uriArg) throw new Error('Missing post URI');
+ await handleSubjectRecord(bluesky, uriArg, 'app.bsky.feed.like');
+ return;
+ }
+
+ if (command === 'repost') {
+ if (!uriArg) throw new Error('Missing post URI');
+ if (text) {
+ await handleQuote(bluesky, uriArg, text, threaded);
+ } else {
+ await handleSubjectRecord(bluesky, uriArg, 'app.bsky.feed.repost');
+ }
+ return;
+ }
+
+ if (command === 'mute') {
+ const target = actor || uriArg;
+ if (!target) throw new Error('Missing actor');
+ await handleMute(bluesky, target, true);
+ return;
+ }
+
+ if (command === 'unmute') {
+ const target = actor || uriArg;
+ if (!target) throw new Error('Missing actor');
+ await handleMute(bluesky, target, false);
+ return;
+ }
+
+ if (command === 'block') {
+ const target = actor || uriArg;
+ if (!target) throw new Error('Missing actor');
+ await handleBlock(bluesky, target);
+ return;
+ }
+
+ if (command === 'unblock') {
+ const target = blockUri || uriArg;
+ if (!target) throw new Error('Missing block URI');
+ await handleUnblock(bluesky, target);
+ return;
+ }
+
+ if ([
+ 'resolve',
+ 'profile',
+ 'thread',
+ 'author-feed',
+ 'list-feed',
+ 'actor-feeds',
+ 'followers',
+ 'follows',
+ 'lists',
+ 'search',
+ 'timeline',
+ 'notifications',
+ 'blocks',
+ 'mutes',
+ ].includes(command)) {
+ await handleReadCommand(bluesky, command, uriArg, query, cursor, limit, reasons, priority);
+ return;
+ }
+
+ console.error(`Unknown command: ${command}`);
+ usage();
+ process.exit(1);
+}
+
+main().catch((err) => {
+ console.error(err instanceof Error ? err.message : err);
+ process.exit(1);
+});
diff --git a/src/channels/bluesky/constants.ts b/src/channels/bluesky/constants.ts
new file mode 100644
index 0000000..58ee7ec
--- /dev/null
+++ b/src/channels/bluesky/constants.ts
@@ -0,0 +1,15 @@
+export const DEFAULT_JETSTREAM_URL = 'wss://jetstream2.us-east.bsky.network/subscribe';
+export const RECONNECT_BASE_MS = 1000;
+export const RECONNECT_MAX_MS = 60000;
+export const CURSOR_BACKTRACK_US = 5_000_000; // 5 seconds
+export const STATE_FILENAME = 'bluesky-jetstream.json';
+export const STATE_FLUSH_INTERVAL_MS = 10_000;
+export const DEFAULT_SERVICE_URL = 'https://bsky.social';
+export const DEFAULT_APPVIEW_URL = 'https://public.api.bsky.app';
+export const POST_MAX_CHARS = 300;
+export const DEFAULT_NOTIFICATIONS_INTERVAL_SEC = 60;
+export const DEFAULT_NOTIFICATIONS_LIMIT = 50;
+export const HANDLE_CACHE_MAX = 10_000;
+export const LAST_POST_CACHE_MAX = 5_000;
+export const SEEN_MESSAGE_IDS_MAX = 5_000;
+export const STATE_VERSION = 1;
diff --git a/src/channels/bluesky/formatter.ts b/src/channels/bluesky/formatter.ts
new file mode 100644
index 0000000..94e8af3
--- /dev/null
+++ b/src/channels/bluesky/formatter.ts
@@ -0,0 +1,98 @@
+import { isRecord, readString, readStringArray, truncate } from './utils.js';
+
+export function parseReplyRefs(record: Record): {
+ rootUri?: string;
+ rootCid?: string;
+ parentUri?: string;
+ parentCid?: string;
+} {
+ const reply = isRecord(record.reply) ? record.reply : undefined;
+ if (!reply) return {};
+ const root = isRecord(reply.root) ? reply.root : undefined;
+ const parent = isRecord(reply.parent) ? reply.parent : undefined;
+ return {
+ rootUri: readString(root?.uri),
+ rootCid: readString(root?.cid),
+ parentUri: readString(parent?.uri),
+ parentCid: readString(parent?.cid),
+ };
+}
+
+export function extractPostDetails(record: Record): {
+ text?: string;
+ createdAt?: string;
+ langs: string[];
+ replyRefs: ReturnType;
+ embedLines: string[];
+} {
+ const text = readString(record.text)?.trim();
+ const createdAt = readString(record.createdAt);
+ const langs = readStringArray(record.langs);
+ const replyRefs = parseReplyRefs(record);
+ const embedLines = summarizeEmbed(record.embed);
+ return { text, createdAt, langs, replyRefs, embedLines };
+}
+
+export function summarizeEmbed(embed: unknown): string[] {
+ if (!isRecord(embed)) return [];
+
+ const embedType = readString(embed.$type);
+ const lines: string[] = [];
+
+ if (embedType === 'app.bsky.embed.images') {
+ const images = Array.isArray(embed.images) ? embed.images : [];
+ const altTexts = images
+ .map((img) => (isRecord(img) ? readString(img.alt) : undefined))
+ .filter((alt): alt is string => !!alt && alt.trim().length > 0);
+ const summary = `Embed: ${images.length} image(s)`;
+ if (altTexts.length > 0) {
+ lines.push(`${summary} (alt: ${truncate(altTexts[0], 120)})`);
+ } else {
+ lines.push(summary);
+ }
+ return lines;
+ }
+
+ if (embedType === 'app.bsky.embed.external') {
+ const external = isRecord(embed.external) ? embed.external : undefined;
+ const title = external ? readString(external.title) : undefined;
+ const uri = external ? readString(external.uri) : undefined;
+ const description = external ? readString(external.description) : undefined;
+ const titlePart = title ? ` "${truncate(title, 160)}"` : '';
+ const uriPart = uri ? ` ${uri}` : '';
+ lines.push(`Embed: link${titlePart}${uriPart}`);
+ if (description) {
+ lines.push(`Embed description: ${truncate(description, 240)}`);
+ }
+ return lines;
+ }
+
+ if (embedType === 'app.bsky.embed.record') {
+ const record = isRecord(embed.record) ? embed.record : undefined;
+ const uri = record ? readString(record.uri) : undefined;
+ if (uri) {
+ lines.push(`Embed: record ${uri}`);
+ } else {
+ lines.push('Embed: record');
+ }
+ return lines;
+ }
+
+ if (embedType === 'app.bsky.embed.recordWithMedia') {
+ const record = isRecord(embed.record) ? embed.record : undefined;
+ const uri = record ? readString(record.uri) : undefined;
+ if (uri) {
+ lines.push(`Embed: record ${uri}`);
+ } else {
+ lines.push('Embed: record');
+ }
+ lines.push(...summarizeEmbed(embed.media));
+ return lines;
+ }
+
+ if (embedType) {
+ lines.push(`Embed: ${embedType}`);
+ }
+
+ return lines;
+}
diff --git a/src/channels/bluesky/index.ts b/src/channels/bluesky/index.ts
new file mode 100644
index 0000000..0408d4d
--- /dev/null
+++ b/src/channels/bluesky/index.ts
@@ -0,0 +1,2 @@
+export { BlueskyAdapter } from './adapter.js';
+export type { BlueskyConfig, JetstreamEvent, JetstreamCommit, DidMode } from './types.js';
diff --git a/src/channels/bluesky/types.ts b/src/channels/bluesky/types.ts
new file mode 100644
index 0000000..a45bcee
--- /dev/null
+++ b/src/channels/bluesky/types.ts
@@ -0,0 +1,69 @@
+import type { InboundMessage } from '../../core/types.js';
+
+export type DidMode = 'open' | 'listen' | 'mention-only' | 'disabled';
+
+/**
+ * AT Protocol source identifiers attached to Bluesky inbound messages.
+ * Used by the adapter to construct replies, reposts, and likes.
+ */
+export interface BlueskySource {
+ uri?: string;
+ collection?: string;
+ cid?: string;
+ rkey?: string;
+ threadRootUri?: string;
+ threadParentUri?: string;
+ threadRootCid?: string;
+ threadParentCid?: string;
+ subjectUri?: string;
+ subjectCid?: string;
+}
+
+/**
+ * Bluesky-specific inbound message carrying AT Protocol source metadata
+ * and display context. Extends InboundMessage without polluting the core type.
+ */
+export interface BlueskyInboundMessage extends InboundMessage {
+ source?: BlueskySource;
+ extraContext?: Record;
+}
+
+export interface BlueskyConfig {
+ enabled?: boolean;
+ agentName?: string;
+ jetstreamUrl?: string;
+ wantedDids?: string[] | string;
+ wantedCollections?: string[] | string;
+ cursor?: number;
+ handle?: string;
+ appPassword?: string;
+ serviceUrl?: string;
+ appViewUrl?: string;
+ groups?: Record;
+ lists?: Record;
+ notifications?: {
+ enabled?: boolean;
+ intervalSec?: number;
+ limit?: number;
+ priority?: boolean;
+ reasons?: string[] | string;
+ backfill?: boolean;
+ };
+}
+
+export interface JetstreamCommit {
+ operation?: string;
+ collection?: string;
+ rkey?: string;
+ cid?: string;
+ record?: Record;
+}
+
+export interface JetstreamEvent {
+ kind?: string;
+ did?: string;
+ time_us?: number;
+ commit?: JetstreamCommit;
+ identity?: { handle?: string };
+ account?: { handle?: string };
+}
diff --git a/src/channels/bluesky/utils.ts b/src/channels/bluesky/utils.ts
new file mode 100644
index 0000000..57ea71a
--- /dev/null
+++ b/src/channels/bluesky/utils.ts
@@ -0,0 +1,172 @@
+export function normalizeList(value?: string[] | string): string[] {
+ if (!value) return [];
+ if (Array.isArray(value)) return value.map(v => v.trim()).filter(Boolean);
+ return value.split(',').map(v => v.trim()).filter(Boolean);
+}
+
+export function uniqueList(values: string[]): string[] {
+ const seen = new Set();
+ const result: string[] = [];
+ for (const value of values) {
+ if (!value || seen.has(value)) continue;
+ seen.add(value);
+ result.push(value);
+ }
+ return result;
+}
+
+export function truncate(value: string, max = 2000): string {
+ if (value.length <= max) return value;
+ return value.slice(0, max) + '...';
+}
+
+export function pruneMap(map: Map, max: number): void {
+ while (map.size > max) {
+ const oldest = map.keys().next().value;
+ if (!oldest) break;
+ map.delete(oldest);
+ }
+}
+
+export function buildAtUri(did?: string, collection?: string, rkey?: string): string | undefined {
+ if (!did || !collection || !rkey) return undefined;
+ return `at://${did}/${collection}/${rkey}`;
+}
+
+export function isRecord(value: unknown): value is Record {
+ return !!value && typeof value === 'object' && !Array.isArray(value);
+}
+
+export function readString(value: unknown): string | undefined {
+ return typeof value === 'string' ? value : undefined;
+}
+
+export function readStringArray(value: unknown): string[] {
+ return Array.isArray(value)
+ ? value.map(v => (typeof v === 'string' ? v.trim() : '')).filter(Boolean)
+ : [];
+}
+
+/** Default timeout for Bluesky API calls (15 seconds) */
+export const FETCH_TIMEOUT_MS = 15_000;
+
+/**
+ * fetch() wrapper with an AbortController timeout.
+ * Throws on timeout just like a network error.
+ */
+export async function fetchWithTimeout(
+ url: string,
+ init: RequestInit = {},
+ timeoutMs = FETCH_TIMEOUT_MS
+): Promise {
+ const controller = new AbortController();
+ const timer = setTimeout(() => controller.abort(), timeoutMs);
+ try {
+ return await fetch(url, { ...init, signal: controller.signal });
+ } finally {
+ clearTimeout(timer);
+ }
+}
+
+export function parseAtUri(uri: string): { did: string; collection: string; rkey: string } | undefined {
+ if (!uri.startsWith('at://')) return undefined;
+ const parts = uri.slice('at://'.length).split('/');
+ if (parts.length < 3) return undefined;
+ return { did: parts[0], collection: parts[1], rkey: parts[2] };
+}
+
+export function getAppViewUrl(appViewUrl?: string, defaultUrl = 'https://public.api.bsky.app'): string {
+ return (appViewUrl || defaultUrl).replace(/\/+$/, '');
+}
+
+export function splitPostText(text: string, maxChars = 300): string[] {
+ const segmenter = new Intl.Segmenter();
+ const graphemes = [...segmenter.segment(text)].map(s => s.segment);
+ if (graphemes.length === 0) return [];
+ if (graphemes.length <= maxChars) {
+ const trimmed = text.trim();
+ return trimmed ? [trimmed] : [];
+ }
+
+ const chunks: string[] = [];
+ let start = 0;
+
+ while (start < graphemes.length) {
+ let end = Math.min(start + maxChars, graphemes.length);
+
+ if (end < graphemes.length) {
+ let split = end;
+ for (let i = end - 1; i > start; i--) {
+ if (/\s/.test(graphemes[i])) {
+ split = i;
+ break;
+ }
+ }
+ end = split > start ? split : end;
+ }
+
+ let chunk = graphemes.slice(start, end).join('');
+ chunk = chunk.replace(/^\s+/, '').replace(/\s+$/, '');
+ if (chunk) chunks.push(chunk);
+
+ start = end;
+ while (start < graphemes.length && /\s/.test(graphemes[start])) {
+ start++;
+ }
+ }
+
+ return chunks;
+}
+
+import { AtpAgent, RichText } from '@atproto/api';
+
+/**
+ * Parse text and generate AT Protocol facets (links, mentions, hashtags).
+ * When an authenticated agent is provided, @mention handles are resolved to DIDs.
+ * Without an agent, links and hashtags work but mentions won't have DIDs.
+ */
+export async function parseFacets(text: string, agent?: AtpAgent): Promise[]> {
+ const rt = new RichText({ text });
+ if (agent) {
+ await rt.detectFacets(agent);
+ } else {
+ rt.detectFacetsWithoutResolution();
+ }
+ if (!rt.facets || rt.facets.length === 0) return [];
+ return rt.facets.map(facet => ({
+ index: { byteStart: facet.index.byteStart, byteEnd: facet.index.byteEnd },
+ features: facet.features.map(feature => {
+ const type = feature.$type;
+ if (type === 'app.bsky.richtext.facet#link') {
+ const f = feature as { $type: string; uri: string };
+ return { $type: type, uri: f.uri };
+ }
+ if (type === 'app.bsky.richtext.facet#mention') {
+ const f = feature as { $type: string; did: string };
+ return { $type: type, did: f.did };
+ }
+ if (type === 'app.bsky.richtext.facet#tag') {
+ const f = feature as { $type: string; tag: string };
+ return { $type: type, tag: f.tag };
+ }
+ return { $type: type };
+ }),
+ }));
+}
+
+export function decodeJwtExp(jwt: string): number | undefined {
+ const parts = jwt.split('.');
+ if (parts.length < 2) return undefined;
+ try {
+ const payload = parts[1].replace(/-/g, '+').replace(/_/g, '/');
+ const padded = payload.padEnd(payload.length + (4 - (payload.length % 4 || 4)), '=');
+ const json = Buffer.from(padded, 'base64').toString('utf-8');
+ const data = JSON.parse(json) as { exp?: number };
+ if (typeof data.exp === 'number') {
+ return data.exp * 1000;
+ }
+ } catch {
+ // ignore
+ }
+ return undefined;
+}
diff --git a/src/channels/factory.ts b/src/channels/factory.ts
index 1235374..7d6e5df 100644
--- a/src/channels/factory.ts
+++ b/src/channels/factory.ts
@@ -1,3 +1,4 @@
+import { BlueskyAdapter } from './bluesky.js';
import { DiscordAdapter } from './discord.js';
import { SignalAdapter } from './signal.js';
import { SlackAdapter } from './slack.js';
@@ -182,5 +183,30 @@ export function createChannelsForAgent(
}
}
+ // Bluesky: only start if there's something to subscribe to
+ if (agentConfig.channels.bluesky?.enabled) {
+ const bsky = agentConfig.channels.bluesky;
+ const hasWantedDids = !!bsky.wantedDids?.length;
+ const hasLists = !!(bsky.lists && Object.keys(bsky.lists).length > 0);
+ const hasAuth = !!bsky.handle;
+ const wantsNotifications = !!bsky.notifications?.enabled;
+ if (hasWantedDids || hasLists || hasAuth || wantsNotifications) {
+ adapters.push(new BlueskyAdapter({
+ agentName: agentConfig.name,
+ jetstreamUrl: bsky.jetstreamUrl,
+ wantedDids: bsky.wantedDids,
+ wantedCollections: bsky.wantedCollections,
+ cursor: bsky.cursor,
+ handle: bsky.handle,
+ appPassword: bsky.appPassword,
+ serviceUrl: bsky.serviceUrl,
+ appViewUrl: bsky.appViewUrl,
+ groups: bsky.groups,
+ lists: bsky.lists,
+ notifications: bsky.notifications,
+ }));
+ }
+ }
+
return adapters;
}
diff --git a/src/channels/index.ts b/src/channels/index.ts
index 93abc3b..1b0e6f3 100644
--- a/src/channels/index.ts
+++ b/src/channels/index.ts
@@ -10,3 +10,4 @@ export * from './slack.js';
export * from './whatsapp/index.js';
export * from './signal.js';
export * from './discord.js';
+export * from './bluesky.js';
diff --git a/src/channels/setup.ts b/src/channels/setup.ts
index b2a5b88..fec72e7 100644
--- a/src/channels/setup.ts
+++ b/src/channels/setup.ts
@@ -7,6 +7,7 @@
import { spawnSync } from 'node:child_process';
import * as p from '@clack/prompts';
+import type { BlueskyConfig } from '../config/types.js';
// ============================================================================
// Channel Metadata
@@ -18,6 +19,7 @@ export const CHANNELS = [
{ id: 'discord', displayName: 'Discord', hint: 'Bot token + Message Content intent' },
{ id: 'whatsapp', displayName: 'WhatsApp', hint: 'QR code pairing' },
{ id: 'signal', displayName: 'Signal', hint: 'signal-cli daemon' },
+ { id: 'bluesky', displayName: 'Bluesky', hint: 'Jetstream feed (read-only)' },
] as const;
export type ChannelId = typeof CHANNELS[number]['id'];
@@ -56,6 +58,8 @@ const GROUP_ID_HINTS: Record = {
'(e.g., 120363123456@g.us).',
signal:
'Group IDs appear in bot logs on first group message.',
+ bluesky:
+ 'Bluesky does not support groups. This setting is not used.',
};
// ============================================================================
@@ -562,6 +566,158 @@ export async function setupSignal(existing?: any): Promise {
};
}
+export async function setupBluesky(existing?: BlueskyConfig): Promise {
+ p.note(
+ 'Uses the Bluesky Jetstream WebSocket feed (read-only).\n' +
+ 'Provide one or more DID(s) to filter the stream.\n' +
+ 'Example DID: did:plc:i3n7ma327gght4kiea5dvpyn',
+ 'Bluesky Setup'
+ );
+
+ const didsRaw = await p.text({
+ message: 'Wanted DID(s) (comma-separated)',
+ placeholder: 'did:plc:...',
+ initialValue: Array.isArray(existing?.wantedDids)
+ ? existing.wantedDids.join(',')
+ : (existing?.wantedDids || ''),
+ });
+
+ if (p.isCancel(didsRaw)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ const wantedDids = typeof didsRaw === 'string'
+ ? didsRaw.split(',').map(s => s.trim()).filter(Boolean)
+ : [];
+
+ if (wantedDids.length === 0) {
+ p.log.warn('No DID provided. The stream may be very noisy without filters.');
+ }
+
+ const collectionsRaw = await p.text({
+ message: 'Wanted collections (optional, comma-separated)',
+ placeholder: 'app.bsky.feed.post',
+ initialValue: Array.isArray(existing?.wantedCollections)
+ ? existing.wantedCollections.join(',')
+ : (existing?.wantedCollections || ''),
+ });
+
+ if (p.isCancel(collectionsRaw)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ const wantedCollections = typeof collectionsRaw === 'string'
+ ? collectionsRaw.split(',').map(s => s.trim()).filter(Boolean)
+ : [];
+
+ const jetstreamUrl = await p.text({
+ message: 'Jetstream WebSocket URL (blank = default)',
+ placeholder: 'wss://jetstream2.us-east.bsky.network/subscribe',
+ initialValue: existing?.jetstreamUrl || '',
+ });
+
+ if (p.isCancel(jetstreamUrl)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ const defaultMode = await p.select({
+ message: 'Default Bluesky behavior',
+ options: [
+ { value: 'listen', label: 'Listen (recommended)', hint: 'Observe only' },
+ { value: 'open', label: 'Open', hint: 'Reply to posts' },
+ { value: 'mention-only', label: 'Mention-only', hint: 'Reply only when @mentioned' },
+ { value: 'disabled', label: 'Disabled', hint: 'Ignore all events' },
+ ],
+ initialValue: existing?.groups?.['*']?.mode || 'listen',
+ });
+
+ if (p.isCancel(defaultMode)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ const enablePosting = await p.confirm({
+ message: 'Configure Bluesky posting credentials? (required to reply)',
+ initialValue: !!(existing?.handle && existing?.appPassword),
+ });
+
+ if (p.isCancel(enablePosting)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ let handle: string | undefined;
+ let appPassword: string | undefined;
+ let serviceUrl: string | undefined;
+
+ if (enablePosting) {
+ p.note(
+ 'Replies require a Bluesky app password.\n' +
+ 'Create one in Settings β App passwords.',
+ 'Bluesky Posting'
+ );
+
+ const handleInput = await p.text({
+ message: 'Bluesky handle (e.g., you.bsky.social)',
+ placeholder: 'you.bsky.social',
+ initialValue: existing?.handle || '',
+ });
+
+ if (p.isCancel(handleInput)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ const appPasswordInput = await p.password({
+ message: 'Bluesky app password (format: xxxx-xxxx-xxxx-xxxx)',
+ validate: (v) => {
+ if (!v) return 'App password is required.';
+ if (!/^[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}-[a-z0-9]{4}$/.test(v)) {
+ return 'Expected format: xxxx-xxxx-xxxx-xxxx (lowercase letters and digits).';
+ }
+ },
+ });
+
+ if (p.isCancel(appPasswordInput)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ const serviceUrlInput = await p.text({
+ message: 'ATProto service URL (blank = https://bsky.social)',
+ placeholder: 'https://bsky.social',
+ initialValue: existing?.serviceUrl || '',
+ });
+
+ if (p.isCancel(serviceUrlInput)) {
+ p.cancel('Cancelled');
+ process.exit(0);
+ }
+
+ handle = handleInput || undefined;
+ appPassword = appPasswordInput || undefined;
+ serviceUrl = serviceUrlInput || undefined;
+ }
+
+ const groups = {
+ '*': { mode: defaultMode as 'open' | 'listen' | 'mention-only' | 'disabled' },
+ };
+
+ return {
+ enabled: true,
+ wantedDids,
+ wantedCollections: wantedCollections.length > 0 ? wantedCollections : undefined,
+ jetstreamUrl: jetstreamUrl || undefined,
+ groups,
+ handle,
+ appPassword,
+ serviceUrl,
+ };
+}
+
/** Get the setup function for a channel */
export function getSetupFunction(id: ChannelId): (existing?: any) => Promise {
const setupFunctions: Record Promise> = {
@@ -570,6 +726,7 @@ export function getSetupFunction(id: ChannelId): (existing?: any) => Promise Add a channel (telegram, slack, discord, whatsapp, signal)
channels remove Remove a channel
+ bluesky Manage Bluesky and run action commands (post/like/repost/read)
logout Logout from Letta Platform (revoke OAuth tokens)
skills Configure which skills are enabled
skills status Show skills status
@@ -268,6 +270,8 @@ Examples:
lettabot channels # Interactive channel management
lettabot channels add discord # Add Discord integration
lettabot channels remove telegram # Remove Telegram
+ lettabot bluesky post --text "Hello" --agent MyAgent
+ lettabot bluesky like at://did:plc:.../app.bsky.feed.post/... --agent MyAgent
lettabot todo add "Deliver morning report" --recurring "daily 8am"
lettabot todo list --actionable
lettabot pairing list telegram # Show pending Telegram pairings
@@ -306,6 +310,288 @@ function getDefaultTodoAgentKey(): string {
return configuredName;
}
+const BLUESKY_MANAGEMENT_ACTIONS = new Set([
+ 'add-did',
+ 'add-list',
+ 'set-default',
+ 'refresh-lists',
+ 'disable',
+ 'enable',
+ 'status',
+]);
+
+function showBlueskyCommandHelp(): void {
+ console.log(`
+Bluesky Commands:
+ # Management
+ bluesky add-did --agent [--mode ]
+ bluesky add-list --agent [--mode ]
+ bluesky set-default --agent
+ bluesky refresh-lists --agent
+ bluesky disable --agent
+ bluesky enable --agent
+ bluesky status --agent
+
+ # Actions (same behavior as lettabot-bluesky)
+ bluesky post --text "Hello" --agent
+ bluesky post --reply-to --text "Reply" --agent
+ bluesky like --agent
+ bluesky repost --agent
+ bluesky profile