Async task queue with concurrency control, priority, pause/resume, backpressure, timeout, retry, and stats. Zero dependencies. TypeScript-first.
npm install @corvid-agent/queue

Run async tasks with controlled concurrency:
import { Queue } from "@corvid-agent/queue";
const queue = new Queue({ concurrency: 3 });
// Add tasks — returns a promise with the result
const result = await queue.add(async () => {
  const res = await fetch("/api/data");
  return res.json();
});
// Add multiple tasks at once
const results = await queue.addAll([
  () => fetchUser(1),
  () => fetchUser(2),
  () => fetchUser(3),
]);

Higher priority tasks run first:
const queue = new Queue({ concurrency: 1 });
queue.add(() => lowPriorityWork()); // priority: 0 (default)
queue.add(() => urgentWork(), { priority: 10 }); // runs first
queue.add(() => mediumWork(), { priority: 5 }); // runs second

Limit queue size to prevent unbounded memory growth:
const queue = new Queue({ concurrency: 2, maxSize: 100 });
try {
  await queue.add(() => work());
} catch (err) {
  // rejects with QueueFullError when the queue exceeds maxSize
}

Pause and resume processing:
const queue = new Queue({ autoStart: false }); // starts paused
queue.add(() => task1());
queue.add(() => task2());
queue.resume(); // start processing
queue.pause(); // stop starting new tasks (running tasks finish)
queue.resume(); // continue

Cancel tasks with AbortController:
const controller = new AbortController();
const promise = queue.add(() => fetchData(), {
  signal: controller.signal,
});
controller.abort(); // removes from queue, rejects with TaskAbortedError

Reject tasks that take too long:
import { TaskTimeoutError } from "@corvid-agent/queue";
try {
  await queue.add(() => slowOperation(), { timeout: 5000 });
} catch (err) {
  if (err instanceof TaskTimeoutError) {
    console.log(`Timed out after ${err.timeout}ms`);
  }
}

Automatically retry failed tasks with optional backoff:
// Constant delay between retries
await queue.add(() => flakeyApi(), {
  retries: 3,
  retryDelay: 1000,
});
// Exponential backoff: 100ms, 200ms, 400ms
await queue.add(() => flakeyApi(), {
  retries: 3,
  retryDelay: (attempt) => 100 * Math.pow(2, attempt - 1),
});

Timeouts and aborts are not retried; only thrown errors trigger retries.

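
The retry behavior above can be pictured with a small standalone sketch. It does not use `@corvid-agent/queue` and is not the library's implementation: `withRetry`, `delayFor`, and `sleep` are hypothetical helpers, and the 1-based `attempt` argument is an assumption inferred from the backoff comment (100ms, 200ms, 400ms).

```typescript
// Hypothetical helpers for illustration; not exports of @corvid-agent/queue.
type RetryDelay = number | ((attempt: number) => number);

// Resolve the delay in ms for a given retry attempt (assumed 1-based).
function delayFor(retryDelay: RetryDelay, attempt: number): number {
  return typeof retryDelay === "function" ? retryDelay(attempt) : retryDelay;
}

const sleep = (ms: number) =>
  new Promise<void>((resolve) => setTimeout(resolve, ms));

// Run `fn`, retrying up to `retries` times after a thrown error.
async function withRetry<T>(
  fn: () => Promise<T>,
  retries: number,
  retryDelay: RetryDelay = 0,
): Promise<T> {
  for (let attempt = 0; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      if (attempt >= retries) throw err; // out of retries: surface the last error
      await sleep(delayFor(retryDelay, attempt + 1));
    }
  }
}

// Delays produced by the exponential backoff function from the example above.
const backoff = (attempt: number) => 100 * Math.pow(2, attempt - 1);
const delays = [1, 2, 3].map((attempt) => delayFor(backoff, attempt));
console.log(delays); // [ 100, 200, 400 ]
```

With `retries: 3`, a task that keeps failing runs four times in total: the first attempt plus three retries.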
Track cumulative queue performance:
const queue = new Queue({ concurrency: 5 });
// ... process tasks ...
console.log(queue.stats);
// { processed: 100, succeeded: 95, failed: 5, retries: 12, timedOut: 2 }
queue.resetStats(); // reset all counters

Listen for lifecycle events or wait for specific states:
const queue = new Queue({ concurrency: 5 });
queue.on("active", () => console.log(`Task started. Active: ${queue.active}`));
queue.on("completed", (result) => console.log("Task done:", result));
queue.on("error", (err) => console.error("Task failed:", err));
queue.on("idle", () => console.log("All done"));
queue.on("empty", () => console.log("Pending queue empty"));
// Wait for specific states
await queue.onIdle(); // all tasks completed
await queue.onEmpty(); // pending queue empty
await queue.onDrained(); // everything processed

Map, filter, and iterate with concurrency control:
import { map, each, filter } from "@corvid-agent/queue";
// Concurrent map — preserves order
const users = await map(userIds, (id) => fetchUser(id), { concurrency: 5 });
// Concurrent side effects
await each(emails, (email) => sendNotification(email), { concurrency: 10 });
// Concurrent filter
const alive = await filter(servers, async (server) => {
  const res = await fetch(server.healthUrl);
  return res.ok;
}, { concurrency: 20 });

Queue options (passed to the `Queue` constructor):

| Option | Type | Default | Description |
|---|---|---|---|
| `concurrency` | `number` | `1` | Max concurrent tasks |
| `maxSize` | `number` | `Infinity` | Max pending tasks (backpressure) |
| `autoStart` | `boolean` | `true` | Start processing on add |

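
The `concurrency` and `maxSize` options bound running and pending tasks respectively. The following is a minimal sketch of that idea, not the library's implementation: `TinyQueue` is a hypothetical class, and its plain `Error("queue full")` stands in for `QueueFullError`.

```typescript
// Illustrative sketch only; NOT the @corvid-agent/queue implementation.
class TinyQueue {
  private readonly concurrency: number;
  private readonly maxSize: number;
  private readonly pending: Array<() => void> = [];
  private active = 0;

  constructor(concurrency = 1, maxSize = Infinity) {
    this.concurrency = concurrency;
    this.maxSize = maxSize;
  }

  add<T>(fn: () => Promise<T>): Promise<T> {
    // Backpressure: refuse new work once the pending list is full.
    if (this.pending.length >= this.maxSize) {
      return Promise.reject(new Error("queue full"));
    }
    return new Promise<T>((resolve, reject) => {
      this.pending.push(() => {
        this.active++;
        Promise.resolve()
          .then(fn)
          .then(resolve, reject)
          .finally(() => {
            this.active--;
            this.startNext(); // a slot freed up, start the next pending task
          });
      });
      this.startNext();
    });
  }

  // Start pending tasks until the concurrency limit is reached.
  private startNext(): void {
    while (this.active < this.concurrency && this.pending.length > 0) {
      const start = this.pending.shift();
      if (start) start();
    }
  }
}

// Run five 10ms tasks with concurrency 2 and report the peak overlap.
async function peakConcurrency(): Promise<number> {
  const queue = new TinyQueue(2);
  let running = 0;
  let peak = 0;
  const task = async (): Promise<void> => {
    running++;
    peak = Math.max(peak, running);
    await new Promise<void>((resolve) => setTimeout(resolve, 10));
    running--;
  };
  await Promise.all(Array.from({ length: 5 }, () => queue.add(task)));
  return peak;
}
```

In this sketch, `peakConcurrency()` resolves to `2`: no more than two tasks overlap, however many are added.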
Task options (passed to `add` and `addAll`):

| Option | Type | Default | Description |
|---|---|---|---|
| `priority` | `number` | `0` | Higher = runs first |
| `signal` | `AbortSignal` | - | Cancel the task |
| `timeout` | `number` | - | Reject after ms with `TaskTimeoutError` |
| `retries` | `number` | `0` | Retry attempts on failure |
| `retryDelay` | `number \| (attempt) => number` | `0` | Delay between retries (ms) |

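
One common way to implement a per-task `timeout` is to race the task against a timer. The sketch below shows that pattern under stated assumptions: `withTimeout` and `SketchTimeoutError` are hypothetical names, not exports of `@corvid-agent/queue`, and the library may implement its timeout differently.

```typescript
// Illustrative sketch only; not the @corvid-agent/queue implementation.
class SketchTimeoutError extends Error {
  readonly timeout: number;

  constructor(timeout: number) {
    super(`Task timed out after ${timeout}ms`);
    this.name = "SketchTimeoutError";
    this.timeout = timeout;
  }
}

// Reject if `fn` has not settled within `timeout` milliseconds.
function withTimeout<T>(fn: () => Promise<T>, timeout: number): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const timer = setTimeout(
      () => reject(new SketchTimeoutError(timeout)),
      timeout,
    );
    fn()
      .then(resolve, reject)
      .finally(() => clearTimeout(timer)); // always release the timer
  });
}

const fast = async () => "done";
const slow = () =>
  new Promise<string>((resolve) => setTimeout(() => resolve("done"), 200));

withTimeout(fast, 50).then((value) => console.log(value)); // prints "done"
withTimeout(slow, 20).catch((err) => console.log(err.message)); // prints "Task timed out after 20ms"
```

Note that this pattern only stops waiting; the underlying work keeps running unless it is also cancelled, for example through an `AbortSignal`.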
Queue properties:

| Property | Type | Description |
|---|---|---|
| `queue.size` | `number` | Pending tasks |
| `queue.active` | `number` | Running tasks |
| `queue.isPaused` | `boolean` | Whether paused |
| `queue.isIdle` | `boolean` | Nothing running or pending |
| `queue.stats` | `QueueStats` | Cumulative `{ processed, succeeded, failed, retries, timedOut }` |

Queue methods:

| Method | Description |
|---|---|
| `add(fn, opts?)` | Add a task, returns `Promise<T>` |
| `addAll(fns, opts?)` | Add multiple tasks |
| `pause()` | Stop starting new tasks |
| `resume()` | Resume processing |
| `clear()` | Remove all pending tasks |
| `onIdle()` | Wait until idle |
| `onEmpty()` | Wait until pending is empty |
| `onDrained()` | Wait until all processed |
| `on(event, listener)` | Add event listener |
| `off(event, listener)` | Remove event listener |
| `resetStats()` | Reset all statistics to zero |

Utility functions:

- `map`: concurrent map, preserving order.
- `each`: concurrent iteration.
- `filter`: concurrent filter, preserving order.

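
"Preserving order" means results line up with their inputs even when tasks finish out of order. A minimal sketch of one way to get that property follows; `mapConcurrent` is a hypothetical function written for illustration, not the library's `map`, and it takes `concurrency` as a plain number rather than an options object.

```typescript
// Illustrative sketch only; not the @corvid-agent/queue implementation.
async function mapConcurrent<T, R>(
  items: readonly T[],
  fn: (item: T, index: number) => Promise<R>,
  concurrency: number,
): Promise<R[]> {
  const results = new Array<R>(items.length);
  let nextIndex = 0;

  // Each worker repeatedly claims the next unprocessed index and writes
  // its result into the matching slot, so output order follows input order.
  async function worker(): Promise<void> {
    while (nextIndex < items.length) {
      const i = nextIndex++;
      results[i] = await fn(items[i], i);
    }
  }

  const workerCount = Math.max(1, Math.min(concurrency, items.length));
  await Promise.all(Array.from({ length: workerCount }, () => worker()));
  return results;
}

// The 10ms task finishes first, but the output still follows input order.
const doubled = mapConcurrent(
  [30, 10, 20],
  async (ms) => {
    await new Promise<void>((resolve) => setTimeout(resolve, ms));
    return ms * 2;
  },
  2,
);
doubled.then((out) => console.log(out)); // [ 60, 20, 40 ]
```

An order-preserving filter can be built the same way: map each item to a boolean, then keep the items whose slot is `true`.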
Errors:

- `QueueFullError`: thrown when `maxSize` is exceeded
- `TaskAbortedError`: thrown when a task is aborted or the queue is cleared
- `TaskTimeoutError`: thrown when a task exceeds its `timeout`

License: MIT