I have a wrapper script that starts a worker, but I don't understand how to limit the amount of concurrency. I tried to read the celery code, but for me it's very hard to see how they do it.

My code:

from celery.bin import worker as w

# app, loglevel, service, and hostname are defined elsewhere in the wrapper script.
my_worker = w.worker(app=app)
options = {
    'loglevel': loglevel,
    'queues': [service],
    'hostname': hostname,
}
my_worker.run(**options)

I don't really know how to add the concurrency to this.

1 Answer

Here is my code; it simply invokes the celery command line:

import sys

def start(self, datas):
    queue = datas['queue']
    # Check whether a worker already exists, using app.control.inspect().active()
    all_workers = self.active(datas).keys()
    if all_workers:
        return None
    cmd = "celery multi start %s_worker -A celeryserver -Q '%s' --concurrency=1 -l DEBUG" % (queue, queue)
    # Hand the command line to Celery as if it had been typed in a shell.
    sys.argv = cmd.split()
    from celery.bin.celery import main
    try:
        main()
    except SystemExit as exc:
        # main() finishes by raising SystemExit; return its exit code instead.
        return exc.code

For your code, add 'concurrency': 1 to your options (it should work, but I have not tested it). However, I suggest calling main with sys.argv as I do, because it automatically parses sys.argv into *args and **options via Command.handle_argv and then calls __call__, which in turn calls self.run(). That is the same thing you do by hand, but it is more compatible and easier.
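
Applied to the question's wrapper, the first suggestion might look like the sketch below. It assumes the older celery.bin.worker API used in the question (a worker class with a run(**options) method); Celery 5 reworked its command-line layer, so this may not carry over unchanged. build_worker_options and start_worker are hypothetical helper names, not part of Celery.

```python
def build_worker_options(loglevel, queue, hostname, concurrency=1):
    """Build the keyword arguments for worker.run() (hypothetical helper)."""
    return {
        'loglevel': loglevel,
        'queues': [queue],
        'hostname': hostname,
        # Not a named parameter of run(); per the comments on this answer,
        # it is accepted through **kwargs.
        'concurrency': concurrency,
    }


def start_worker(app, **options):
    """Start a worker in-process; blocks until the worker exits.

    Assumes the pre-5.0 celery.bin.worker API shown in the question.
    """
    # Imported lazily so build_worker_options can be used without Celery installed.
    from celery.bin import worker as w

    my_worker = w.worker(app=app)
    return my_worker.run(**options)
```

With an existing app object, a call would then be start_worker(app, **build_worker_options(loglevel, service, hostname, concurrency=1)).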

5 Comments

Thanks! Couldn't find the concurrency option in the worker.run function, but it works.
Any options not listed in the run function's signature are received by **kwargs.
Yup, that's what I understood. For an outsider it is just difficult to figure out which arguments are really used I guess. Thanks again. It works now.
What is datas?
Can you post the whole class? I'm not sure what self.active(datas).keys() does.
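
The **kwargs point raised in the comments can be seen with a small stand-in function. This is only an illustration of the Python mechanism: the run function below is hypothetical and does not reproduce the real signature of Celery's worker.run.

```python
def run(hostname=None, loglevel=None, **kwargs):
    """Stand-in with two named parameters; NOT Celery's actual run()."""
    # Keyword arguments that match no named parameter are collected in kwargs.
    return {
        'named': {'hostname': hostname, 'loglevel': loglevel},
        'extra': kwargs,
    }


result = run(hostname='w1@host', loglevel='INFO', concurrency=1)
print(result['extra'])  # {'concurrency': 1}
```

This is why an option such as concurrency can be passed even though it does not appear by name in the function's parameter list.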
