
Right now I have the following code:

import axios from 'axios'

const urls = ['https://google.com', 'https://yahoo.com']

async function* requests() {
  for (const url of urls) {
    yield axios.get(url)
  }
}

;(async () => {
  for await (const n of requests()) {
    console.log(n.config.url) // prints https://google.com and then https://yahoo.com
  }
})()

As is, the requests won't block the single thread of node, but they will happen in sequence. I'm wondering if it would be possible to change the code to force parallelism.

  • would Promise.all solve your problem? Commented Feb 6, 2020 at 14:31
  • Not sure what you mean. How would I use Promise.all to parallelize things? I also need to use the responses as they become available, because we have a memory constraint. Commented Feb 6, 2020 at 14:35
  • In your case, if you await Promise.all(requests()) it will return an array with the resolved promises of axios.get(). So it will run all requests in parallel, and Promise.all() resolves when every Promise inside the iterator has resolved. Commented Feb 6, 2020 at 14:39
  • That's not good enough; I can't hold every one of those responses in memory, as I stated before. I need to use them as they become available, like a stream. Commented Feb 6, 2020 at 14:49
  • You can batch them: instead of iterating over every request, iterate over every n requests. Or use rxjs. Commented Feb 6, 2020 at 14:54

3 Answers


The "simpler" no-dependency way is to batch them and yield each batch with Promise.all:

import axios from 'axios'

const urls = [
  'https://jsonplaceholder.typicode.com/todos/1', 
  'https://jsonplaceholder.typicode.com/posts/1',
  'https://jsonplaceholder.typicode.com/users/1',
  'https://jsonplaceholder.typicode.com/comments/1'
]

async function* requests(batchSize = 1) {
  let batchedRequests = [];
  for (const url of urls) {
    batchedRequests.push(axios.get(url));
    if (batchedRequests.length === batchSize) {
      yield Promise.all(batchedRequests);
      batchedRequests = [];
    }
  }
  if (batchedRequests.length) { //if there are requests left in batch
    yield Promise.all(batchedRequests);
  }
}

;(async () => {
  for await (const batch of requests(2)) {
    batch.forEach(n => console.log(n.config.url)) // prints the four jsonplaceholder URLs, two per batch
  }
})()

You can use rxjs to achieve similar results, with the flexibility advantages that observables bring, but it's another dependency and can be harder to follow if you're not familiar with reactive streams. Here is a detailed post I found on the topic: https://medium.com/@ravishivt/batch-processing-with-rxjs-6408b0761f39




You can use lfi, which supports "concurrent iterables":

pipe(
  asConcur(urls),
  mapConcur(url => axios.get(url)),
  forEachConcur(response => console.log(response.config.url)),
)

Each URL will move through the pipeline of operations without being blocked by other URLs.
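For comparison, the core of what the lfi pipeline does here, minus the library, can be sketched with plain Promises; `fakeGet` and the URLs below are invented stand-ins for `axios.get` and real endpoints:

```javascript
// fakeGet is a made-up stand-in for axios.get that resolves with an
// axios-shaped response after a short delay.
const fakeGet = (url) =>
  new Promise((resolve) => setTimeout(() => resolve({ config: { url } }), 5));

const urls = ['https://example.com/a', 'https://example.com/b'];
const seen = [];

// Each URL runs its own fetch-then-log chain independently: no chain
// waits on any other URL's promise, so nothing is serialized or batched.
const done = Promise.all(
  urls.map((url) =>
    fakeGet(url).then((response) => {
      seen.push(response.config.url);
      console.log(response.config.url);
    })
  )
);
```

Unlike the batching approach, there is no upper bound on concurrency here; lfi's concurrent iterables add back-pressure and composition on top of this basic shape.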



You can use a normal iterator over Promises to do this without any external library.

Here is a simple example in TypeScript, with explicit typings to make the shapes clear:

import axios, { AxiosResponse } from 'axios'

const urls = ['https://google.com', 'https://yahoo.com']

function* requests(): IterableIterator<Promise<AxiosResponse<any>>> {
  for (const url of urls) {
    yield axios.get(url)
  }
}

async function* parallelize<T>(
  it: IterableIterator<Promise<T>>,
  parallelSize = 100,
): AsyncIterableIterator<T> {
  const processing = new Set<Promise<T>>();

  let val = it.next();
  while (!val.done) {
    const value = val.value;

    processing.add(value);
    value.then(() => processing.delete(value));

    if (processing.size >= parallelSize) {
      yield Promise.race(processing.values());
    }

    val = it.next();
  }

  while (processing.size) {
    yield Promise.race(processing.values());
  }
}

;(async () => {
  for await (const res of parallelize(requests())) {
    console.log(res.config.url); // prints https://google.com and https://yahoo.com (order depends on which completes first)
  }
})()

What is even better about this approach is that it starts a new request as soon as one finishes, rather than serializing batches of x requests: it constantly keeps x requests in flight.
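The same sliding-window idea can be exercised without axios or TypeScript. Below is a dependency-free sketch using simulated tasks (the delays are made up), which also checks that no more than parallelSize promises are ever in flight at once:

```javascript
// Plain-JS version of the parallelize() helper above: keep up to
// parallelSize promises in a Set and yield whichever settles first.
async function* parallelize(it, parallelSize = 2) {
  const processing = new Set();
  for (const promise of it) {
    processing.add(promise);
    // Remove the promise from the window when it settles, either way.
    promise.then(
      () => processing.delete(promise),
      () => processing.delete(promise)
    );
    if (processing.size >= parallelSize) {
      yield Promise.race(processing);
    }
  }
  // Drain whatever is still in flight.
  while (processing.size) {
    yield Promise.race(processing);
  }
}

// Simulated work: each task resolves with its delay after that many ms.
// The counters track how many tasks run concurrently.
let inFlight = 0;
let maxInFlight = 0;
const results = [];

function* tasks() {
  for (const ms of [30, 10, 20, 5]) {
    inFlight++;
    maxInFlight = Math.max(maxInFlight, inFlight);
    yield new Promise((resolve) =>
      setTimeout(() => {
        inFlight--;
        resolve(ms);
      }, ms)
    );
  }
}

(async () => {
  for await (const ms of parallelize(tasks(), 2)) {
    results.push(ms); // completion order, not submission order
  }
})();
```

Because the generator is pulled lazily, a new task is only created once a slot frees up, so maxInFlight never exceeds the window size of 2.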

