I have a Django backend and I'm trying to poll the status of multiple Celery tasks (https://docs.celeryq.dev/en/stable/index.html) every 3 seconds.

So for example I have 4 task ids:

  1. 3099023
  2. 3493494
  3. 4309349
  4. 5498458

So I have to call http.get<...>(backend) once per id every 3 seconds (4 calls per round), until the state of that id is "FINISHED". Once a task is finished, polling for that task should stop.

Is there a simple way to do that?

I was trying to do that in a for loop:

  1. Iterating through my ids
  2. Create a timer for each id
  3. In each timer poll the id with a backend http call
  4. Continue if response of the http call is not finished - otherwise stop
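
For reference, the loop described in these steps could look roughly like the sketch below. It is only an illustration of the idea: fetchState is a hypothetical stand-in for the real backend call, and pollEachWithTimers is a made-up name. It also assumes each request returns before the next tick, since nothing here prevents overlapping requests.

```typescript
// Hypothetical signature for the backend call; swap in the real HTTP request.
type FetchState = (id: number) => Promise<string>;

// Starts one repeating timer per id and returns the active timers keyed by id,
// so the caller can cancel any that are still running.
function pollEachWithTimers(
  ids: number[],
  fetchState: FetchState,
  periodMs = 3000
): Map<number, ReturnType<typeof setInterval>> {
  const timers = new Map<number, ReturnType<typeof setInterval>>();

  for (const id of ids) {
    const timer = setInterval(async () => {
      const state = await fetchState(id);
      if (state === 'FINISHED') {
        clearInterval(timer); // stop polling only this id
        timers.delete(id);
      }
    }, periodMs);
    timers.set(id, timer);
  }

  return timers;
}
```

Error handling is left out on purpose: a rejected fetchState promise would go unhandled here, which is one reason to look for something more structured.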

1 Answer

I think we can do something better using RxJS. Here is an example that uses RxJS operators to achieve your goal.

NOTE: This is just an example without Components/Services, to keep things simple.

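import { from, interval, mergeMap, of, switchMap, takeWhile } from 'rxjs';
// On RxJS versions before 7.2, import the operators from 'rxjs/operators' instead.

const ids = [3099023, 3493494, 4309349, 5498458];

// Stand-in for the real HTTP call: even ids report 'FINISHED' on tick 4,
// odd ids on tick 5, and every other tick reports 'OK'.
const mockHttpCallExample = (val, id) => {
  const finishTick = id % 2 === 0 ? 4 : 5;
  const httpResponse = val === finishTick ? 'FINISHED' : 'OK';

  console.log(id, val);

  return of(httpResponse);
};

from(ids).pipe(
  // One independent polling stream per id, all running concurrently.
  mergeMap((id) =>
    interval(3000).pipe(
      // On every tick, request the current state for this id.
      switchMap((val) => mockHttpCallExample(val, id)),
      // Stop polling this id once it reports 'FINISHED'.
      takeWhile((httpResponse) => httpResponse !== 'FINISHED')
    )
  )
).subscribe((val) => console.log(val));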

What are we doing here:

  1. With from(ids), we emit one value for each id
  2. Inside mergeMap, each id gets its own interval, and on every 3s tick switchMap makes the http request to the API for that id
  3. takeWhile ends the polling for an id as soon as the mock API call returns the 'FINISHED' string; the other ids keep polling until they finish too
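
The same pipeline can be wrapped in a typed function that tags every emission with its id, so the subscriber can tell the concurrent streams apart. This is a minimal sketch, not part of the original answer: pollTasks, TaskUpdate, getState and ticks are names made up for illustration, and the fake backend below simply reports 'FINISHED' on the third request per id. The ticks parameter is injectable only so the example finishes immediately instead of waiting on a real 3-second timer.

```typescript
import { Observable, from, interval, map, mergeMap, of, switchMap, takeWhile } from 'rxjs';

// Hypothetical shape for one poll result.
interface TaskUpdate {
  id: number;
  state: string;
}

// Polls every id concurrently and completes each inner stream once its state
// is 'FINISHED'. `getState` stands in for the real HTTP call.
function pollTasks(
  ids: number[],
  getState: (id: number) => Observable<string>,
  ticks: () => Observable<number> = () => interval(3000)
): Observable<TaskUpdate> {
  return from(ids).pipe(
    mergeMap((id) =>
      ticks().pipe(
        switchMap(() => getState(id)),
        takeWhile((state) => state !== 'FINISHED'),
        map((state) => ({ id, state }))
      )
    )
  );
}

// Fake backend: the third request for any id answers 'FINISHED'.
const calls = new Map<number, number>();
const fakeGetState = (id: number): Observable<string> => {
  const n = (calls.get(id) ?? 0) + 1;
  calls.set(id, n);
  return of(n >= 3 ? 'FINISHED' : 'OK');
};

// Synchronous ticks instead of interval(3000), purely for the demonstration.
const updates: TaskUpdate[] = [];
pollTasks([3099023, 3493494], fakeGetState, () => from([0, 1, 2, 3, 4]))
  .subscribe((update) => updates.push(update));
```

With the default ticks argument the function polls every 3 seconds, as in the example above. Because takeWhile is not inclusive here, the 'FINISHED' response itself is not emitted, only the 'OK' responses before it.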

4 Comments

Wow, this is super smart! Thank you for your reply and the answer. Another question: how can I display the values from subscribe(val => console.log(val)) in the frontend, updated on every interval?
You're welcome! Let's say this RxJS logic lives in a service: you would not subscribe in the service. Instead, the service's method just returns the observable, and you subscribe in the component (or use the async pipe). That way the information is available in the component. Of course, there are also other ways to achieve this.
I got that so far! Thank you so much Dragan! How can I use the latest state after FINISHED? Any idea how I can work with the object that came after a finish?
Got it! Passing true as the inclusive argument of takeWhile. So good! Thank you again and have a great evening
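
To illustrate the point from the last two comments: takeWhile accepts a second argument, inclusive, and when it is true the value that fails the predicate is emitted before the stream completes. The sketch below uses a fixed list of states in place of real poll responses; the state names other than 'FINISHED' are only illustrative.

```typescript
import { from, takeWhile } from 'rxjs';

// A fixed sequence of states standing in for successive poll responses.
const states = ['PENDING', 'STARTED', 'FINISHED', 'NEVER_REQUESTED'];

// Default behaviour: the 'FINISHED' value itself is dropped.
const withoutFinal: string[] = [];
from(states)
  .pipe(takeWhile((state) => state !== 'FINISHED'))
  .subscribe((state) => withoutFinal.push(state));

// inclusive = true: 'FINISHED' is emitted, then the stream completes.
const withFinal: string[] = [];
from(states)
  .pipe(takeWhile((state) => state !== 'FINISHED', true))
  .subscribe((state) => withFinal.push(state));

console.log(withoutFinal); // ['PENDING', 'STARTED']
console.log(withFinal);    // ['PENDING', 'STARTED', 'FINISHED']
```

In the polling pipeline from the answer, the same change would be takeWhile((httpResponse) => httpResponse !== 'FINISHED', true), so the subscriber also receives the final response.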