
Await-able queues #1586

@aearly


I've been thinking about how we might make queues work better with async/await. It would be nice if certain operations returned promises, so they could be awaitable. However, most of the callbacks used in queues are really more event-based, rather than a single task that resolves once.

For example, q.push(task) also accepts a callback. We could make push() return a promise if you leave the callback off. However, push() also accepts an array of items to push, in which case, the callback is called multiple times. Promises can't resolve multiple times.

const done = await q.push([foo, bar, baz]) // how should this resolve?

We could make it so it only resolves when all the tasks complete, but this complicates internal logic.
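For comparison, the resolve-when-all-complete behavior can be approximated in userland today by giving each pushed item its own promise and aggregating them. A minimal sketch, assuming the existing callback form of q.push (pushAll is a hypothetical helper, not part of the API, and the stand-in queue below just doubles each item asynchronously):

```javascript
// pushAll is a hypothetical helper, not part of the async API: it wraps
// each item's completion callback in its own promise and resolves once
// every item has been processed.
function pushAll(q, items) {
  return Promise.all(items.map(item =>
    new Promise((resolve, reject) => {
      q.push(item, (err, result) => err ? reject(err) : resolve(result))
    })
  ))
}

// minimal stand-in for async.queue, just enough to exercise the helper:
// its "worker" doubles each item on the next tick
const q = {
  push(item, cb) {
    setTimeout(() => cb(null, item * 2), 0)
  }
}
```

With this shape, await pushAll(q, [foo, bar, baz]) resolves with one result per item, and a single failing item rejects the whole batch.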

The handling of errors in this case would also be a bit strange. The callback for push() is mainly useful for determining whether an error occurred while processing the item. If push() returned a promise instead, that error would become a rejection, and since most people don't use the push() callback, a lot of existing code would suddenly start throwing unhandled rejections. We could change it so the promise resolves with the error object in those cases.
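The resolve-with-the-error-object idea sketched concretely (pushSettled is a made-up name, and the stand-in queue below fails items over 10 just to have both outcomes to show):

```javascript
// Hypothetical: instead of rejecting, resolve with whatever the callback
// received -- an Error for failures, the result otherwise -- so that
// ignoring the returned promise can never produce an unhandled rejection.
function pushSettled(q, item) {
  return new Promise(resolve => {
    q.push(item, (err, result) => resolve(err || result))
  })
}

// stand-in queue whose "worker" fails items greater than 10
const q = {
  push(item, cb) {
    setTimeout(() => item > 10 ? cb(new Error('too big')) : cb(null, item), 0)
  }
}
```

Callers that care about failures check whether the awaited value is an instanceof Error; callers that don't can simply drop the promise with no rejection ever surfacing.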

There are several event callbacks you can register: empty, saturated, error, etc. It seems really difficult to make these awaitable in a useful way. I've thought about changing the QueueObject API so that event handlers are methods rather than bare properties.

q.error((err, task) => console.log(`Queue error: ${err.message} for ${task}`))

//rather than

q.error = (err, task) => console.log(`Queue error: ${err.message} for ${task}`)

In this case, we could make it so calling the method without passing a function returns a promise that resolves the next time the event triggers.

const [err, task] = await q.error()

This is a bit clunky. To repeatedly await errors, you'd have to use an infinite loop:

const q = async.queue(taskFn)

const errorLoop = async () => {
  while (q.length) { // not a good condition to test for
    const [err, task] = await q.error() // if no error occurs, this waits here forever
    console.log(err)
    q.push(task)
  }
}
errorLoop() // fire and forget; hopefully it exits when the queue drains

It's also a bit complicated to implement internally. We would have to keep track of pending promises for each event, creating and resolving them at the right times.
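A sketch of what that bookkeeping might look like (class and method names here are made up, not async's actual internals): each event name keeps a list of pending resolvers, all flushed the next time the event fires.

```javascript
// Sketch of the internal bookkeeping, not async's actual implementation:
// calling an event method with no handler returns a promise, and the
// queue flushes every pending resolver when the event fires.
class AwaitableEvents {
  constructor() { this._waiters = {} }

  // e.g. q.error() with no argument -> promise for the next error event
  nextEvent(name) {
    return new Promise(resolve => {
      (this._waiters[name] = this._waiters[name] || []).push(resolve)
    })
  }

  // called internally when the event actually happens
  _trigger(name, payload) {
    const waiters = this._waiters[name] || []
    this._waiters[name] = [] // each promise resolves exactly once
    waiters.forEach(resolve => resolve(payload))
  }
}
```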

Returning a promise on the next firing of the event would be the most useful for the drain callback. It makes the most common use of the queue work nicely in the context of an async function:

async function processItems(items) {
  const q = async.queue(processAsync, 5)
  q.push(items)
  await q.drain()
}
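For what it's worth, the drain case can already be approximated with a one-line wrapper over the current property-style API (drained is a made-up helper name, and the bare object below stands in for a real queue, which would invoke q.drain() itself once the last task finishes):

```javascript
// drained is a hypothetical helper: it turns the property-style drain
// callback into a promise you can await.
const drained = q => new Promise(resolve => { q.drain = resolve })

// stand-in queue object; a real async.queue would call q.drain() itself
const q = {}
const done = drained(q)
q.drain() // simulate the queue emptying
```

Making drain() itself return a promise would just bake this pattern into the API.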

These changes would add slightly more utility to queues in async/await code, but might be more trouble than they're worth.


Another idea for the event-style callbacks is to return an Async Iterator:

const iterator = q.error()

for await (const [err, item] of iterator) {
  //...
}

These would still have to be wrapped in a fire-and-forget async function, unless you used one of the for await loops to drive the lifecycle of the queue within the main async function. I think we would also have to assume that a drain event means the queue has ended, thus also ending all the async iterators.
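A sketch of how such an iterator could be built over the current property-style callbacks (eventIterator is a hypothetical helper; it assumes, per the point above, that a drain event ends the stream):

```javascript
// eventIterator is a hypothetical helper, not part of the async API: it
// buffers error payloads and hands them out through an async generator,
// treating drain as the end of the stream.
function eventIterator(q) {
  const buffer = []
  let notify = null
  let finished = false
  q.error = (err, task) => { buffer.push([err, task]); if (notify) notify() }
  q.drain = () => { finished = true; if (notify) notify() }
  return (async function* () {
    while (true) {
      while (buffer.length) yield buffer.shift()
      if (finished) return
      // park until the next error or drain wakes us up
      await new Promise(resolve => { notify = resolve })
    }
  })()
}
```

The consumer side is then exactly the for await loop above, with no manual promise tracking.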

Lots of implementation complexity here as well.
