I'm having trouble with RxJS and the correct way to handle an array of requests. Let's say I have an array of 50 requests, built as follows:

import { of } from 'rxjs';
import { delay } from 'rxjs/operators';

let requestCounter = 0;
function makeRequest(timeToDelay) {
  return of('Request Complete!').pipe(delay(timeToDelay));
}

const requestArray = [];
for (let i = 0; i < 25; i++) {
  requestArray.push(makeRequest(3000)); // 3-second request
  requestArray.push(makeRequest(1000)); // 1-second request
}

My goal is to:

  • Launch the requests in parallel
  • Only 5 can run at the same time
  • When a request is done, the next one in the array starts
  • When a request is done (success or error), I need to increment my variable 'requestCounter' by one (requestCounter++)
  • When the last request in the queue is done, I need to be notified (via subscribe) and handle an array containing the result of every request

So far, the closest I've come to this is by following the answer in this post:

RxJS parallel queue with concurrent workers?

The thing is that I'm just discovering RxJS, and the example is way too complicated for me. I can't work out how to handle the counter for each request.

Hope you can help me. (Sorry for the broken English; it is not my native language.)

Edit: Final solution looks like this:

forkJoinConcurrent<T>(
    observables: Observable<T>[],
    concurrent: number
  ): Observable<T[]> {
    return from(observables).pipe(
      mergeMap((outerValue, outerIndex) => outerValue.pipe(
        tap(() => { /* my code */ }),
        last(),
        catchError(error => of(error)),
        map((innerValue, innerIndex) => ({index: outerIndex, value: innerValue})),
      ), concurrent),
      toArray(),
      map(a => (a.sort((l, r) => l.index - r.index).map(e => e.value))),
    );
  }
  • The answer from the link you posted does what you're asking for. You can use tap inside a pipe for side effects like increasing a counter. In that answer, a single request is done once it passes the last operator. The toArray operator combines all request results into one array, so you have to add tap(_ => requestCounter++) somewhere in the pipe after last() but before toArray(). Commented Jul 1, 2019 at 9:44
  • Great, it does the job! My only problem is that if one of the calls fails, all the others are cancelled. Is there a way to make it keep going and process the error in the array returned at the end? Commented Jul 1, 2019 at 13:32
  • You have to catch errors on every single request with the catchError operator and return an alternative value stream instead, e.g. catchError(error => of(error)). Either add catchError at the end of the pipe where you create a request, i.e. in your makeRequest function (preferred), or add it within the inner pipe after last. Commented Jul 1, 2019 at 13:47
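
The advice in these comments can be put together in a small sketch. It assumes RxJS 6-style imports; the `fail` parameter, the short delays, and the concurrency of 2 are made up for illustration and are not part of the original question. Errors are caught inside makeRequest (the option the comment calls preferred), so a failed request shows up as a value in the final array instead of cancelling the others, and the counter is incremented after last() but before toArray().

```typescript
import { Observable, from, of, throwError } from 'rxjs';
import { catchError, delay, last, map, mergeMap, tap, toArray } from 'rxjs/operators';

let requestCounter = 0;

// Hypothetical stand-in for a real request; `fail` forces an error for illustration.
function makeRequest(timeToDelay: number, fail = false): Observable<unknown> {
  const source: Observable<string> = fail
    ? throwError(new Error('Request failed!'))
    : of('Request Complete!').pipe(delay(timeToDelay));
  // Turn an error into a regular value so the outer stream keeps going.
  return source.pipe(catchError(error => of(error)));
}

const requests = [makeRequest(30), makeRequest(10, true), makeRequest(20)];

const results$ = from(requests).pipe(
  mergeMap(
    (request, index) => request.pipe(
      last(),                           // one value per finished request
      tap(() => requestCounter++),      // counts successes and caught errors
      map(value => ({ index, value })), // remember the original position
    ),
    2, // at most 2 requests in flight (the question wants 5)
  ),
  toArray(),
  // Restore the input order, since requests finish out of order.
  map(entries => entries.sort((l, r) => l.index - r.index).map(e => e.value)),
);

results$.subscribe(results => console.log(requestCounter, results));
```

With this setup the subscriber is called once, after every request has finished, with the results in the same order as the input array; the second entry is the Error object rather than a string.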

1 Answer

First of all, you could use a Subject to hold the request queue. Then take a look at the mergeMap operator: its concurrent parameter sets the maximum number of inner subscriptions running at once, and its project function receives an index you can use to track the number of calls.

https://www.learnrxjs.io/operators/transformation/mergemap.html

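import { Subject } from 'rxjs';
import { map, mergeMap } from 'rxjs/operators';

// makeRequest is the function from the question.
const requestArray = new Subject();

// Subscribe before calling next(): a plain Subject does not replay values
// emitted before the subscription exists.
requestArray.pipe(
  // Subscribe to each request, at most 5 at a time, and pair its result with its index.
  mergeMap((request, index) => request.pipe(map(res => [res, index])), 5),
  // This index counts emitted results; with 50 requests the last one is 49.
  map((res, index) => { if (index === 49) { /* .... do your thing */ } return res; })
).subscribe(console.log);

for (let i = 0; i < 25; i++) {
  requestArray.next(makeRequest(3000)); // 3-second request
  requestArray.next(makeRequest(1000)); // 1-second request
}
requestArray.complete();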
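
To check that the concurrent argument of mergeMap really caps the number of requests in flight, a small measurement like the one below may help. It is only a sketch under assumptions: RxJS 6-style imports, and a hypothetical measurePeak helper with timer-based fake requests that are not part of the answer. Each fake request is wrapped in defer so the bookkeeping runs only when mergeMap actually subscribes to it.

```typescript
import { Observable, defer, from, timer } from 'rxjs';
import { map, mergeMap, tap, toArray } from 'rxjs/operators';

// Hypothetical helper: runs `total` fake requests with the given concurrency
// and emits the highest number of requests that were in flight at once.
function measurePeak(total: number, concurrent: number): Observable<number> {
  return defer(() => {
    let inFlight = 0; // requests currently running
    let peak = 0;     // highest value of inFlight seen so far

    const requests = Array.from({ length: total }, (_, i) =>
      defer(() => {
        // Runs when mergeMap subscribes, i.e. when the request really starts.
        inFlight++;
        peak = Math.max(peak, inFlight);
        // timer emits once and then completes; decrement on that emission,
        // which happens before mergeMap starts the next queued request.
        return timer(i % 2 === 0 ? 30 : 10).pipe(tap(() => inFlight--));
      }),
    );

    return from(requests).pipe(
      mergeMap(request => request, concurrent),
      toArray(),       // wait until every request has finished
      map(() => peak),
    );
  });
}

measurePeak(12, 5).subscribe(peak => console.log('peak concurrency:', peak));
```

With 12 requests and a limit of 5, this should report a peak of 5: the first five start immediately and each later one starts only after an earlier one has finished.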