So here's an oxymoron: I want to create an asynchronous blocking queue in JavaScript/TypeScript (if you can implement it without TypeScript, that's fine). Basically, I want to implement something like Java's BlockingQueue, except that instead of actually blocking, it would be async and I could await dequeues.

Here's the interface I want to implement:

interface AsyncBlockingQueue<T> {
  enqueue(t: T): void;
  dequeue(): Promise<T>;
}

And I'd use it like so:

// enqueue stuff somewhere else

async function useBlockingQueue() {
  // as soon as something is enqueued, the promise will be resolved:
  const value = await asyncBlockingQueue.dequeue();
  // this will cause it to await for a second value
  const secondValue = await asyncBlockingQueue.dequeue();
}

Any ideas?

1 Answer

It's quite simple actually: dequeue will create a promise that enqueue will resolve. We just have to keep the resolvers in a queue, and also handle the case where values are enqueued before they are dequeued by keeping the already fulfilled promises in a queue as well.

class AsyncBlockingQueue {
  constructor() {
    // invariant: at least one of the arrays is empty
    this.resolvers = [];
    this.promises = [];
  }
  _add() {
    this.promises.push(new Promise(resolve => {
      this.resolvers.push(resolve);
    }));
  }
  enqueue(t) {
    // equivalent alternative that avoids creating a promise/resolver pair just to resolve it:
    // if (this.resolvers.length) this.resolvers.shift()(t);
    // else this.promises.push(Promise.resolve(t));
    if (!this.resolvers.length) this._add();
    this.resolvers.shift()(t);
  }
  dequeue() {
    if (!this.promises.length) this._add();
    return this.promises.shift();
  }
  // now some utilities:
  isEmpty() { // there are no values available
    return !this.promises.length; // this.length <= 0
  }
  isBlocked() { // it's waiting for values
    return !!this.resolvers.length; // this.length < 0
  }
  get length() {
    return this.promises.length - this.resolvers.length;
  }
  [Symbol.asyncIterator]() {
    // Todo: Use AsyncIterator.from()
    return {
      next: () => this.dequeue().then(value => ({done: false, value})),
      [Symbol.asyncIterator]() { return this; },
    };
  }
}

I don't know TypeScript, but presumably it's simple to add the necessary type annotations.
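For what it's worth, here is one way the annotations might look. This is only a sketch of the core of the class above against the interface from the question; the class name ArrayAsyncBlockingQueue and the private modifiers are my own choices, and the non-null assertions (!) lean on the invariant that _add (here add) has just made the array non-empty.

```typescript
interface AsyncBlockingQueue<T> {
  enqueue(t: T): void;
  dequeue(): Promise<T>;
}

// Hypothetical name; same logic as the untyped class, with types added.
class ArrayAsyncBlockingQueue<T> implements AsyncBlockingQueue<T> {
  // invariant: at least one of the arrays is empty
  private resolvers: Array<(value: T) => void> = [];
  private promises: Array<Promise<T>> = [];

  // creates a pending promise and stores its resolver
  private add(): void {
    this.promises.push(new Promise<T>(resolve => {
      this.resolvers.push(resolve);
    }));
  }

  enqueue(t: T): void {
    if (!this.resolvers.length) this.add();
    // non-empty here, either because a consumer is waiting or because of add()
    this.resolvers.shift()!(t);
  }

  dequeue(): Promise<T> {
    if (!this.promises.length) this.add();
    // non-empty here, either because a value is buffered or because of add()
    return this.promises.shift()!;
  }

  // positive: buffered values; negative: waiting consumers
  get length(): number {
    return this.promises.length - this.resolvers.length;
  }
}
```

With this, `await new ArrayAsyncBlockingQueue<number>().dequeue()` is typed as number, and enqueue rejects arguments of the wrong type at compile time.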

For better performance, use a Queue implementation with circular buffers instead of plain arrays, e.g. this one. You might also use only a single queue and remember whether you currently store promises or resolvers.
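The single-queue idea could be sketched roughly as follows. Note the assumptions: the class name SingleQueueAsyncBlockingQueue and the holdsResolvers flag are made up for illustration, the queue stores plain values rather than already fulfilled promises, and it still uses a plain array (so shift() is not constant-time); a circular-buffer queue would replace the array in a performance-sensitive version.

```typescript
// Hypothetical sketch: one array that holds EITHER buffered values
// OR resolvers of waiting consumers, never a mix of both.
class SingleQueueAsyncBlockingQueue<T> {
  private queue: unknown[] = [];
  // true while the array holds resolvers, false while it holds values
  private holdsResolvers = false;

  enqueue(t: T): void {
    if (this.holdsResolvers && this.queue.length > 0) {
      // a consumer is waiting: hand the value over directly
      const resolve = this.queue.shift() as (value: T) => void;
      resolve(t);
    } else {
      // nobody is waiting: buffer the value
      this.holdsResolvers = false;
      this.queue.push(t);
    }
  }

  dequeue(): Promise<T> {
    if (!this.holdsResolvers && this.queue.length > 0) {
      // a value is buffered: return it wrapped in a promise
      const value = this.queue.shift() as T;
      return new Promise<T>(resolve => resolve(value));
    }
    // nothing buffered: park a resolver until enqueue() is called
    this.holdsResolvers = true;
    return new Promise<T>(resolve => {
      this.queue.push(resolve);
    });
  }

  // positive: buffered values; negative: waiting consumers
  get length(): number {
    return this.holdsResolvers ? -this.queue.length : this.queue.length;
  }
}
```

The flag only changes while the array is empty, which is what keeps values and resolvers from ever being mixed.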

5 Comments

I'd like to thank stackoverflow for making me add more ! to reach the character minimum
@Bergi In this solution, why is there the check if (!this.resolvers.length) in enqueue? The logic appears to encode the creation of a Promise (and a resolver), and then the immediate removal of the resolver. What am I missing? Does the length of the array of resolvers ever get above 1?
@52d6c6af The length of the resolvers goes above 0 when there were more dequeue() calls than enqueue() calls, the length of the promises goes above 0 when there were more enqueue() calls than dequeue() calls.
@52d6c6af No, I don't think this has to do anything with the trampolines that I know
Thank you for your solution! I've added a requeue function that adds a Promise at the head of the array. This way I can do a NO-ACK and still maintain the sequence of enqueued items: ` requeue(t) { if (!this.promises.length) { this._add() } else { this.promises.unshift(new Promise(resolve => { this.resolvingFunctions.unshift(resolve) })) } this.resolvingFunctions.shift()(t) } `
