Phase 1 Execution Plan: Orchestration Layer
Date: March 18, 2026
Status: Ready for Implementation
Estimated Duration: 6 weeks
Owner: TBD
Overview
This document provides actionable implementation guidance for Phase 1 of the Community ADE, based on synthesized research from commercial tools (Intent, Warp) and open-source alternatives (Aider, Cline, Agno).
Key Research Insights
1. Letta's Competitive Position
✅ Strongest Open-Source Position:
- No competitor combines: stateful agents + hierarchical memory + git-native persistence + subagent orchestration
- Aider has git integration but no agent memory
- Cline is session-based with no persistence
- Agno lacks Letta's memory architecture
⚠️ Commercial Tools Lead in UX:
- Warp: Terminal-native with rich context (@file, images)
- Intent: Specification-driven development
- Both have web dashboards; Letta needs one
2. Technical Pattern Validation
Redis + Workers (Selected for Phase 1):
- ✅ Proven pattern (Celery uses Redis under the hood)
- ✅ Simpler than Temporal for our use case
- ✅ More control over data model
- ⚠️ Temporal deferred to Phase 2 evaluation
React + FastAPI (Selected for Phase 2):
- ✅ Industry standard
- ✅ shadcn/ui provides accessible components
- ✅ TanStack Query for caching/real-time sync
Phase 1 Scope
Goals
- Replace in-process Task execution with persistent queue
- Ensure tasks survive agent restarts
- Support 5+ concurrent workers
- Maintain backward compatibility
Out of Scope (Phase 2+)
- Web dashboard (Phase 2)
- Temporal workflows (Phase 2 evaluation)
- GitHub integration (Phase 3)
- Computer Use (Phase 4)
Implementation Breakdown
Week 1: In-Memory Prototype
Deliverables:
- TaskQueue class backed by asyncio.Queue
- Task dataclass with all fields
- Worker process skeleton
- Basic enqueue/dequeue/complete/fail operations
Testing:
# Test: Task survives worker crash
# Test: Concurrent task execution
# Test: Priority ordering
Code Structure:
letta_ade/
├── __init__.py
├── queue/
│ ├── __init__.py
│ ├── models.py # Task dataclass, enums
│ ├── memory_queue.py # Week 1 implementation
│ └── base.py # Abstract base class
└── worker/
├── __init__.py
└── runner.py # Worker process logic
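The Week 1 prototype can be sketched with asyncio's built-in PriorityQueue. This is a minimal illustration, not the final interface: the `MemoryTaskQueue` methods and the slimmed-down `Task` fields here are assumptions chosen to mirror the enqueue/dequeue/complete operations listed above.

```python
import asyncio
import itertools
from dataclasses import dataclass
from uuid import UUID, uuid4

@dataclass
class Task:
    id: UUID
    prompt: str
    priority: int = 100  # lower value = served first
    status: str = "pending"

class MemoryTaskQueue:
    """Week 1 in-memory queue; interface mirrors the planned Redis version."""

    def __init__(self) -> None:
        self._pq: asyncio.PriorityQueue = asyncio.PriorityQueue()
        self._seq = itertools.count()  # tie-breaker keeps FIFO order per priority
        self.tasks: dict[UUID, Task] = {}

    async def enqueue(self, task: Task) -> UUID:
        self.tasks[task.id] = task
        await self._pq.put((task.priority, next(self._seq), task.id))
        return task.id

    async def dequeue(self) -> Task:
        _, _, task_id = await self._pq.get()
        task = self.tasks[task_id]
        task.status = "running"
        return task

    async def complete(self, task_id: UUID) -> None:
        self.tasks[task_id].status = "completed"

async def demo() -> list[str]:
    # Lower priority number wins, regardless of insertion order
    q = MemoryTaskQueue()
    await q.enqueue(Task(id=uuid4(), prompt="low", priority=200))
    await q.enqueue(Task(id=uuid4(), prompt="high", priority=10))
    first = await q.dequeue()
    second = await q.dequeue()
    return [first.prompt, second.prompt]
```

Swapping this class for the Redis-backed one in Week 2 should only require keeping the same method signatures.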
Week 2: Redis Integration
Deliverables:
- Redis connection manager
- Task serialization (JSON/pickle)
- Atomic dequeue with WATCH/MULTI/EXEC
- Status tracking (Sets per status)
Redis Schema:
# Task storage
HSET task:{uuid} field value ...
# Priority queue (pending)
ZADD queue:pending {priority} {task_id}
# Running tasks
ZADD queue:running {started_at} {task_id}
# Status index
SADD status:pending {task_id}
SADD status:running {task_id}
SADD status:completed {task_id}
SADD status:failed {task_id}
# User index
SADD user:{user_id}:tasks {task_id}
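The "Task serialization" deliverable maps onto the flat `HSET task:{uuid}` schema above: every field becomes a string, and nested values are stored as JSON strings. A sketch of the round-trip, using a reduced `Task` for brevity (the helper names `to_redis_hash`/`from_redis_hash` are illustrative, not an existing API):

```python
import json
from dataclasses import dataclass
from datetime import datetime, timezone
from uuid import UUID, uuid4

@dataclass
class Task:
    id: UUID
    prompt: str
    priority: int
    created_at: datetime
    tags: list[str]

def to_redis_hash(task: Task) -> dict[str, str]:
    """Flatten a Task into string fields suitable for HSET task:{uuid}."""
    return {
        "id": str(task.id),
        "prompt": task.prompt,
        "priority": str(task.priority),
        "created_at": task.created_at.isoformat(),
        "tags": json.dumps(task.tags),  # nested values stored as JSON strings
    }

def from_redis_hash(fields: dict[str, str]) -> Task:
    """Rebuild a Task from the flat hash returned by HGETALL."""
    return Task(
        id=UUID(fields["id"]),
        prompt=fields["prompt"],
        priority=int(fields["priority"]),
        created_at=datetime.fromisoformat(fields["created_at"]),
        tags=json.loads(fields["tags"]),
    )
```

ISO-8601 timestamps and JSON-encoded lists keep the hash readable in `redis-cli HGETALL`, which is why this sketch prefers them over pickle.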
Dependencies:
[tool.poetry.dependencies]
redis = { version = "^5.0", extras = ["hiredis"] }
Week 3-4: Worker Pool + Heartbeat
Deliverables:
- Multiple worker processes
- Worker heartbeat (every 30s)
- Stall detection (2x heartbeat timeout)
- Graceful shutdown handling
- Worker capacity management
Worker Logic:
async def worker_loop(agent_id: UUID, queue: TaskQueue, stop: asyncio.Event):
    while not stop.is_set():  # `stop` is set by the shutdown signal handler
        # Send heartbeat
        await queue.heartbeat(agent_id)
        # Try to get a task (5s timeout)
        task = await queue.dequeue(agent_id, timeout_ms=5000)
        if task:
            # Spawn subagent process
            proc = await asyncio.create_subprocess_exec(
                "letta", "run-agent",
                f"--task-id={task.id}",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # Wait for completion
            stdout, stderr = await proc.communicate()
            # Update queue
            if proc.returncode == 0:
                await queue.complete(task.id, parse_result(stdout))
            else:
                await queue.fail(task.id, stderr.decode())
        # Brief pause to prevent a tight loop
        await asyncio.sleep(0.1)
Stall Recovery (Cron job):
async def recover_stalled_tasks(queue: TaskQueue, max_age: timedelta):
    """Requeue tasks from crashed workers."""
    stalled = await queue.find_stalled(max_age)
    for task_id in stalled:
        await queue.requeue(task_id)
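The stall-detection rule itself (heartbeat every 30s, stall threshold at 2x the heartbeat timeout) is pure logic that can be tested without Redis. In the Redis version the input would come from the member/score pairs of `queue:running`; here it is modeled as a plain dict, and the function name `find_stalled_ids` is illustrative:

```python
from datetime import datetime, timedelta

HEARTBEAT_INTERVAL = timedelta(seconds=30)
STALL_THRESHOLD = 2 * HEARTBEAT_INTERVAL  # 2x heartbeat timeout, per the plan

def find_stalled_ids(last_heartbeats: dict[str, datetime],
                     now: datetime) -> list[str]:
    """Return task_ids whose last heartbeat is older than the stall threshold.

    last_heartbeats maps task_id -> last heartbeat timestamp, e.g. the
    member/score pairs read back from the queue:running sorted set.
    """
    return [
        task_id
        for task_id, seen in last_heartbeats.items()
        if now - seen > STALL_THRESHOLD
    ]
```

Keeping this predicate pure makes the Week 3-4 cron job a thin wrapper: read the sorted set, apply the predicate, requeue the survivors.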
Week 5: API Layer
Deliverables:
- FastAPI application structure
- REST endpoints (CRUD for tasks)
- WebSocket endpoint for real-time updates
- Authentication middleware
REST Endpoints:
@app.post("/tasks")
async def create_task(task: TaskCreate) -> TaskResponse:
    """Enqueue a new task."""
    task_id = await queue.enqueue(task)
    return TaskResponse(task_id=task_id, status="pending")

@app.get("/tasks/{task_id}")
async def get_task(task_id: UUID) -> Task:
    """Get task status and result."""
    return await queue.get(task_id)

@app.get("/tasks")
async def list_tasks(
    user_id: str,
    status: Optional[TaskStatus] = None,
) -> List[TaskSummary]:
    """List tasks with optional filtering."""
    return await queue.list_by_user(user_id, status)

@app.post("/tasks/{task_id}/cancel")
async def cancel_task(task_id: UUID):
    """Cancel a pending or running task."""
    await queue.cancel(task_id)

@app.post("/tasks/{task_id}/retry")
async def retry_task(task_id: UUID):
    """Retry a failed task."""
    await queue.retry(task_id)
WebSocket:
@app.websocket("/ws")
async def websocket_endpoint(websocket: WebSocket):
    await websocket.accept()
    # Subscribe to Redis pub/sub for updates (requires the redis.asyncio client)
    pubsub = redis.pubsub()
    await pubsub.subscribe("task_updates")
    async for message in pubsub.listen():
        if message["type"] == "message":
            # Payload is already a JSON string; forward it as text
            await websocket.send_text(message["data"])
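For the relay above to be useful, workers and the API need to agree on the shape of the payload published to `task_updates`. A minimal sketch of that contract; the function name and field set are assumptions, not an existing interface:

```python
import json
from datetime import datetime, timezone

def make_update_message(task_id: str, status: str) -> str:
    """Serialize a status change for PUBLISH task_updates.

    The WebSocket endpoint forwards this string verbatim, so whatever
    fields are agreed here are exactly what dashboard clients receive.
    """
    return json.dumps({
        "task_id": task_id,
        "status": status,
        "at": datetime.now(timezone.utc).isoformat(),
    })
```

Publishing one small JSON document per transition (pending → running → completed/failed) keeps clients stateless: they can render updates without re-fetching the task.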
Week 6: Task Tool Integration
Deliverables:
- Modify existing Task tool to use queue
- persist flag for backward compatibility
- Polling support for task completion
- Migration guide for existing code
Modified Task Tool:
class TaskTool:
    async def run(
        self,
        prompt: str,
        subagent_type: str,
        # ... existing args ...
        persist: bool = False,  # NEW
        priority: int = 100,    # NEW
        wait: bool = False,     # NEW
        timeout: int = 300,     # NEW
    ) -> TaskResult:
        if persist:
            # Enqueue and optionally wait
            task_id = await self.queue.enqueue(...)
            if wait:
                # Poll for completion
                result = await self._wait_for_task(task_id, timeout)
                return result
            else:
                # Return immediately with task_id
                return TaskResult(task_id=task_id, status="pending")
        else:
            # Legacy immediate execution
            return await self._execute_immediately(...)
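The `_wait_for_task` polling helper referenced above can be sketched generically: poll a status-fetching coroutine until the task reaches a terminal state, bounded by `asyncio.wait_for`. The helper name, terminal-status strings, and the injected `get_status` callable are assumptions for illustration:

```python
import asyncio
from typing import Awaitable, Callable

TERMINAL_STATUSES = ("completed", "failed", "cancelled")

async def wait_for_task(
    get_status: Callable[[], Awaitable[str]],
    timeout: float,
    poll_interval: float = 0.5,
) -> str:
    """Poll until the task reaches a terminal status or the timeout elapses.

    Raises asyncio.TimeoutError if the task is still running at the deadline.
    """
    async def _poll() -> str:
        while True:
            status = await get_status()
            if status in TERMINAL_STATUSES:
                return status
            await asyncio.sleep(poll_interval)

    return await asyncio.wait_for(_poll(), timeout=timeout)
```

Injecting `get_status` keeps the helper testable without Redis; in production it would wrap `queue.get(task_id)` and inspect the status field.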
Technical Specifications
Task Data Model
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from uuid import UUID

@dataclass
class Task:
    # Required fields (no defaults) must precede defaulted fields
    id: UUID
    subagent_type: str
    prompt: str
    user_id: str
    system_prompt: Optional[str] = None
    model: Optional[str] = None
    # State
    status: TaskStatus = TaskStatus.PENDING
    priority: int = 100
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    # Execution
    agent_id: Optional[UUID] = None
    retry_count: int = 0
    max_retries: int = 3
    # Results
    result: Optional[dict] = None
    error: Optional[str] = None
    exit_code: Optional[int] = None
    # Metadata
    tags: List[str] = field(default_factory=list)
    parent_task: Optional[UUID] = None
    # Cost tracking (NEW)
    input_tokens: int = 0
    output_tokens: int = 0
    estimated_cost: float = 0.0
Retry Logic
async def retry_with_backoff(queue: TaskQueue, task: Task) -> bool:
    if task.retry_count >= task.max_retries:
        return False  # Permanent failure
    # Exponential backoff: 2^retry_count seconds, capped at 5 min
    delay = min(2 ** task.retry_count, 300)
    await asyncio.sleep(delay)
    task.retry_count += 1
    # Re-enqueue with the same priority
    await queue.enqueue(task, priority=task.priority)
    return True
Error Classification
| Error | Retry? | Action |
|---|---|---|
| Subagent crash | Yes | Requeue with backoff |
| Syntax error | No | Fail immediately |
| API rate limit | Yes | Exponential backoff |
| Out of memory | No | Alert admin, fail |
| Redis connection | Yes | Reconnect, retry |
| Timeout | Yes | Retry with longer timeout |
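The classification table above can be encoded directly so workers make consistent retry decisions. The enum members and `should_retry` helper below are illustrative names; the retry/no-retry split mirrors the table:

```python
from enum import Enum

class ErrorClass(Enum):
    SUBAGENT_CRASH = "subagent_crash"
    SYNTAX_ERROR = "syntax_error"
    RATE_LIMIT = "rate_limit"
    OUT_OF_MEMORY = "out_of_memory"
    REDIS_CONNECTION = "redis_connection"
    TIMEOUT = "timeout"

# Mirrors the table: only these error classes are safe to requeue.
RETRYABLE = {
    ErrorClass.SUBAGENT_CRASH,
    ErrorClass.RATE_LIMIT,
    ErrorClass.REDIS_CONNECTION,
    ErrorClass.TIMEOUT,
}

def should_retry(error: ErrorClass, retry_count: int,
                 max_retries: int = 3) -> bool:
    """Retry only retryable error classes, up to max_retries attempts."""
    return error in RETRYABLE and retry_count < max_retries
```

Non-retryable classes (syntax error, out of memory) fail immediately; the OOM path would additionally trigger the admin alert noted in the table.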
Testing Strategy
Unit Tests
# test_queue.py
def test_enqueue_creates_pending_task(): ...
def test_dequeue_removes_from_pending(): ...
def test_complete_moves_to_completed(): ...
def test_fail_triggers_retry(): ...
def test_max_retries_exceeded(): ...
def test_cancel_stops_running_task(): ...
Integration Tests
# test_worker.py
async def test_worker_processes_task(): ...
async def test_worker_handles_failure(): ...
async def test_worker_heartbeat(): ...
async def test_stall_recovery(): ...
Durability Tests
# test_durability.py
async def test_tasks_survive_restart():
    """Enqueue tasks, restart Redis, verify tasks persist."""
async def test_worker_crash_recovery():
    """Kill worker mid-task, verify task requeued."""
async def test_concurrent_workers():
    """5 workers, 20 tasks, verify all complete."""
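The shape of `test_concurrent_workers` can be exercised against the Week 1 in-memory queue before Redis exists. This sketch drains 20 tasks with 5 cooperating asyncio workers and is illustrative only; the real test would use the TaskQueue interface and real subagent processes:

```python
import asyncio

async def run_workers(n_workers: int, n_tasks: int) -> int:
    """Concurrency check: n_workers drain n_tasks; return completed count."""
    queue: asyncio.Queue = asyncio.Queue()
    for i in range(n_tasks):
        queue.put_nowait(i)
    completed: list[int] = []

    async def worker() -> None:
        while True:
            try:
                task = queue.get_nowait()
            except asyncio.QueueEmpty:
                return  # queue drained, worker exits
            await asyncio.sleep(0)  # simulate work, yield to other workers
            completed.append(task)

    await asyncio.gather(*(worker() for _ in range(n_workers)))
    return len(completed)
```

The assertion that every task completes exactly once is the property the Redis-backed test must also preserve, there via atomic dequeue rather than a single event loop.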
Dependencies
Required
redis = { version = "^5.0", extras = ["hiredis"] }
fastapi = "^0.115"
websockets = "^13.0"
pydantic = "^2.0"
Development
pytest = "^8.0"
pytest-asyncio = "^0.24"
httpx = "^0.27" # For FastAPI test client
Infrastructure
- Redis 7.0+ (local or cloud)
- Python 3.11+
Migration Guide
For Existing Task Tool Users
Before:
result = await task_tool.run(
    prompt="Create a React component",
    subagent_type="coder",
)  # Blocks until complete
After (backward compatible):
# Same behavior (immediate execution)
result = await task_tool.run(
    prompt="Create a React component",
    subagent_type="coder",
    persist=False,  # default
)
New (persistent):
# Fire-and-forget
task_id = await task_tool.run(
    prompt="Create a React component",
    subagent_type="coder",
    persist=True,
)
# Wait for completion
result = await task_tool.run(
    prompt="Create a React component",
    subagent_type="coder",
    persist=True,
    wait=True,
    timeout=600,
)
Success Criteria
| Metric | Target | Measurement |
|---|---|---|
| Task durability | 100% | Tasks never lost on restart |
| Throughput | 10 tasks/min | With 3 workers |
| Latency | <100ms | Enqueue → pending |
| Recovery time | <60s | Worker crash → requeue |
| API uptime | 99.9% | Health check endpoint |
| Backward compat | 100% | Existing tests pass |
Risk Mitigation
| Risk | Likelihood | Impact | Mitigation |
|---|---|---|---|
| Redis complexity | Low | Medium | Start with simple ops |
| Worker pool bugs | Medium | High | Extensive testing |
| Performance issues | Low | Medium | Load testing Week 5 |
| Migration breakage | Low | High | Full test suite |
Handoff to Phase 2
Phase 2 Prereqs:
- All Phase 1 success criteria met
- API documentation complete
- WebSocket tested with simple client
- Cost tracking working
Phase 2 Inputs:
- Task queue API (REST + WebSocket)
- Task data model
- Worker management API
- Redis schema
Appendix: Quick Reference
Redis Commands Cheat Sheet
# Start Redis
docker run -d -p 6379:6379 redis:7-alpine
# Monitor
redis-cli monitor
# Inspect keys
redis-cli KEYS "task:*"
redis-cli HGETALL task:abc-123
# Clear queue
redis-cli FLUSHDB
Development Commands
# Start worker
python -m letta_ade.worker.runner --agent-id worker-1
# Start API
uvicorn letta_ade.api:app --reload
# Run tests
pytest tests/ -v --tb=short
# Integration test
pytest tests/integration/ -v
Ready for implementation. Questions? See community-ade-research-synthesis-2026-03-18.md for full context.