bot0 Task Queue Manager
Overview
The Task Queue Manager lives inside the Hub and handles scheduling, prioritization, and dispatch of all tasks — whether from users, the proactive engine, or external webhooks.
Key principle: The queue manager decides WHEN and WHERE tasks run. It doesn't decide WHAT to do (that's the proactive engine or user).
Why It Lives in the Hub
| Reason | Explanation |
|---|---|
| Daemon awareness | Hub already tracks which daemons are online, busy, and their capabilities |
| Minimal latency | Tasks go directly from queue to daemon via existing WebSocket |
| Single source of truth | No sync needed between separate services |
| Natural fit | Hub already routes tasks — queue is just smarter routing |
Architecture
┌─────────────────────────────────────────────────────────────────────────────┐
│ HUB │
│ │
│ ┌─────────────────────────────────────────────────────────────────────┐ │
│ │ TASK QUEUE MANAGER │ │
│ │ │ │
│ │ ┌────────────────────────────────────────────────────────────┐ │ │
│ │ │ INTAKE │ │ │
│ │ │ │ │ │
│ │ │ • Receive tasks from all sources │ │ │
│ │ │ • Validate task schema │ │ │
│ │ │ • Assign priority (or accept provided) │ │ │
│ │ │ • Estimate resource requirements │ │ │
│ │ │ • Check rate limits │ │ │
│ │ │ • Apply backpressure if needed │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────┬───────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────────────────────────────────────────────────┐ │ │
│ │ │ PRIORITY QUEUES │ │ │
│ │ │ │ │ │
│ │ │ ┌───────────┐ ┌───────────┐ ┌───────────┐ ┌───────────┐ │ │ │
│ │ │ │ CRITICAL │ │ HIGH │ │ NORMAL │ │ LOW │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ User │ │ Time- │ │ Standard │ │ Backgrnd │ │ │ │
│ │ │ │ urgent │ │ sensitive │ │ work │ │ cleanup │ │ │ │
│ │ │ │ │ │ │ │ │ │ │ │ │ │
│ │ │ │ < 30s │ │ < 5min │ │ < 1hr │ │ When idle │ │ │ │
│ │ │ └───────────┘ └───────────┘ └───────────┘ └───────────┘ │ │ │
│ │ │ │ │ │
│ │ │ Reserved capacity: 20% always available for CRITICAL │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────┬───────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────────────────────────────────────────────────┐ │ │
│ │ │ SCHEDULER │ │ │
│ │ │ │ │ │
│ │ │ • Match tasks to available daemons │ │ │
│ │ │ • Respect capability requirements │ │ │
│ │ │ • Load balance across daemons │ │ │
│ │ │ • Enforce concurrency limits │ │ │
│ │ │ • Handle deadlines (bump priority if tight) │ │ │
│ │ │ • Respect scheduled_for times │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────┬───────────────────────────────┘ │ │
│ │ │ │ │
│ │ ▼ │ │
│ │ ┌────────────────────────────────────────────────────────────┐ │ │
│ │ │ DISPATCHER │ │ │
│ │ │ │ │ │
│ │ │ • Send task to daemon via WebSocket │ │ │
│ │ │ • Track in-flight tasks │ │ │
│ │ │ • Handle timeouts │ │ │
│ │ │ • Process results/errors │ │ │
│ │ │ • Trigger retries on failure │ │ │
│ │ │ • Update task status │ │ │
│ │ │ │ │ │
│ │ └─────────────────────────────┬───────────────────────────────┘ │ │
│ │ │ │ │
│ └─────────────────────────────────┼────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────┼────────────────────────────────────┐ │
│ │ DAEMON REGISTRY │ │
│ │ │ │ │
│ │ daemon-1: online, 2/5 tasks, [computer_use, browser, code] │ │
│ │ daemon-2: online, 0/5 tasks, [computer_use, gpu] │ │
│ │ daemon-3: offline │ │
│ │ │ │ │
│ └─────────────────────────────────┼────────────────────────────────────┘ │
│ │ │
│ ┌─────────────────────────────────┼────────────────────────────────────┐ │
│ │ WEBSOCKET SERVER │ │
│ │ │ │ │
│ │ Connections to all daemons │ │ │
│ │ ▼ │ │
│ └─────────────────────────────────┼────────────────────────────────────┘ │
│ │ │
└────────────────────────────────────┼────────────────────────────────────────┘
│
┌──────────────┼──────────────┐
▼ ▼ ▼
daemon-1 daemon-2 daemon-3
Task Sources
All tasks flow through the queue manager, regardless of origin:
┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐ ┌─────────────────┐
│ User │ │ Proactive │ │ Webhook │ │ Schedule │
│ │ │ Engine │ │ (External) │ │ (Cron) │
│ Desktop app │ │ │ │ │ │ │
│ Telegram │ │ Observations │ │ GitHub │ │ Daily report │
│ CLI │ │ → Actions │ │ Stripe │ │ Weekly review │
│ │ │ │ │ Slack │ │ │
└────────┬────────┘ └────────┬────────┘ └────────┬────────┘ └────────┬────────┘
│ │ │ │
└────────────────────┴────────────────────┴────────────────────┘
│
▼
TASK QUEUE MANAGER
Task Schema
typescript
interface Task { // Identity id: string; // Unique task ID user_id: string; // Owner idempotency_key?: string; // Prevent duplicates // Source source: { type: "user" | "proactive" | "webhook" | "schedule"; id?: string; // Trigger ID, schedule ID, etc. channel?: string; // "desktop", "telegram", etc. }; // Content type: string; // "computer_use", "api_call", "analysis", etc. input: string; // The actual task description context: Record; // Additional context // Priority & Scheduling priority: "critical" | "high" | "normal" | "low"; deadline?: Date; // Must complete by (hard deadline) sla?: Date; // Should complete by (soft deadline) scheduled_for?: Date; // Don't start before this time // Resource Requirements requirements: { capabilities?: string[]; // ["computer_use", "gpu", "browser"] target_daemon?: string; // Must run on specific daemon excluded_daemons?: string[]; // Don't run on these estimated_duration?: number; // Seconds memory_mb?: number; // Memory requirement cpu_intensive?: boolean; // Affects scheduling }; // Execution Configuration execution: { timeout: number; // Max execution time (seconds) max_attempts: number; // Retry limit (default: 3) retry_delay: number; // Base delay between retries (seconds) retry_backoff: "linear" | "exponential"; }; // State status: TaskStatus; assigned_daemon?: string; attempts: Attempt[]; // Results result?: any; error?: TaskError; // Timestamps created_at: Date; queued_at?: Date; scheduled_at?: Date; started_at?: Date; completed_at?: Date; // Metadata metadata?: Record; parent_task_id?: string; // For sub-tasks correlation_id?: string; // Group related tasks } type TaskStatus = | "pending" // Just created, not yet queued | "queued" // In priority queue | "scheduled" // Assigned to daemon, waiting to start | "running" // Currently executing | "completed" // Successfully finished | "failed" // Permanently failed (max retries exceeded) | "cancelled" // Cancelled by user or system | "timeout" // Exceeded execution timeout | "stale"; // Deadline passed before execution interface Attempt { attempt_number: number; daemon_id: string; started_at: Date; ended_at?: Date; status: "running" | "success" | "failed" | "timeout"; error?: TaskError; } interface TaskError { code: string; message: string; retryable: boolean; details?: Record; }
Priority System
Priority Levels
| Priority | SLA | Use Cases | Capacity |
|---|---|---|---|
| CRITICAL | < 30 seconds | User-initiated urgent, payment failures, security alerts | 20% reserved |
| HIGH | < 5 minutes | Time-sensitive proactive, approval responses, meeting prep | 30% |
| NORMAL | < 1 hour | Standard proactive, user non-urgent, reports | 40% |
| LOW | When idle | Background maintenance, cleanup, optimization | 10% |
Priority Assignment
typescript
function assignPriority(task: Task): Priority { // Explicit priority from source if (task.priority) return task.priority; // User-initiated is at least HIGH if (task.source.type === "user") { return task.source.channel === "desktop" ? "high" : "normal"; } // Deadline-based escalation if (task.deadline) { const timeUntil = task.deadline.getTime() - Date.now(); const estimatedDuration = task.requirements.estimated_duration || 300; if (timeUntil < estimatedDuration * 1.5) return "critical"; if (timeUntil < estimatedDuration * 3) return "high"; } // Proactive engine tasks default to NORMAL if (task.source.type === "proactive") return "normal"; // Scheduled tasks default to NORMAL if (task.source.type === "schedule") return "normal"; // Webhooks depend on type if (task.source.type === "webhook") { if (task.context.event_type?.includes("payment")) return "high"; if (task.context.event_type?.includes("security")) return "critical"; return "normal"; } return "normal"; }
Dynamic Priority Escalation
Tasks can be automatically escalated as deadlines approach:
typescript
function checkDeadlineEscalation(task: Task): void { if (!task.deadline || task.priority === "critical") return; const timeUntil = task.deadline.getTime() - Date.now(); const estimatedDuration = task.requirements.estimated_duration || 300; // Escalate if running out of time if (timeUntil < estimatedDuration * 2 && task.priority !== "critical") { task.priority = "critical"; requeue(task); log.warn(`Task ${task.id} escalated to CRITICAL due to deadline`); } } // Run every 30 seconds setInterval(() => { for (const task of queuedTasks) { checkDeadlineEscalation(task); } }, 30000);
Scheduling Logic
Daemon Selection
typescript
interface DaemonState { id: string; name: string; user_id: string; status: "online" | "offline" | "busy"; capabilities: string[]; active_tasks: number; max_concurrent_tasks: number; current_load: number; // 0-1 last_heartbeat: Date; } function selectDaemon(task: Task, daemons: DaemonState[]): DaemonState | null { // Filter candidates let candidates = daemons.filter(d => { // Must be online if (d.status === "offline") return false; // Must belong to task owner if (d.user_id !== task.user_id) return false; // Must have capacity if (d.active_tasks >= d.max_concurrent_tasks) return false; // Check capability requirements if (task.requirements.capabilities) { const hasAll = task.requirements.capabilities.every( cap => d.capabilities.includes(cap) ); if (!hasAll) return false; } // Specific daemon requested if (task.requirements.target_daemon) { return d.id === task.requirements.target_daemon; } // Excluded daemons if (task.requirements.excluded_daemons?.includes(d.id)) { return false; } return true; }); if (candidates.length === 0) return null; // Sort by preference candidates.sort((a, b) => { // Prefer less loaded const loadDiff = a.current_load - b.current_load; if (Math.abs(loadDiff) > 0.2) return loadDiff; // Prefer fewer active tasks return a.active_tasks - b.active_tasks; }); return candidates[0]; }
No Available Daemon
When no daemon can handle a task:
typescript
function handleNoDaemon(task: Task): void { // Check if specific daemon was required if (task.requirements.target_daemon) { const daemon = getDaemon(task.requirements.target_daemon); if (!daemon) { // Daemon doesn't exist failTask(task, { code: "DAEMON_NOT_FOUND", message: `Daemon ${task.requirements.target_daemon} not found`, retryable: false }); return; } if (daemon.status === "offline") { // Queue for when daemon comes online task.status = "queued"; task.metadata = { ...task.metadata, waiting_for_daemon: daemon.id }; notifyUser(task.user_id, { type: "task_queued", message: `Task queued - waiting for ${daemon.name} to come online` }); return; } } // No capable daemon available if (task.priority === "critical") { // Notify user immediately notifyUser(task.user_id, { type: "task_blocked", message: "Urgent task cannot run - no capable daemon available", task_id: task.id }); } // Keep in queue, will retry when daemon available task.status = "queued"; }
Concurrency Control
Limits Configuration
yaml
concurrency: global: max_total_tasks: 1000 # Across all users max_tasks_per_daemon: 5 # Per daemon per_user: max_concurrent_tasks: 20 # Per user across all daemons max_queued_tasks: 100 # Max tasks waiting in queue per_priority: critical: reserved_capacity: 0.20 # 20% always available max_wait_time: 30 # Seconds before escalation high: max_concurrent_percent: 0.40 normal: max_concurrent_percent: 0.35 low: max_concurrent_percent: 0.15 max_concurrent_absolute: 5 # Never more than 5 LOW tasks per_source: proactive: max_concurrent: 10 # Limit proactive engine max_per_minute: 30 user: max_concurrent: 15 # User gets priority webhook: max_concurrent: 5
Enforcement
typescript
class ConcurrencyManager { canAcceptTask(task: Task): { allowed: boolean; reason?: string } { const userTasks = this.getActiveTasksForUser(task.user_id); const userQueued = this.getQueuedTasksForUser(task.user_id); // Check user limits if (userTasks.length >= this.limits.per_user.max_concurrent_tasks) { // Allow CRITICAL to preempt LOW if (task.priority === "critical") { const lowTask = userTasks.find(t => t.priority === "low"); if (lowTask) { this.preempt(lowTask, task); return { allowed: true }; } } return { allowed: false, reason: "User concurrent limit reached" }; } // Check queue depth if (userQueued.length >= this.limits.per_user.max_queued_tasks) { return { allowed: false, reason: "User queue limit reached" }; } // Check source limits const sourceLimit = this.limits.per_source[task.source.type]; if (sourceLimit) { const sourceTasks = userTasks.filter(t => t.source.type === task.source.type); if (sourceTasks.length >= sourceLimit.max_concurrent) { return { allowed: false, reason: `${task.source.type} concurrent limit reached` }; } } return { allowed: true }; } preempt(lowTask: Task, highTask: Task): void { // Cancel low priority task lowTask.status = "cancelled"; lowTask.metadata = { ...lowTask.metadata, preempted_by: highTask.id }; // Notify daemon to stop this.sendToDaemon(lowTask.assigned_daemon, { type: "cancel_task", task_id: lowTask.id }); // Requeue low task lowTask.status = "queued"; this.enqueue(lowTask); log.info(`Preempted task ${lowTask.id} for ${highTask.id}`); } }
Rate Limiting & Backpressure
Rate Limits
yaml
rate_limits: per_source: proactive: tasks_per_minute: 50 tasks_per_hour: 500 burst_limit: 100 user: tasks_per_minute: 60 tasks_per_hour: 1000 webhook: tasks_per_minute: 100 tasks_per_hour: 2000 global: tasks_per_minute: 500 tasks_per_hour: 10000
Backpressure Signals
When the queue is overwhelmed, send backpressure signals:
typescript
interface BackpressureSignal { type: "backpressure"; level: "warning" | "critical"; queue_depth: number; estimated_wait_time: number; recommendations: string[]; } function checkBackpressure(): void { const queueDepth = getQueueDepth(); if (queueDepth > this.limits.backpressure.critical_threshold) { // Critical backpressure this.emitBackpressure({ type: "backpressure", level: "critical", queue_depth: queueDepth, estimated_wait_time: this.estimateWaitTime(), recommendations: [ "Pause proactive engine", "Reject LOW priority tasks", "Consider adding more daemons" ] }); // Auto-actions this.pauseProactiveEngine(); this.rejectLowPriorityTasks(); } else if (queueDepth > this.limits.backpressure.warning_threshold) { // Warning backpressure this.emitBackpressure({ type: "backpressure", level: "warning", queue_depth: queueDepth, estimated_wait_time: this.estimateWaitTime(), recommendations: [ "Reduce proactive batch frequency", "Delay non-urgent tasks" ] }); // Notify proactive engine to slow down this.notifyProactiveEngine({ action: "slow_down", factor: 2 }); } }
Proactive Engine Response to Backpressure
typescript
// In Proactive Engine function handleBackpressure(signal: BackpressureSignal): void { if (signal.level === "critical") { // Stop generating new tasks this.pause(); log.warn("Proactive engine paused due to critical backpressure"); } else if (signal.level === "warning") { // Reduce frequency this.batchWindow *= 2; // Double the batch window this.maxTasksPerBatch /= 2; // Halve the tasks per batch log.info("Proactive engine slowed down due to backpressure"); } }
Failure Handling
Error Classification
typescript
type ErrorCategory = "transient" | "daemon_failure" | "permanent" | "timeout"; function classifyError(error: TaskError): ErrorCategory { // Network errors are transient if (error.code.startsWith("NETWORK_")) return "transient"; if (error.code === "CONNECTION_LOST") return "transient"; // Daemon crashed if (error.code === "DAEMON_CRASHED") return "daemon_failure"; if (error.code === "DAEMON_UNRESPONSIVE") return "daemon_failure"; // Permanent errors if (error.code === "INVALID_INPUT") return "permanent"; if (error.code === "PERMISSION_DENIED") return "permanent"; if (error.code === "RESOURCE_NOT_FOUND") return "permanent"; // Timeout if (error.code === "EXECUTION_TIMEOUT") return "timeout"; // Default to transient (safer to retry) return "transient"; }
Retry Strategy
typescript
interface RetryStrategy { shouldRetry(task: Task, error: TaskError): boolean; getDelay(task: Task): number; } const defaultRetryStrategy: RetryStrategy = { shouldRetry(task: Task, error: TaskError): boolean { // Check retry limit if (task.attempts.length >= task.execution.max_attempts) { return false; } // Check error type const category = classifyError(error); if (category === "permanent") return false; // Check deadline if (task.deadline && task.deadline < new Date()) { return false; } return true; }, getDelay(task: Task): number { const attempt = task.attempts.length; const baseDelay = task.execution.retry_delay; if (task.execution.retry_backoff === "exponential") { // Exponential backoff with jitter const delay = baseDelay * Math.pow(2, attempt - 1); const jitter = delay * 0.2 * Math.random(); return Math.min(delay + jitter, 600); // Max 10 minutes } else { // Linear backoff return baseDelay * attempt; } } };
Retry Flow
Task fails
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Classify error │
│ │
│ transient → Retry with backoff │
│ daemon_failure → Reassign to different daemon │
│ timeout → Retry with longer timeout (if allowed) │
│ permanent → Mark failed, notify user │
│ │
└─────────────────────────────────────────────────────────────┘
│
▼
┌─────────────────────────────────────────────────────────────┐
│ Check retry eligibility │
│ │
│ • Under max attempts? │
│ • Before deadline? │
│ • Error is retryable? │
│ │
└─────────────────────────────────────────────────────────────┘
│
├─── YES ───► Schedule retry with delay
│
└─── NO ────► Mark permanently failed, notify user
Daemon Failure Handling
typescript
function handleDaemonDisconnect(daemonId: string): void { // Get all tasks running on this daemon const affectedTasks = getTasksOnDaemon(daemonId); for (const task of affectedTasks) { // Record the failure task.attempts.push({ attempt_number: task.attempts.length + 1, daemon_id: daemonId, started_at: task.started_at, ended_at: new Date(), status: "failed", error: { code: "DAEMON_DISCONNECTED", message: "Daemon disconnected unexpectedly", retryable: true } }); // Try to reassign to another daemon task.requirements.excluded_daemons = [ ...(task.requirements.excluded_daemons || []), daemonId // Don't assign back to failed daemon immediately ]; task.status = "queued"; this.enqueue(task); log.warn(`Task ${task.id} requeued after daemon ${daemonId} disconnect`); } // Mark daemon as offline updateDaemonStatus(daemonId, "offline"); }
Task Lifecycle
┌──────────────────────────────────────┐
│ │
▼ │
CREATED ───► QUEUED ───► SCHEDULED ───► RUNNING ───► COMPLETED │
│ │ │ │
│ │ │ │
│ │ └──► FAILED ────► (retry?) ─────┘
│ │ │
│ │ └──► PERMANENTLY_FAILED
│ │
└──────────────┴──► CANCELLED (user cancelled)
│
└──► STALE (deadline passed before start)
State Transitions
typescript
const validTransitions: Record = { pending: ["queued", "cancelled"], queued: ["scheduled", "cancelled", "stale"], scheduled: ["running", "queued", "cancelled"], // Back to queued if daemon unavailable running: ["completed", "failed", "timeout", "cancelled"], failed: ["queued", "permanently_failed"], // Queued for retry completed: [], // Terminal cancelled: [], // Terminal timeout: ["queued", "permanently_failed"], // Retry or give up stale: [], // Terminal permanently_failed: [] // Terminal };
Observability
Metrics
typescript
interface QueueMetrics { // Queue depth queue_depth_total: number; queue_depth_by_priority: Record; queue_depth_by_source: Record; // Throughput tasks_enqueued_per_minute: number; tasks_completed_per_minute: number; tasks_failed_per_minute: number; // Latency avg_queue_wait_time_ms: number; avg_execution_time_ms: number; p99_queue_wait_time_ms: number; p99_execution_time_ms: number; // Daemons daemons_online: number; daemons_busy: number; total_daemon_capacity: number; used_daemon_capacity: number; // Errors retry_rate: number; failure_rate: number; timeout_rate: number; }
Logging
typescript
// Task lifecycle events log.info("task.created", { task_id, source, priority }); log.info("task.queued", { task_id, queue_depth, estimated_wait }); log.info("task.scheduled", { task_id, daemon_id }); log.info("task.started", { task_id, daemon_id }); log.info("task.completed", { task_id, duration_ms, daemon_id }); log.warn("task.failed", { task_id, error, attempt, will_retry }); log.error("task.permanently_failed", { task_id, error, total_attempts }); // System events log.info("daemon.connected", { daemon_id, capabilities }); log.warn("daemon.disconnected", { daemon_id, affected_tasks }); log.warn("backpressure.warning", { queue_depth, estimated_wait }); log.error("backpressure.critical", { queue_depth, actions_taken });
Database Schema
sql
-- Tasks table CREATE TABLE tasks ( id UUID PRIMARY KEY, user_id UUID NOT NULL REFERENCES users(id), idempotency_key VARCHAR(255), -- Source source_type VARCHAR(50) NOT NULL, source_id VARCHAR(255), source_channel VARCHAR(50), -- Content task_type VARCHAR(100) NOT NULL, input TEXT NOT NULL, context JSONB DEFAULT '{}', -- Priority & Scheduling priority VARCHAR(20) NOT NULL DEFAULT 'normal', deadline TIMESTAMP, sla TIMESTAMP, scheduled_for TIMESTAMP, -- Requirements requirements JSONB DEFAULT '{}', -- Execution config timeout_seconds INTEGER DEFAULT 300, max_attempts INTEGER DEFAULT 3, retry_delay_seconds INTEGER DEFAULT 30, retry_backoff VARCHAR(20) DEFAULT 'exponential', -- State status VARCHAR(30) NOT NULL DEFAULT 'pending', assigned_daemon UUID REFERENCES daemons(id), -- Results result JSONB, error JSONB, -- Timestamps created_at TIMESTAMP NOT NULL DEFAULT NOW(), queued_at TIMESTAMP, scheduled_at TIMESTAMP, started_at TIMESTAMP, completed_at TIMESTAMP, -- Metadata metadata JSONB DEFAULT '{}', parent_task_id UUID REFERENCES tasks(id), correlation_id UUID, -- Indexes UNIQUE(user_id, idempotency_key) ); CREATE INDEX idx_tasks_user_status ON tasks(user_id, status); CREATE INDEX idx_tasks_status_priority ON tasks(status, priority); CREATE INDEX idx_tasks_daemon ON tasks(assigned_daemon) WHERE status = 'running'; CREATE INDEX idx_tasks_deadline ON tasks(deadline) WHERE status IN ('queued', 'scheduled'); CREATE INDEX idx_tasks_correlation ON tasks(correlation_id); -- Task attempts table CREATE TABLE task_attempts ( id UUID PRIMARY KEY, task_id UUID NOT NULL REFERENCES tasks(id), attempt_number INTEGER NOT NULL, daemon_id UUID NOT NULL REFERENCES daemons(id), started_at TIMESTAMP NOT NULL, ended_at TIMESTAMP, status VARCHAR(20) NOT NULL, error JSONB, UNIQUE(task_id, attempt_number) ); -- Daemons table (may already exist) CREATE TABLE daemons ( id UUID PRIMARY KEY, user_id UUID NOT NULL REFERENCES users(id), name VARCHAR(255) NOT NULL, status VARCHAR(20) NOT NULL DEFAULT 'offline', capabilities TEXT[] DEFAULT '{}', max_concurrent_tasks INTEGER DEFAULT 5, active_tasks INTEGER DEFAULT 0, current_load FLOAT DEFAULT 0, last_heartbeat TIMESTAMP, created_at TIMESTAMP NOT NULL DEFAULT NOW() ); CREATE INDEX idx_daemons_user_status ON daemons(user_id, status);
Hub Code Structure
apps/hub/
├── src/
│ ├── index.ts # Entry point
│ │
│ ├── websocket/
│ │ ├── server.ts # WebSocket server
│ │ ├── handlers.ts # Message handlers
│ │ └── protocol.ts # Message types
│ │
│ ├── daemons/
│ │ ├── registry.ts # Daemon tracking
│ │ ├── heartbeat.ts # Health checking
│ │ └── capabilities.ts # Capability matching
│ │
│ ├── queue/ # TASK QUEUE MANAGER
│ │ ├── index.ts # Main orchestrator
│ │ ├── intake.ts # Task intake & validation
│ │ ├── priority.ts # Priority queues
│ │ ├── scheduler.ts # Daemon matching
│ │ ├── dispatcher.ts # Task dispatch
│ │ ├── retry.ts # Retry logic
│ │ ├── concurrency.ts # Limits enforcement
│ │ ├── backpressure.ts # Backpressure handling
│ │ └── metrics.ts # Queue metrics
│ │
│ ├── storage/
│ │ ├── tasks.ts # Task persistence
│ │ └── cache.ts # In-memory cache
│ │
│ └── api/
│ ├── routes.ts # HTTP endpoints
│ └── middleware.ts # Auth, rate limiting
│
├── Dockerfile
├── fly.toml
└── package.json
Integration Points
Proactive Engine → Queue Manager
typescript
// Proactive engine submits tasks const response = await hub.submitTask({ source: { type: "proactive", id: "trigger_123" }, type: "analysis", input: "Analyze revenue drop", priority: "normal", requirements: { capabilities: ["api_call"] } }); if (response.status === "rate_limited") { // Back off and retry later await this.handleBackpressure(response.backpressure); }
User → Queue Manager (via Desktop/Telegram)
typescript
// User submits task from desktop const task = await hub.submitTask({ source: { type: "user", channel: "desktop" }, type: "computer_use", input: "Open Figma and export assets", priority: "high", // User tasks default to high requirements: { capabilities: ["computer_use"], target_daemon: "primary" // User's local daemon } });
Queue Manager → Daemon
typescript
// Dispatch task to daemon websocket.send(daemon.connection, { type: "task.execute", task: { id: task.id, type: task.type, input: task.input, context: task.context, timeout: task.execution.timeout } }); // Daemon responds with progress/completion websocket.on("task.progress", (msg) => { ... }); websocket.on("task.completed", (msg) => { ... }); websocket.on("task.failed", (msg) => { ... });
Summary
| Component | Responsibility |
|---|---|
| Intake | Validate, prioritize, rate limit |
| Priority Queues | Organize by urgency |
| Scheduler | Match tasks to daemons |
| Dispatcher | Send to daemons, track execution |
| Retry Handler | Handle failures, schedule retries |
| Concurrency Manager | Enforce limits, preemption |
| Backpressure Handler | Protect system from overload |
The queue manager is the traffic controller that ensures tasks run efficiently across your daemon fleet.