# bot0 Task Queue Manager

## Overview

The Task Queue Manager lives inside the Hub and handles scheduling, prioritization, and dispatch of all tasks — whether from users, the proactive engine, or external webhooks.

**Key principle:** the queue manager decides *when* and *where* tasks run. It does not decide *what* to do; that is the job of the proactive engine or the user.


## Why It Lives in the Hub

| Reason | Explanation |
|--------|-------------|
| Daemon awareness | Hub already tracks which daemons are online, busy, and their capabilities |
| Minimal latency | Tasks go directly from queue to daemon via existing WebSocket |
| Single source of truth | No sync needed between separate services |
| Natural fit | Hub already routes tasks — queue is just smarter routing |

## Architecture

```
┌─────────────────────────────────────────────────────────────────────────────┐
│                                    HUB                                       │
│                                                                             │
│  ┌─────────────────────────────────────────────────────────────────────┐   │
│  │                        TASK QUEUE MANAGER                            │   │
│  │                                                                      │   │
│  │   ┌────────────────────────────────────────────────────────────┐    │   │
│  │   │                       INTAKE                                │    │   │
│  │   │                                                             │    │   │
│  │   │  • Receive tasks from all sources                          │    │   │
│  │   │  • Validate task schema                                    │    │   │
│  │   │  • Assign priority (or accept provided)                    │    │   │
│  │   │  • Estimate resource requirements                          │    │   │
│  │   │  • Check rate limits                                       │    │   │
│  │   │  • Apply backpressure if needed                            │    │   │
│  │   │                                                             │    │   │
│  │   └─────────────────────────────┬───────────────────────────────┘    │   │
│  │                                 │                                    │   │
│  │                                 ▼                                    │   │
│  │   ┌────────────────────────────────────────────────────────────┐    │   │
│  │   │                   PRIORITY QUEUES                           │    │   │
│  │   │                                                             │    │   │
│  │   │  ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐  │    │   │
│  │   │  │ CRITICAL  │ │   HIGH    │ │  NORMAL   │ │    LOW    │  │    │   │
│  │   │  │           │ │           │ │           │ │           │  │    │   │
│  │   │  │ User      │ │ Time-     │ │ Standard  │ │ Backgrnd  │  │    │   │
│  │   │  │ urgent    │ │ sensitive │ │ work      │ │ cleanup   │  │    │   │
│  │   │  │           │ │           │ │           │ │           │  │    │   │
│  │   │  │ < 30s     │ │ < 5min    │ │ < 1hr     │ │ When idle │  │    │   │
│  │   │  └───────────┘ └───────────┘ └───────────┘ └───────────┘  │    │   │
│  │   │                                                             │    │   │
│  │   │  Reserved capacity: 20% always available for CRITICAL      │    │   │
│  │   │                                                             │    │   │
│  │   └─────────────────────────────┬───────────────────────────────┘    │   │
│  │                                 │                                    │   │
│  │                                 ▼                                    │   │
│  │   ┌────────────────────────────────────────────────────────────┐    │   │
│  │   │                      SCHEDULER                              │    │   │
│  │   │                                                             │    │   │
│  │   │  • Match tasks to available daemons                        │    │   │
│  │   │  • Respect capability requirements                         │    │   │
│  │   │  • Load balance across daemons                             │    │   │
│  │   │  • Enforce concurrency limits                              │    │   │
│  │   │  • Handle deadlines (bump priority if tight)               │    │   │
│  │   │  • Respect scheduled_for times                             │    │   │
│  │   │                                                             │    │   │
│  │   └─────────────────────────────┬───────────────────────────────┘    │   │
│  │                                 │                                    │   │
│  │                                 ▼                                    │   │
│  │   ┌────────────────────────────────────────────────────────────┐    │   │
│  │   │                      DISPATCHER                             │    │   │
│  │   │                                                             │    │   │
│  │   │  • Send task to daemon via WebSocket                       │    │   │
│  │   │  • Track in-flight tasks                                   │    │   │
│  │   │  • Handle timeouts                                         │    │   │
│  │   │  • Process results/errors                                  │    │   │
│  │   │  • Trigger retries on failure                              │    │   │
│  │   │  • Update task status                                      │    │   │
│  │   │                                                             │    │   │
│  │   └─────────────────────────────┬───────────────────────────────┘    │   │
│  │                                 │                                    │   │
│  └─────────────────────────────────┼────────────────────────────────────┘   │
│                                    │                                        │
│  ┌─────────────────────────────────┼────────────────────────────────────┐   │
│  │                      DAEMON REGISTRY                                  │   │
│  │                                 │                                     │   │
│  │   daemon-1: online, 2/5 tasks, [computer_use, browser, code]        │   │
│  │   daemon-2: online, 0/5 tasks, [computer_use, gpu]                  │   │
│  │   daemon-3: offline                                                  │   │
│  │                                 │                                     │   │
│  └─────────────────────────────────┼────────────────────────────────────┘   │
│                                    │                                        │
│  ┌─────────────────────────────────┼────────────────────────────────────┐   │
│  │                      WEBSOCKET SERVER                                 │   │
│  │                                 │                                     │   │
│  │   Connections to all daemons    │                                     │   │
│  │                                 ▼                                     │   │
│  └─────────────────────────────────┼────────────────────────────────────┘   │
│                                    │                                        │
└────────────────────────────────────┼────────────────────────────────────────┘
                                     │
                      ┌──────────────┼──────────────┐
                      ▼              ▼              ▼
                 daemon-1       daemon-2       daemon-3
```

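The four stages above map naturally onto a small orchestrator. The sketch below is illustrative only: the `Intake`, `PriorityQueues`, `Scheduler`, and `Dispatcher` interfaces and the `submit`/`tick` methods are assumed names rather than the actual Hub API; `Task` and `DaemonState` are defined later in this document.

```typescript
// Illustrative interfaces; the real ones would live in queue/*.ts
interface Intake { accept(raw: unknown): Promise<Task>; }
interface PriorityQueues { enqueue(t: Task): void; requeue(t: Task): void; drainReady(): Task[]; }
interface Scheduler { select(t: Task): DaemonState | null; }
interface Dispatcher { dispatch(t: Task, d: DaemonState): Promise<void>; }

class TaskQueueManager {
  constructor(
    private intake: Intake,
    private queues: PriorityQueues,
    private scheduler: Scheduler,
    private dispatcher: Dispatcher
  ) {}

  // Every incoming task, regardless of source, goes through intake then the queues
  async submit(raw: unknown): Promise<Task> {
    const task = await this.intake.accept(raw); // validate, prioritize, rate-limit
    this.queues.enqueue(task);
    return task;
  }

  // Periodic tick: drain ready tasks in priority order while daemons have capacity
  async tick(): Promise<void> {
    for (const task of this.queues.drainReady()) {
      const daemon = this.scheduler.select(task);
      if (!daemon) {
        this.queues.requeue(task); // no capable daemon right now; try again later
        continue;
      }
      await this.dispatcher.dispatch(task, daemon);
    }
  }
}
```
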
## Task Sources

All tasks flow through the queue manager, regardless of origin:

```
┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐  ┌─────────────────┐
│      User       │  │    Proactive    │  │    Webhook      │  │    Schedule     │
│                 │  │     Engine      │  │   (External)    │  │    (Cron)       │
│  Desktop app    │  │                 │  │                 │  │                 │
│  Telegram       │  │  Observations   │  │  GitHub         │  │  Daily report   │
│  CLI            │  │  → Actions      │  │  Stripe         │  │  Weekly review  │
│                 │  │                 │  │  Slack          │  │                 │
└────────┬────────┘  └────────┬────────┘  └────────┬────────┘  └────────┬────────┘
         │                    │                    │                    │
         └────────────────────┴────────────────────┴────────────────────┘
                                       │
                                       ▼
                              TASK QUEUE MANAGER
```

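To make "same path regardless of origin" concrete, the sketch below shows a webhook and a cron schedule being normalized into the same `hub.submitTask` call used in the Integration Points section; the handler names and the GitHub event shape are hypothetical.

```typescript
// Hypothetical adapters: every source ends up in the same submitTask call,
// differing only in the `source` field.

// External webhook (e.g. GitHub) → task
async function onGithubWebhook(event: { action: string; repository: string }): Promise<void> {
  await hub.submitTask({
    source: { type: "webhook", id: "github" },
    type: "analysis",
    input: `Triage GitHub event "${event.action}" on ${event.repository}`,
    priority: "normal",
    requirements: { capabilities: ["api_call"] }
  });
}

// Cron schedule → task (invoked by whatever scheduler the Hub uses)
async function onDailyReportSchedule(): Promise<void> {
  await hub.submitTask({
    source: { type: "schedule", id: "daily_report" },
    type: "analysis",
    input: "Generate the daily report",
    priority: "normal",
    requirements: { capabilities: ["api_call"] }
  });
}
```
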
## Task Schema

```typescript
interface Task {
  // Identity
  id: string;                      // Unique task ID
  user_id: string;                 // Owner
  idempotency_key?: string;        // Prevent duplicates

  // Source
  source: {
    type: "user" | "proactive" | "webhook" | "schedule";
    id?: string;                   // Trigger ID, schedule ID, etc.
    channel?: string;              // "desktop", "telegram", etc.
  };

  // Content
  type: string;                    // "computer_use", "api_call", "analysis", etc.
  input: string;                   // The actual task description
  context: Record<string, any>;    // Additional context

  // Priority & Scheduling
  priority: "critical" | "high" | "normal" | "low";
  deadline?: Date;                 // Must complete by (hard deadline)
  sla?: Date;                      // Should complete by (soft deadline)
  scheduled_for?: Date;            // Don't start before this time

  // Resource Requirements
  requirements: {
    capabilities?: string[];       // ["computer_use", "gpu", "browser"]
    target_daemon?: string;        // Must run on specific daemon
    excluded_daemons?: string[];   // Don't run on these
    estimated_duration?: number;   // Seconds
    memory_mb?: number;            // Memory requirement
    cpu_intensive?: boolean;       // Affects scheduling
  };

  // Execution Configuration
  execution: {
    timeout: number;               // Max execution time (seconds)
    max_attempts: number;          // Retry limit (default: 3)
    retry_delay: number;           // Base delay between retries (seconds)
    retry_backoff: "linear" | "exponential";
  };

  // State
  status: TaskStatus;
  assigned_daemon?: string;
  attempts: Attempt[];

  // Results
  result?: any;
  error?: TaskError;

  // Timestamps
  created_at: Date;
  queued_at?: Date;
  scheduled_at?: Date;
  started_at?: Date;
  completed_at?: Date;

  // Metadata
  metadata?: Record<string, any>;
  parent_task_id?: string;         // For sub-tasks
  correlation_id?: string;         // Group related tasks
}

type TaskStatus =
  | "pending"             // Just created, not yet queued
  | "queued"              // In priority queue
  | "scheduled"           // Assigned to daemon, waiting to start
  | "running"             // Currently executing
  | "completed"           // Successfully finished
  | "failed"              // Attempt failed (may be retried)
  | "permanently_failed"  // Max retries exceeded
  | "cancelled"           // Cancelled by user or system
  | "timeout"             // Exceeded execution timeout
  | "stale";              // Deadline passed before execution

interface Attempt {
  attempt_number: number;
  daemon_id: string;
  started_at: Date;
  ended_at?: Date;
  status: "running" | "success" | "failed" | "timeout";
  error?: TaskError;
}

interface TaskError {
  code: string;
  message: string;
  retryable: boolean;
  details?: Record<string, any>;
}
```

## Priority System

### Priority Levels

| Priority | SLA | Use Cases | Capacity |
|----------|-----|-----------|----------|
| CRITICAL | < 30 seconds | User-initiated urgent, payment failures, security alerts | 20% reserved |
| HIGH | < 5 minutes | Time-sensitive proactive, approval responses, meeting prep | 30% |
| NORMAL | < 1 hour | Standard proactive, user non-urgent, reports | 40% |
| LOW | When idle | Background maintenance, cleanup, optimization | 10% |
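
One reading of the Capacity column: non-critical work may never occupy the last 20% of total daemon slots, so a CRITICAL task always finds room. A minimal sketch of that admission rule follows; the `canDispatch` helper and the slot counts are illustrative, and `Priority` is the union type defined in the next snippet.

```typescript
// Admission rule sketch: non-critical tasks must leave the reserved slots free.
function canDispatch(priority: Priority, activeTotal: number, capacityTotal: number): boolean {
  const reservedForCritical = Math.ceil(capacityTotal * 0.20);

  if (priority === "critical") {
    // CRITICAL may use any free slot, including the reserved ones
    return activeTotal < capacityTotal;
  }

  // Everything else has to stop short of the reserved slots
  return activeTotal < capacityTotal - reservedForCritical;
}

// Example: 10 total slots across daemons, 8 already busy
canDispatch("normal", 8, 10);   // false: the last 2 slots stay free for CRITICAL
canDispatch("critical", 8, 10); // true
```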

### Priority Assignment

```typescript
type Priority = "critical" | "high" | "normal" | "low";

function assignPriority(task: Task): Priority {
  // Explicit priority from source
  if (task.priority) return task.priority;

  // User-initiated from the desktop app is HIGH; other channels default to NORMAL
  if (task.source.type === "user") {
    return task.source.channel === "desktop" ? "high" : "normal";
  }

  // Deadline-based escalation
  if (task.deadline) {
    const timeUntilMs = task.deadline.getTime() - Date.now();
    const estimatedMs = (task.requirements.estimated_duration || 300) * 1000; // seconds → ms
    if (timeUntilMs < estimatedMs * 1.5) return "critical";
    if (timeUntilMs < estimatedMs * 3) return "high";
  }

  // Proactive engine tasks default to NORMAL
  if (task.source.type === "proactive") return "normal";

  // Scheduled tasks default to NORMAL
  if (task.source.type === "schedule") return "normal";

  // Webhooks depend on event type
  if (task.source.type === "webhook") {
    if (task.context.event_type?.includes("payment")) return "high";
    if (task.context.event_type?.includes("security")) return "critical";
    return "normal";
  }

  return "normal";
}
```

### Dynamic Priority Escalation

Tasks can be automatically escalated as deadlines approach:

```typescript
function checkDeadlineEscalation(task: Task): void {
  if (!task.deadline || task.priority === "critical") return;

  const timeUntilMs = task.deadline.getTime() - Date.now();
  const estimatedMs = (task.requirements.estimated_duration || 300) * 1000; // seconds → ms

  // Escalate if running out of time
  if (timeUntilMs < estimatedMs * 2) {
    task.priority = "critical";
    requeue(task);
    log.warn(`Task ${task.id} escalated to CRITICAL due to deadline`);
  }
}

// Run every 30 seconds
setInterval(() => {
  for (const task of queuedTasks) {
    checkDeadlineEscalation(task);
  }
}, 30000);
```

## Scheduling Logic

### Daemon Selection

```typescript
interface DaemonState {
  id: string;
  name: string;
  user_id: string;
  status: "online" | "offline" | "busy";
  capabilities: string[];
  active_tasks: number;
  max_concurrent_tasks: number;
  current_load: number;          // 0-1
  last_heartbeat: Date;
}

function selectDaemon(task: Task, daemons: DaemonState[]): DaemonState | null {
  // Filter candidates
  const candidates = daemons.filter(d => {
    // Must be online
    if (d.status === "offline") return false;

    // Must belong to task owner
    if (d.user_id !== task.user_id) return false;

    // Must have capacity
    if (d.active_tasks >= d.max_concurrent_tasks) return false;

    // Check capability requirements
    if (task.requirements.capabilities) {
      const hasAll = task.requirements.capabilities.every(
        cap => d.capabilities.includes(cap)
      );
      if (!hasAll) return false;
    }

    // Specific daemon requested
    if (task.requirements.target_daemon) {
      return d.id === task.requirements.target_daemon;
    }

    // Excluded daemons
    if (task.requirements.excluded_daemons?.includes(d.id)) {
      return false;
    }

    return true;
  });

  if (candidates.length === 0) return null;

  // Sort by preference
  candidates.sort((a, b) => {
    // Prefer less loaded daemons
    const loadDiff = a.current_load - b.current_load;
    if (Math.abs(loadDiff) > 0.2) return loadDiff;

    // Then prefer fewer active tasks
    return a.active_tasks - b.active_tasks;
  });

  return candidates[0];
}
```

### No Available Daemon

When no daemon can handle a task:

```typescript
function handleNoDaemon(task: Task): void {
  // Check if a specific daemon was required
  if (task.requirements.target_daemon) {
    const daemon = getDaemon(task.requirements.target_daemon);

    if (!daemon) {
      // Daemon doesn't exist
      failTask(task, {
        code: "DAEMON_NOT_FOUND",
        message: `Daemon ${task.requirements.target_daemon} not found`,
        retryable: false
      });
      return;
    }

    if (daemon.status === "offline") {
      // Queue for when the daemon comes online
      task.status = "queued";
      task.metadata = { ...task.metadata, waiting_for_daemon: daemon.id };
      notifyUser(task.user_id, {
        type: "task_queued",
        message: `Task queued - waiting for ${daemon.name} to come online`
      });
      return;
    }
  }

  // No capable daemon available
  if (task.priority === "critical") {
    // Notify the user immediately
    notifyUser(task.user_id, {
      type: "task_blocked",
      message: "Urgent task cannot run - no capable daemon available",
      task_id: task.id
    });
  }

  // Keep in queue; will retry when a daemon becomes available
  task.status = "queued";
}
```

## Concurrency Control

### Limits Configuration

```yaml
concurrency:
  global:
    max_total_tasks: 1000          # Across all users
    max_tasks_per_daemon: 5        # Per daemon

  per_user:
    max_concurrent_tasks: 20       # Per user across all daemons
    max_queued_tasks: 100          # Max tasks waiting in queue

  per_priority:
    critical:
      reserved_capacity: 0.20      # 20% always available
      max_wait_time: 30            # Seconds before escalation
    high:
      max_concurrent_percent: 0.40
    normal:
      max_concurrent_percent: 0.35
    low:
      max_concurrent_percent: 0.15
      max_concurrent_absolute: 5   # Never more than 5 LOW tasks

  per_source:
    proactive:
      max_concurrent: 10           # Limit proactive engine
      max_per_minute: 30
    user:
      max_concurrent: 15           # User gets priority
    webhook:
      max_concurrent: 5
```

### Enforcement

```typescript
class ConcurrencyManager {
  canAcceptTask(task: Task): { allowed: boolean; reason?: string } {
    const userTasks = this.getActiveTasksForUser(task.user_id);
    const userQueued = this.getQueuedTasksForUser(task.user_id);

    // Check user limits
    if (userTasks.length >= this.limits.per_user.max_concurrent_tasks) {
      // Allow CRITICAL to preempt LOW
      if (task.priority === "critical") {
        const lowTask = userTasks.find(t => t.priority === "low");
        if (lowTask) {
          this.preempt(lowTask, task);
          return { allowed: true };
        }
      }
      return { allowed: false, reason: "User concurrent limit reached" };
    }

    // Check queue depth
    if (userQueued.length >= this.limits.per_user.max_queued_tasks) {
      return { allowed: false, reason: "User queue limit reached" };
    }

    // Check source limits
    const sourceLimit = this.limits.per_source[task.source.type];
    if (sourceLimit) {
      const sourceTasks = userTasks.filter(t => t.source.type === task.source.type);
      if (sourceTasks.length >= sourceLimit.max_concurrent) {
        return { allowed: false, reason: `${task.source.type} concurrent limit reached` };
      }
    }

    return { allowed: true };
  }

  preempt(lowTask: Task, highTask: Task): void {
    lowTask.metadata = { ...lowTask.metadata, preempted_by: highTask.id };

    // Tell the daemon to stop the low-priority task
    this.sendToDaemon(lowTask.assigned_daemon, {
      type: "cancel_task",
      task_id: lowTask.id
    });

    // Requeue the low-priority task so it runs again later
    lowTask.status = "queued";
    this.enqueue(lowTask);

    log.info(`Preempted task ${lowTask.id} for ${highTask.id}`);
  }
}
```

## Rate Limiting & Backpressure

### Rate Limits

```yaml
rate_limits:
  per_source:
    proactive:
      tasks_per_minute: 50
      tasks_per_hour: 500
      burst_limit: 100
    user:
      tasks_per_minute: 60
      tasks_per_hour: 1000
    webhook:
      tasks_per_minute: 100
      tasks_per_hour: 2000

  global:
    tasks_per_minute: 500
    tasks_per_hour: 10000
```
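
A sketch of how intake could enforce these limits with an in-memory sliding window. The `RateLimiter` class and `admitProactiveTask` helper are illustrative assumptions, not the actual Hub code; a real deployment might keep the counters in shared storage instead.

```typescript
// Sliding-window limiter keyed per user (or per source), using the numbers above.
class RateLimiter {
  private events = new Map<string, number[]>(); // key → recent timestamps (ms)

  constructor(private perMinute: number, private perHour: number) {}

  allow(key: string, now = Date.now()): boolean {
    const hourAgo = now - 3_600_000;
    const minuteAgo = now - 60_000;

    // Keep only the last hour of events, then count the last minute
    const recent = (this.events.get(key) ?? []).filter(t => t > hourAgo);
    const lastMinute = recent.filter(t => t > minuteAgo).length;

    if (recent.length >= this.perHour || lastMinute >= this.perMinute) {
      this.events.set(key, recent);
      return false; // over the limit: intake should reject or delay the task
    }

    recent.push(now);
    this.events.set(key, recent);
    return true;
  }
}

// Usage inside intake (sketch): one limiter per source type
const proactiveLimiter = new RateLimiter(50, 500); // matches the proactive limits above

function admitProactiveTask(task: Task): { allowed: boolean; reason?: string } {
  if (!proactiveLimiter.allow(task.user_id)) {
    return { allowed: false, reason: "rate_limited" }; // proactive engine should back off
  }
  return { allowed: true };
}
```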

### Backpressure Signals

When the queue is overwhelmed, the queue manager sends backpressure signals:

```typescript
interface BackpressureSignal {
  type: "backpressure";
  level: "warning" | "critical";
  queue_depth: number;
  estimated_wait_time: number;
  recommendations: string[];
}

// Method on the queue manager, run on a periodic timer
checkBackpressure(): void {
  const queueDepth = getQueueDepth();

  if (queueDepth > this.limits.backpressure.critical_threshold) {
    // Critical backpressure
    this.emitBackpressure({
      type: "backpressure",
      level: "critical",
      queue_depth: queueDepth,
      estimated_wait_time: this.estimateWaitTime(),
      recommendations: [
        "Pause proactive engine",
        "Reject LOW priority tasks",
        "Consider adding more daemons"
      ]
    });

    // Auto-actions
    this.pauseProactiveEngine();
    this.rejectLowPriorityTasks();

  } else if (queueDepth > this.limits.backpressure.warning_threshold) {
    // Warning backpressure
    this.emitBackpressure({
      type: "backpressure",
      level: "warning",
      queue_depth: queueDepth,
      estimated_wait_time: this.estimateWaitTime(),
      recommendations: [
        "Reduce proactive batch frequency",
        "Delay non-urgent tasks"
      ]
    });

    // Notify the proactive engine to slow down
    this.notifyProactiveEngine({ action: "slow_down", factor: 2 });
  }
}
```

### Proactive Engine Response to Backpressure

```typescript
// In the Proactive Engine (method on the engine instance)
handleBackpressure(signal: BackpressureSignal): void {
  if (signal.level === "critical") {
    // Stop generating new tasks
    this.pause();
    log.warn("Proactive engine paused due to critical backpressure");
  } else if (signal.level === "warning") {
    // Reduce frequency: double the batch window, halve the tasks per batch
    this.batchWindow *= 2;
    this.maxTasksPerBatch = Math.max(1, Math.floor(this.maxTasksPerBatch / 2));
    log.info("Proactive engine slowed down due to backpressure");
  }
}
```

## Failure Handling

### Error Classification

```typescript
type ErrorCategory = "transient" | "daemon_failure" | "permanent" | "timeout";

function classifyError(error: TaskError): ErrorCategory {
  // Network errors are transient
  if (error.code.startsWith("NETWORK_")) return "transient";
  if (error.code === "CONNECTION_LOST") return "transient";

  // Daemon crashed
  if (error.code === "DAEMON_CRASHED") return "daemon_failure";
  if (error.code === "DAEMON_UNRESPONSIVE") return "daemon_failure";

  // Permanent errors
  if (error.code === "INVALID_INPUT") return "permanent";
  if (error.code === "PERMISSION_DENIED") return "permanent";
  if (error.code === "RESOURCE_NOT_FOUND") return "permanent";

  // Timeout
  if (error.code === "EXECUTION_TIMEOUT") return "timeout";

  // Default to transient (safer to retry)
  return "transient";
}
```

### Retry Strategy

```typescript
interface RetryStrategy {
  shouldRetry(task: Task, error: TaskError): boolean;
  getDelay(task: Task): number;
}

const defaultRetryStrategy: RetryStrategy = {
  shouldRetry(task: Task, error: TaskError): boolean {
    // Check retry limit
    if (task.attempts.length >= task.execution.max_attempts) {
      return false;
    }

    // Check error type
    const category = classifyError(error);
    if (category === "permanent") return false;

    // Check deadline
    if (task.deadline && task.deadline < new Date()) {
      return false;
    }

    return true;
  },

  getDelay(task: Task): number {
    const attempt = task.attempts.length;
    const baseDelay = task.execution.retry_delay;

    if (task.execution.retry_backoff === "exponential") {
      // Exponential backoff with jitter
      const delay = baseDelay * Math.pow(2, attempt - 1);
      const jitter = delay * 0.2 * Math.random();
      return Math.min(delay + jitter, 600); // Max 10 minutes
    } else {
      // Linear backoff
      return baseDelay * attempt;
    }
  }
};
```

### Retry Flow

```
Task fails
     │
     ▼
┌─────────────────────────────────────────────────────────────┐
│  Classify error                                              │
│                                                             │
│  transient → Retry with backoff                             │
│  daemon_failure → Reassign to different daemon              │
│  timeout → Retry with longer timeout (if allowed)           │
│  permanent → Mark failed, notify user                       │
│                                                             │
└─────────────────────────────────────────────────────────────┘
     │
     ▼
┌─────────────────────────────────────────────────────────────┐
│  Check retry eligibility                                     │
│                                                             │
│  • Under max attempts?                                      │
│  • Before deadline?                                         │
│  • Error is retryable?                                      │
│                                                             │
└─────────────────────────────────────────────────────────────┘
     │
     ├─── YES ───► Schedule retry with delay
     │
     └─── NO ────► Mark permanently failed, notify user
```

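Tying the pieces together, the sketch below shows how the dispatcher's failure path could combine `classifyError` and `defaultRetryStrategy` from above; `enqueue` and `notifyUser` are the same assumed helpers used elsewhere in this document, and the longer-timeout branch for `timeout` errors is omitted for brevity.

```typescript
// Failure-path sketch, not the actual dispatcher code.
function handleTaskFailure(task: Task, error: TaskError): void {
  const category = classifyError(error);

  // Daemon failures: avoid the failed daemon on the next attempt
  if (category === "daemon_failure" && task.assigned_daemon) {
    task.requirements.excluded_daemons = [
      ...(task.requirements.excluded_daemons || []),
      task.assigned_daemon
    ];
  }

  if (!defaultRetryStrategy.shouldRetry(task, error)) {
    // Out of attempts, past deadline, or permanent error
    task.status = "permanently_failed";
    task.error = error;
    notifyUser(task.user_id, {
      type: "task_failed",
      message: `Task failed after ${task.attempts.length} attempt(s): ${error.message}`,
      task_id: task.id
    });
    return;
  }

  // Schedule the retry after the backoff delay
  const delaySeconds = defaultRetryStrategy.getDelay(task);
  task.status = "queued";
  task.scheduled_for = new Date(Date.now() + delaySeconds * 1000);
  enqueue(task);
}
```
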
### Daemon Failure Handling

```typescript
function handleDaemonDisconnect(daemonId: string): void {
  // Get all tasks running on this daemon
  const affectedTasks = getTasksOnDaemon(daemonId);

  for (const task of affectedTasks) {
    // Record the failure
    task.attempts.push({
      attempt_number: task.attempts.length + 1,
      daemon_id: daemonId,
      started_at: task.started_at ?? new Date(),
      ended_at: new Date(),
      status: "failed",
      error: {
        code: "DAEMON_DISCONNECTED",
        message: "Daemon disconnected unexpectedly",
        retryable: true
      }
    });

    // Try to reassign to another daemon
    task.requirements.excluded_daemons = [
      ...(task.requirements.excluded_daemons || []),
      daemonId // Don't assign back to the failed daemon immediately
    ];

    task.status = "queued";
    enqueue(task);

    log.warn(`Task ${task.id} requeued after daemon ${daemonId} disconnect`);
  }

  // Mark the daemon as offline
  updateDaemonStatus(daemonId, "offline");
}
```

## Task Lifecycle

```
                                    ┌──────────────────────────────────────┐
                                    │                                      │
                                    ▼                                      │
CREATED ───► QUEUED ───► SCHEDULED ───► RUNNING ───► COMPLETED            │
               │              │            │                               │
               │              │            │                               │
               │              │            └──► FAILED ────► (retry?) ─────┘
               │              │                    │
               │              │                    └──► PERMANENTLY_FAILED
               │              │
               └──────────────┴──► CANCELLED (user cancelled)
                                          │
                                          └──► STALE (deadline passed before start)
```

### State Transitions

```typescript
const validTransitions: Record<TaskStatus, TaskStatus[]> = {
  pending: ["queued", "cancelled"],
  queued: ["scheduled", "cancelled", "stale"],
  scheduled: ["running", "queued", "cancelled"],   // Back to queued if daemon unavailable
  running: ["completed", "failed", "timeout", "cancelled"],
  failed: ["queued", "permanently_failed"],        // Queued for retry
  completed: [],                                   // Terminal
  cancelled: [],                                   // Terminal
  timeout: ["queued", "permanently_failed"],       // Retry or give up
  stale: [],                                       // Terminal
  permanently_failed: []                           // Terminal
};
```
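
A small guard built on this table keeps illegal transitions out of the persistence layer. The sketch below is illustrative; the `transition` helper and the timestamp stamping are assumptions, not the actual Hub code.

```typescript
// Transition guard sketch: reject moves the table does not allow.
function transition(task: Task, next: TaskStatus): void {
  const allowed = validTransitions[task.status] ?? [];
  if (!allowed.includes(next)) {
    throw new Error(
      `Invalid task transition ${task.status} → ${next} for task ${task.id}`
    );
  }
  task.status = next;

  // Stamp the matching timestamp as a side effect
  const now = new Date();
  if (next === "queued") task.queued_at = now;
  if (next === "scheduled") task.scheduled_at = now;
  if (next === "running") task.started_at = now;
  if (next === "completed" || next === "failed" || next === "cancelled") {
    task.completed_at = now;
  }
}
```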

## Observability

### Metrics

```typescript
interface QueueMetrics {
  // Queue depth
  queue_depth_total: number;
  queue_depth_by_priority: Record<Priority, number>;
  queue_depth_by_source: Record<string, number>;

  // Throughput
  tasks_enqueued_per_minute: number;
  tasks_completed_per_minute: number;
  tasks_failed_per_minute: number;

  // Latency
  avg_queue_wait_time_ms: number;
  avg_execution_time_ms: number;
  p99_queue_wait_time_ms: number;
  p99_execution_time_ms: number;

  // Daemons
  daemons_online: number;
  daemons_busy: number;
  total_daemon_capacity: number;
  used_daemon_capacity: number;

  // Errors
  retry_rate: number;
  failure_rate: number;
  timeout_rate: number;
}
```
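
As one example of how these metrics feed back into the system, the `estimateWaitTime()` used by the backpressure signals could be approximated from queue depth and drain rate. The sketch below is a rough approximation under that assumption, not the actual implementation.

```typescript
// Rough wait-time estimate: time to drain the current queue plus one average execution.
function estimateWaitTimeMs(m: QueueMetrics): number {
  if (m.tasks_completed_per_minute <= 0) {
    return Number.POSITIVE_INFINITY; // nothing is draining, so the wait is unbounded
  }
  const minutesToDrain = m.queue_depth_total / m.tasks_completed_per_minute;
  return minutesToDrain * 60_000 + m.avg_execution_time_ms;
}
```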

### Logging

```typescript
// Task lifecycle events
log.info("task.created", { task_id, source, priority });
log.info("task.queued", { task_id, queue_depth, estimated_wait });
log.info("task.scheduled", { task_id, daemon_id });
log.info("task.started", { task_id, daemon_id });
log.info("task.completed", { task_id, duration_ms, daemon_id });
log.warn("task.failed", { task_id, error, attempt, will_retry });
log.error("task.permanently_failed", { task_id, error, total_attempts });

// System events
log.info("daemon.connected", { daemon_id, capabilities });
log.warn("daemon.disconnected", { daemon_id, affected_tasks });
log.warn("backpressure.warning", { queue_depth, estimated_wait });
log.error("backpressure.critical", { queue_depth, actions_taken });
```

## Database Schema

```sql
-- Tasks table
CREATE TABLE tasks (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL REFERENCES users(id),
    idempotency_key VARCHAR(255),

    -- Source
    source_type VARCHAR(50) NOT NULL,
    source_id VARCHAR(255),
    source_channel VARCHAR(50),

    -- Content
    task_type VARCHAR(100) NOT NULL,
    input TEXT NOT NULL,
    context JSONB DEFAULT '{}',

    -- Priority & Scheduling
    priority VARCHAR(20) NOT NULL DEFAULT 'normal',
    deadline TIMESTAMP,
    sla TIMESTAMP,
    scheduled_for TIMESTAMP,

    -- Requirements
    requirements JSONB DEFAULT '{}',

    -- Execution config
    timeout_seconds INTEGER DEFAULT 300,
    max_attempts INTEGER DEFAULT 3,
    retry_delay_seconds INTEGER DEFAULT 30,
    retry_backoff VARCHAR(20) DEFAULT 'exponential',

    -- State
    status VARCHAR(30) NOT NULL DEFAULT 'pending',
    assigned_daemon UUID REFERENCES daemons(id),

    -- Results
    result JSONB,
    error JSONB,

    -- Timestamps
    created_at TIMESTAMP NOT NULL DEFAULT NOW(),
    queued_at TIMESTAMP,
    scheduled_at TIMESTAMP,
    started_at TIMESTAMP,
    completed_at TIMESTAMP,

    -- Metadata
    metadata JSONB DEFAULT '{}',
    parent_task_id UUID REFERENCES tasks(id),
    correlation_id UUID,

    -- Constraints
    UNIQUE(user_id, idempotency_key)
);

-- Indexes
CREATE INDEX idx_tasks_user_status ON tasks(user_id, status);
CREATE INDEX idx_tasks_status_priority ON tasks(status, priority);
CREATE INDEX idx_tasks_daemon ON tasks(assigned_daemon) WHERE status = 'running';
CREATE INDEX idx_tasks_deadline ON tasks(deadline) WHERE status IN ('queued', 'scheduled');
CREATE INDEX idx_tasks_correlation ON tasks(correlation_id);

-- Task attempts table
CREATE TABLE task_attempts (
    id UUID PRIMARY KEY,
    task_id UUID NOT NULL REFERENCES tasks(id),
    attempt_number INTEGER NOT NULL,
    daemon_id UUID NOT NULL REFERENCES daemons(id),
    started_at TIMESTAMP NOT NULL,
    ended_at TIMESTAMP,
    status VARCHAR(20) NOT NULL,
    error JSONB,

    UNIQUE(task_id, attempt_number)
);

-- Daemons table (may already exist)
CREATE TABLE daemons (
    id UUID PRIMARY KEY,
    user_id UUID NOT NULL REFERENCES users(id),
    name VARCHAR(255) NOT NULL,
    status VARCHAR(20) NOT NULL DEFAULT 'offline',
    capabilities TEXT[] DEFAULT '{}',
    max_concurrent_tasks INTEGER DEFAULT 5,
    active_tasks INTEGER DEFAULT 0,
    current_load FLOAT DEFAULT 0,
    last_heartbeat TIMESTAMP,
    created_at TIMESTAMP NOT NULL DEFAULT NOW()
);

CREATE INDEX idx_daemons_user_status ON daemons(user_id, status);
```

## Hub Code Structure

```
apps/hub/
├── src/
│   ├── index.ts                 # Entry point
│   │
│   ├── websocket/
│   │   ├── server.ts            # WebSocket server
│   │   ├── handlers.ts          # Message handlers
│   │   └── protocol.ts          # Message types
│   │
│   ├── daemons/
│   │   ├── registry.ts          # Daemon tracking
│   │   ├── heartbeat.ts         # Health checking
│   │   └── capabilities.ts      # Capability matching
│   │
│   ├── queue/                   # TASK QUEUE MANAGER
│   │   ├── index.ts             # Main orchestrator
│   │   ├── intake.ts            # Task intake & validation
│   │   ├── priority.ts          # Priority queues
│   │   ├── scheduler.ts         # Daemon matching
│   │   ├── dispatcher.ts        # Task dispatch
│   │   ├── retry.ts             # Retry logic
│   │   ├── concurrency.ts       # Limits enforcement
│   │   ├── backpressure.ts      # Backpressure handling
│   │   └── metrics.ts           # Queue metrics
│   │
│   ├── storage/
│   │   ├── tasks.ts             # Task persistence
│   │   └── cache.ts             # In-memory cache
│   │
│   └── api/
│       ├── routes.ts            # HTTP endpoints
│       └── middleware.ts        # Auth, rate limiting
│
├── Dockerfile
├── fly.toml
└── package.json
```

## Integration Points

### Proactive Engine → Queue Manager

```typescript
// Proactive engine submits tasks
const response = await hub.submitTask({
  source: { type: "proactive", id: "trigger_123" },
  type: "analysis",
  input: "Analyze revenue drop",
  priority: "normal",
  requirements: { capabilities: ["api_call"] }
});

if (response.status === "rate_limited") {
  // Back off and retry later
  await this.handleBackpressure(response.backpressure);
}
```

### User → Queue Manager (via Desktop/Telegram)

```typescript
// User submits a task from the desktop app
const task = await hub.submitTask({
  source: { type: "user", channel: "desktop" },
  type: "computer_use",
  input: "Open Figma and export assets",
  priority: "high",                  // User tasks default to high
  requirements: {
    capabilities: ["computer_use"],
    target_daemon: "primary"         // User's local daemon
  }
});
```

### Queue Manager → Daemon

```typescript
// Dispatch a task to a daemon
websocket.send(daemon.connection, {
  type: "task.execute",
  task: {
    id: task.id,
    type: task.type,
    input: task.input,
    context: task.context,
    timeout: task.execution.timeout
  }
});

// Daemon responds with progress/completion
websocket.on("task.progress", (msg) => { /* ... */ });
websocket.on("task.completed", (msg) => { /* ... */ });
websocket.on("task.failed", (msg) => { /* ... */ });
```
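
The dispatcher also has to notice when a daemon never answers. Below is a sketch of in-flight tracking with a per-task timeout; the `inFlight` map and helper names are illustrative, and `handleTaskFailure` refers to the failure-path sketch from the Retry Flow section.

```typescript
// In-flight tracking sketch: start a timer at dispatch, clear it on response.
const inFlight = new Map<string, { task: Task; timer: ReturnType<typeof setTimeout> }>();

function trackDispatch(task: Task): void {
  const timer = setTimeout(() => {
    inFlight.delete(task.id);
    handleTaskFailure(task, {
      code: "EXECUTION_TIMEOUT",
      message: `Task exceeded ${task.execution.timeout}s timeout`,
      retryable: true
    });
  }, task.execution.timeout * 1000);

  inFlight.set(task.id, { task, timer });
}

// Called on task.completed / task.failed from the daemon
function resolveDispatch(taskId: string): Task | undefined {
  const entry = inFlight.get(taskId);
  if (!entry) return undefined;
  clearTimeout(entry.timer);
  inFlight.delete(taskId);
  return entry.task;
}
```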

## Summary

| Component | Responsibility |
|-----------|----------------|
| Intake | Validate, prioritize, rate limit |
| Priority Queues | Organize by urgency |
| Scheduler | Match tasks to daemons |
| Dispatcher | Send to daemons, track execution |
| Retry Handler | Handle failures, schedule retries |
| Concurrency Manager | Enforce limits, preemption |
| Backpressure Handler | Protect system from overload |

The queue manager is the traffic controller that ensures tasks run efficiently across your daemon fleet.