0

So the project I am working on requires a distributed tasks system to process CPU intensive tasks. This is relatively straight forward, spin up celery and throw all the tasks in a queue and have celery do the rest.

The issue I have is that every user needs their own queue, and items within each users queue must be processed synchronously. So it there is a task in a users queue already processing, wait until it is finished before allowing a worker to pick up the next.

The closest I've come to something like this is having a fixed set of queues, and assigning them to users. Then having the users tasks picked off by celery workers fixed to a certain queue with a concurrency of 1.

The problem with this system is that I can't scale my workers to process a backlog of user tasks.

Is there a way I can configure celery to do what I want, or perhaps another task system exists that does what I want?

Edit:

Currently I use the following command to spawn my celery workers with a concurrency of one on a fixed set of queues

celery multi start 4 -A app.celery -Q:1 queue_1 -Q:2 queue_2 -Q:3 queue_3 -Q:4 queue_4 --logfile=celery.log --concurrency=1

I then store a queue name on the user object, and when the user starts a process I queue a task to the queue stored on the user object. This gives me my synchronous tasks.

The downside is when I have multiple users sharing queues causing tasks to build up and never getting processed.

I'd like to have say 5 workers, and a queue per user object. Then have the workers just hop over the queues, but never have more than 1 worker on a single queue at a time.

2
  • Do you have any code from you attempts you can share? Commented May 11, 2018 at 16:24
  • There isn't much to it, but I've added some more information to the original question. Commented May 11, 2018 at 16:32

1 Answer 1

1

I use chain doc here condition for execution task in a specific order :

chain = task1_task.si(account_pk) | task2_task.si(account_pk) | task3_task.si(account_pk)
chain()

So, i execute for a specific user task1 when its finished i execute task2 and when finished execute task3. It will spawm in any worker available :)

For stopping a chain midway:

self.request.callbacks = None
return

And don't forget to bind your task :

@app.task(bind=True)
def task2_task(self, account_pk):
Sign up to request clarification or add additional context in comments.

7 Comments

Can you dynamically add to a chain? An example being I am a user and I have a chain with a long running task currently being executed, but I want to run another task. Instead of creating a new chain that will be picked up by another worker, can I just push the new task onto the old chain?
Don't do that, it will be un-maintenable in the futur, but what you can do is add some logic in task2_task or task3_task, so from the result of task1_task you will do different code in task2 or task3, if you want to stock a chain midway just add : self.request.callbacks = None and return it will stop the chain without error message
The thing is, my tasks are never created in an environment where they can be chained. Basically a user makes a change in the front end editor, the change is pushed to the server, the server pushes out a task to render the new changes, and the user gets pushed the result via websockets. But the user can make multiple changes at varying times, creating multiple tasks. Each task needs to be executed synchronously for that user, with the results coming back in the order they were sent.
so what you need is not chain, you need to add in db all the user change request and task_id, and after a task is executed check if a task is available, for new task check if the current task is already executed and finished with celery.result.AsyncResult(task_id) or do nothing (the running task will execute the new after finished)
Here is a quick sketch of what I'm trying to achieve. The above example is what I have currently, and it is working, it just doesn't scale very well. The example below that is what I'd like to achieve, but I've been unable to find any resources of anything similar. i.imgur.com/jlCbApN.png
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.