
# Phase 1: Orchestration Layer Design
**Date:** March 18, 2026
**Architect:** Researcher subagent
**Goal:** Design persistent task queue system for Community ADE
---
## 1. Core Data Model
```python
from dataclasses import dataclass, field
from datetime import datetime
from typing import List, Optional
from uuid import UUID

@dataclass
class Task:
    id: UUID                                 # Unique task identifier
    subagent_type: str                       # "researcher", "coder", etc.
    prompt: str                              # User prompt to subagent
    system_prompt: Optional[str] = None      # Override default system prompt
    model: Optional[str] = None              # Override default model
    # State tracking
    status: TaskStatus = TaskStatus.PENDING  # pending/running/completed/failed/cancelled
    priority: int = 100                      # Lower = higher priority
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    # Execution tracking
    agent_id: Optional[UUID] = None          # Assigned worker agent
    retry_count: int = 0
    max_retries: int = 3
    # Results
    result: Optional[dict] = None            # Success result
    error: Optional[str] = None              # Failure message
    exit_code: Optional[int] = None          # Subprocess exit code
    # Metadata
    tags: List[str] = field(default_factory=list)  # For filtering/grouping
    user_id: str = ""                        # Task owner
    parent_task: Optional[UUID] = None       # For task chains
```
### TaskStatus Enum
```python
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"        # Waiting for worker
    RUNNING = "running"        # Assigned to worker
    COMPLETED = "completed"    # Success
    FAILED = "failed"          # Permanent failure (max retries)
    CANCELLED = "cancelled"    # User cancelled
    STALLED = "stalled"        # Worker crashed, needs recovery
```
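Since the task payload is later persisted as a flat Redis hash (§3), rich fields need flattening to strings before `HSET`. A hypothetical helper sketching one convention (an assumption, not fixed by this design): drop `None` fields, ISO-format datetimes, comma-join tags.

```python
from datetime import datetime, timezone

def to_redis_hash(task_fields: dict) -> dict:
    """Flatten a task's fields into Redis-hash-friendly strings.
    Convention (illustrative): None fields are omitted, datetimes
    become ISO 8601, lists are comma-joined."""
    flat = {}
    for key, value in task_fields.items():
        if value is None:
            continue  # Optional field not set; omit from the hash
        if isinstance(value, datetime):
            value = value.isoformat()
        elif isinstance(value, (list, tuple)):
            value = ",".join(map(str, value))
        flat[key] = str(value)
    return flat
```

Reading a task back would reverse these conversions field by field.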
---
## 2. State Machine
```
                  +-----------+
                  |  PENDING  |
                  +-----+-----+
                        | dequeue()
                        v
+--------+  fail()  +-----------+  success()  +-----------+
| FAILED |<---------+  RUNNING  +------------>| COMPLETED |
+---+----+  (max    +--+--+--+--+             +-----------+
    |      retries)    ^  |  ^
    |    retry()       |  |  | reassign
    +------------------+  |  |
        stall detected    v  |
                      +------+--+
                      | STALLED |
                      +---------+

     (any non-terminal state) --cancel()--> CANCELLED
```
### Transitions
- `PENDING → RUNNING`: Worker dequeues task
- `RUNNING → COMPLETED`: Subagent succeeds
- `RUNNING → FAILED`: Subagent fails, max retries reached
- `RUNNING → STALLED`: Worker heartbeat timeout
- `STALLED → RUNNING`: Reassigned to new worker
- `FAILED → RUNNING`: Manual retry triggered
- `Any → CANCELLED`: User cancellation
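The transition rules above can be centralized in a lookup table so every status change is validated in one place; a sketch (restating `TaskStatus` so the snippet is self-contained):

```python
from enum import Enum

class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"
    STALLED = "stalled"

# Legal transitions from the state machine above.
# CANCELLED is reachable from any non-terminal state.
_TRANSITIONS = {
    TaskStatus.PENDING: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
    TaskStatus.RUNNING: {TaskStatus.COMPLETED, TaskStatus.FAILED,
                         TaskStatus.STALLED, TaskStatus.CANCELLED},
    TaskStatus.STALLED: {TaskStatus.RUNNING, TaskStatus.CANCELLED},
    TaskStatus.FAILED: {TaskStatus.RUNNING, TaskStatus.CANCELLED},  # manual retry
    TaskStatus.COMPLETED: set(),   # terminal
    TaskStatus.CANCELLED: set(),   # terminal
}

def transition(current: TaskStatus, target: TaskStatus) -> TaskStatus:
    """Return the new status, or raise if the transition is illegal."""
    if target not in _TRANSITIONS[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target
```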
---
## 3. Redis Data Structures
| Purpose | Structure | Key Pattern |
|---------|-----------|-------------|
| Task payload | Hash | `task:{task_id}` |
| Pending queue | Sorted Set (by priority) | `queue:pending` |
| Running set | Sorted Set (by start time) | `queue:running` |
| Worker registry | Hash | `worker:{agent_id}` |
| Status index | Set per status | `status:{status}` |
| User tasks | Set | `user:{user_id}:tasks` |
### Example Redis Operations
```redis
# Enqueue (pending)
ZADD queue:pending {priority} {task_id}
HSET task:{task_id} status pending created_at {timestamp} ...
SADD status:pending {task_id}

# Dequeue (atomic, optimistic locking)
WATCH queue:pending
# Peek at the highest-priority task (lowest score); popping here
# would mutate the key before the transaction starts
task_id = ZRANGE queue:pending 0 0
MULTI
ZREM queue:pending {task_id}
ZADD queue:running {now} {task_id}
HSET task:{task_id} status running agent_id {worker} started_at {now}
SMOVE status:pending status:running {task_id}
EXEC  # Aborts if queue:pending changed since WATCH; retry on failure

# Complete
ZREM queue:running {task_id}
SMOVE status:running status:completed {task_id}
HSET task:{task_id} status completed result {...} completed_at {now}

# Fail with retry
HINCRBY task:{task_id} retry_count 1
ZADD queue:pending {priority} {task_id}  # Re-enqueue
ZREM queue:running {task_id}
SMOVE status:running status:pending {task_id}
HSET task:{task_id} status pending error {...}

# Stall recovery (cron job)
# queue:running is scored by start time; find entries older than threshold
ZRANGEBYSCORE queue:running 0 {now - threshold}
# For each task whose worker heartbeat has expired:
ZREM queue:running {task_id}
SMOVE status:running status:stalled {task_id}
ZADD queue:pending {priority} {task_id}
```
---
## 4. Key API Methods
```python
class TaskQueue:
    # Core operations
    async def enqueue(self, task: Task) -> UUID: ...
    async def dequeue(self, worker_id: UUID, timeout_ms: int = 5000) -> Optional[Task]: ...
    async def complete(self, task_id: UUID, result: dict) -> None: ...
    async def fail(self, task_id: UUID, error: str, retryable: bool = True) -> None: ...
    async def cancel(self, task_id: UUID) -> None: ...

    # Management
    async def retry(self, task_id: UUID) -> None: ...                     # Manual retry
    async def requeue_stalled(self, max_age_ms: int = 60000) -> int: ...  # Recover crashed
    async def get_status(self, task_id: UUID) -> TaskStatus: ...
    async def list_by_user(self, user_id: str, status: Optional[str] = None) -> List[TaskSummary]: ...

    # Worker management
    async def register_worker(self, agent_id: UUID, capacity: int) -> None: ...
    async def heartbeat(self, agent_id: UUID) -> None: ...
    async def unregister_worker(self, agent_id: UUID, reason: str) -> None: ...
```
---
## 5. Integration with Existing Task Tool
### Current Flow
```
Task tool → immediate subprocess spawn → wait → return result
```
### New Flow (with persistence)
```
Task tool → enqueue() → return task_id (immediate)
Background worker → dequeue() → spawn subprocess → complete()/fail()
Caller polls/gets notification when task completes
```
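Since `enqueue()` returns immediately, a caller that needs the result must poll (or subscribe to a webhook, per Phase 1d). A minimal polling sketch, assuming the `get_status()` method from §4; the helper name and terminal-state set are illustrative:

```python
import asyncio

TERMINAL_STATES = {"completed", "failed", "cancelled"}

async def wait_for_task(queue, task_id, poll_interval: float = 1.0,
                        timeout: float = 300.0) -> str:
    """Poll get_status() until the task reaches a terminal state."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    while loop.time() < deadline:
        status = await queue.get_status(task_id)
        if status in TERMINAL_STATES:
            return status
        await asyncio.sleep(poll_interval)
    raise TimeoutError(f"task {task_id} still running after {timeout}s")
```

A notification channel (Redis pub/sub or webhooks) would replace this loop once Phase 1d lands.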
### Changes to Task Tool Schema
```python
class TaskTool:
    async def run(
        self,
        prompt: str,
        subagent_type: str,
        # ... existing args ...
        persist: bool = False,             # NEW: enqueue instead of immediate run
        priority: int = 100,               # NEW
        tags: Optional[List[str]] = None,  # NEW
    ) -> TaskResult:
        if persist:
            task_id = await self.queue.enqueue(...)
            return TaskResult(task_id=task_id, status="pending")
        else:
            # Legacy: immediate execution
            ...
```
### Worker Agent Integration
**Worker subscribes to queue:**
```python
async def worker_loop(agent_id: UUID):
    while running:
        task = await queue.dequeue(agent_id, timeout_ms=5000)
        if task:
            # Spawn subprocess
            proc = await asyncio.create_subprocess_exec(
                "letta", "run-agent", f"--task-id={task.id}",
                stdout=asyncio.subprocess.PIPE,
                stderr=asyncio.subprocess.PIPE,
            )
            # Monitor and wait
            stdout, stderr = await proc.communicate()
            # Update queue based on result
            if proc.returncode == 0:
                await queue.complete(task.id, parse_result(stdout))
            else:
                await queue.fail(task.id, stderr.decode(), retryable=True)
```
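The stall-recovery path in §3 depends on workers refreshing a heartbeat. A companion coroutine the worker would run alongside its dequeue loop, sketched against the §4 `heartbeat()` method (the interval is an assumption):

```python
import asyncio

async def heartbeat_loop(queue, agent_id, interval: float = 15.0):
    """Refresh this worker's heartbeat until cancelled, so the
    stall-recovery job does not reclaim its running tasks."""
    while True:
        await queue.heartbeat(agent_id)
        await asyncio.sleep(interval)
```

The worker would start it with `asyncio.create_task(...)` before entering the dequeue loop and cancel it on graceful shutdown; the interval must be well under the stall threshold.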
---
## 6. Implementation Phases
### Phase 1a: In-Memory Prototype (Week 1)
- Python `asyncio.Queue` for pending tasks
- In-memory dict for task storage
- Single worker process
- No Redis dependency
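The Phase 1a prototype fits in a few dozen lines. A sketch under those constraints; class and payload names here are illustrative, not the final §4 API:

```python
import asyncio
from uuid import UUID, uuid4

class InMemoryTaskQueue:
    """Phase 1a prototype: asyncio.PriorityQueue plus a dict.
    Single process, no persistence, no retries."""

    def __init__(self):
        self._pending = asyncio.PriorityQueue()  # (priority, tiebreak, task_id)
        self._tasks: dict = {}

    async def enqueue(self, payload: dict, priority: int = 100) -> UUID:
        task_id = uuid4()
        self._tasks[task_id] = {**payload, "status": "pending", "priority": priority}
        # task_id.int breaks priority ties deterministically
        await self._pending.put((priority, task_id.int, task_id))
        return task_id

    async def dequeue(self) -> dict:
        _, _, task_id = await self._pending.get()
        task = self._tasks[task_id]
        task["status"] = "running"
        return {"id": task_id, **task}

    async def complete(self, task_id: UUID, result: dict) -> None:
        self._tasks[task_id].update(status="completed", result=result)

    def get_status(self, task_id: UUID) -> str:
        return self._tasks[task_id]["status"]
```

Swapping `_pending` for the Redis sorted set and `_tasks` for `task:{id}` hashes is the Phase 1b step.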
### Phase 1b: Redis Integration (Week 2)
- Replace queue with Redis
- Add task persistence
- Implement retry logic
- Add stall recovery
### Phase 1c: Worker Pool (Week 3-4)
- Multiple worker processes
- Worker heartbeat monitoring
- Task assignment logic
- Graceful shutdown handling
### Phase 1d: API & CLI (Week 5-6)
- REST API for task management
- CLI commands for queue inspection
- Task status dashboard endpoint
- Webhook notifications
### Phase 1e: Integration (Week 7-8)
- Modify Task tool to use queue
- Add persistence flag
- Maintain backward compatibility
- Migration path for existing code
---
## 7. Retry Logic with Exponential Backoff
```python
async def retry_with_backoff(task_id: UUID):
    task = await queue.get(task_id)
    if task.retry_count >= task.max_retries:
        await queue.fail(task_id, "Max retries exceeded", retryable=False)
        return
    # Exponential backoff: 2^retry_count seconds
    delay = min(2 ** task.retry_count, 300)  # Cap at 5 minutes
    await asyncio.sleep(delay)
    # Re-enqueue; the task carries its own priority
    await queue.enqueue(task)
```
---
## 8. Error Handling Strategy
| Error Type | Retry? | Action |
|------------|--------|--------|
| Subagent crash | Yes | Increment retry, requeue |
| Syntax error in code | No | Fail immediately |
| Timeout | Yes | Retry with longer timeout |
| API rate limit | Yes | Retry with exponential backoff |
| Out of memory | No | Fail, alert admin |
| Redis connection lost | Yes | Reconnect, retry operation |
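This table can be collapsed into a small classifier feeding the `retryable` flag on `fail()`. A sketch; the pattern strings are illustrative guesses, and a real implementation would classify on structured exception types or exit codes rather than message text:

```python
# Substrings are illustrative, keyed to the error categories above.
FATAL_PATTERNS = ("syntaxerror", "out of memory")
RETRYABLE_PATTERNS = ("crash", "timeout", "rate limit", "connection lost")

def is_retryable(error: str) -> bool:
    """Map an error message onto the retry policy in the table above.
    Unknown errors default to non-retryable (conservative)."""
    msg = error.lower()
    if any(p in msg for p in FATAL_PATTERNS):
        return False
    return any(p in msg for p in RETRYABLE_PATTERNS)
```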
---
## Next Steps
1. **Implement in-memory prototype** (Week 1)
2. **Add Redis persistence** (Week 2)
3. **Build worker pool** (Week 3-4)
4. **Ship API & CLI** (Week 5-6)
5. **Integrate with Task tool** (Week 7-8)
6. **Write tests for queue durability** (ongoing)
---
*Design by Researcher subagent, March 18, 2026*