
How do you run a chain of tasks in a for loop when the signatures are generated dynamically? I used the approach below because defining the tester task as:

@task
def tester(items):
    ch = []
    for i in items:
        ch.append(test.si(i))
    return chain(ch)()

would raise EncodeError(RuntimeError('maximum recursion depth exceeded while getting the str of an object',),) if the chain is too long. The exact threshold is OS or system specific.
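Why would the limit depend on chain length at all? Here is a toy sketch in Python 3. It assumes, as an illustration only and not as Celery's real message format, that each step carries the rest of the chain nested inside it as a callback, so a recursive encoder needs one stack frame per step. build_nested_chain, encode and can_encode are hypothetical names.

```python
import sys

def build_nested_chain(n):
    # Hypothetical layout: every step nests the rest of the chain as its
    # callback, e.g. {"task": 0, "link": {"task": 1, "link": None}}.
    msg = None
    for i in reversed(range(n)):
        msg = {"task": i, "link": msg}
    return msg

def encode(msg, out):
    # Toy recursive encoder: one Python stack frame per nesting level.
    if msg is None:
        out.append("null")
        return
    out.append('{"task": %d, "link": ' % msg["task"])
    encode(msg["link"], out)
    out.append("}")

def can_encode(n):
    try:
        encode(build_nested_chain(n), [])
        return True
    except RecursionError:  # Python 3; Python 2 raises a plain RuntimeError
        return False

limit = sys.getrecursionlimit()
print(can_encode(100))          # True: a short chain fits
print(can_encode(limit + 100))  # False: nesting is deeper than the limit

sys.setrecursionlimit(2 * limit)  # the question's workaround, scaled down
print(can_encode(limit + 100))    # True now
print(can_encode(3 * limit))      # False: raising the limit only moves the threshold
sys.setrecursionlimit(limit)
```

Under this assumption, raising the recursion limit helps only until the chain grows again, which matches what the question describes below.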

For example, calling the task as follows

item = range(1,40000) # 40,000 raises the exception, but 3,000 doesn't (after setting sys.setrecursionlimit(15000))
tester.delay(item)

raises the EncodeError. In the past I hit this error with as few as 5000 items, i.e. range(1,5000), which I fixed by importing sys and calling sys.setrecursionlimit(15000) at the top of the module. But that fix has its limits, so I refactored a little and used the approach below: split the list and run it chunk after chunk. The problem is that it doesn't seem to continue past 2000, i.e. test prints up to 2000 and then stops.

@task
def test(i):
    print i


@task
def tester(items):
    ch = []
    for i in items:
        ch.append(test.si(i))
    counter = 1
    if len(ch) > 2000:
        ch_length = len(ch) #4k
        while ch_length >= 2000:
            do = ch[0:2000] # 2k
            print "Doing...NO#...{}".format(counter)
            ret = chain(do)() #doing 2k
            print "Ending...NO#...{}".format(counter)
            ch = ch[2000:] #take all left i.e 2k

            ch_length = len(ch) #2k
            if ch_length <= 2000 and ch_length > 0:
                print "DOING LAST {}".format(counter)
                ret = chain(ch)()
                print "ENDING LAST {}".format(counter)
                break
            else:
                break
            counter +=1
    else:
        ret = chain(ch)()
    return ret
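Separately from how the chains are dispatched, the control flow of this loop is worth tracing. After the first 2000-item chunk, the if branch only runs the remainder when 2000 or fewer items are left; otherwise the else: break leaves the loop, so counter +=1 is never reached. The sketch below replays the same control flow in plain Python 3, with a list standing in for the chain(...)() calls (a simplification: nothing is sent to Celery). run_like_question and items_dispatched are hypothetical helpers.

```python
def run_like_question(items, size=2000):
    # Replays the question's while loop; each append stands in for chain(...)().
    dispatched = []
    ch = list(items)
    if len(ch) > size:
        ch_length = len(ch)
        while ch_length >= size:
            dispatched.append(ch[0:size])   # "Doing...NO#..."
            ch = ch[size:]
            ch_length = len(ch)
            if 0 < ch_length <= size:
                dispatched.append(ch)       # "DOING LAST"
                break
            else:
                break                       # also taken when more than size items remain
            # counter += 1 would sit here and is unreachable
    else:
        dispatched.append(ch)
    return dispatched

def items_dispatched(items):
    return sum(len(chunk) for chunk in run_like_question(items))

print(items_dispatched(range(1, 40000)))  # 2000: stops after the first chunk
print(items_dispatched(range(1, 4001)))   # 4000: exactly two chunks fit
print(items_dispatched(range(1, 5001)))   # 2000: 3000 left over, loop breaks
```

With this control flow, only lists of up to 4000 items are fully dispatched, which is consistent with test stopping at 2000 for the 40,000-item input.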

According to the Celery documentation, a chain executes the tasks within it one after the other. I expected the while loop to continue only after the first chunk in the chain had completed.
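The distinction at play can be illustrated with a plain-Python analogy using concurrent.futures (this is not Celery's API): the steps inside one chain do run in order, but handing the chain off returns immediately, so the caller's loop moves on. In the sketch, pool.submit stands in for chain(...)() and calling run_chain directly stands in for .apply(); run_chain and the gate event are hypothetical helpers.

```python
import threading
from concurrent.futures import ThreadPoolExecutor

def run_chain(steps, log, gate):
    # A chain is sequential within itself: each step runs after the previous one.
    gate.wait()  # stand-in for "the worker hasn't picked this up yet"
    for step in steps:
        log.append(step)
    return log[-1]

log = []
gate = threading.Event()
with ThreadPoolExecutor(max_workers=1) as pool:
    # Analogy for chain(...)(): hand the work off and return at once.
    future = pool.submit(run_chain, [1, 2, 3], log, gate)
    print(future.done(), log)     # False []  (the caller's loop would move on here)
    gate.set()                    # let the "worker" run
    print(future.result(), log)   # 3 [1, 2, 3]  (result() blocks until done)

# Analogy for chain(...).apply(): run in the caller, return only when finished.
eager_log = []
ready = threading.Event()
ready.set()
print(run_chain([1, 2, 3], eager_log, ready), eager_log)  # 3 [1, 2, 3]
```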

I hope someone has experience with this and could help.

Merry Xmas in advance!

1 Answer


It seems you have run into this issue: https://github.com/celery/celery/issues/1078

Also, calling chain(ch)() seems to execute it asynchronously, so the loop does not wait for the chunk to finish. Try explicitly calling apply() on it instead.

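from celery import chain

# Assumes app is your Celery application and test is the task from the question.
@app.task
def tester(items):
    ch = []
    for i in items:
        ch.append(test.si(i))

    PSIZE = 1000  # number of signatures per chain
    for cl in range(0, len(ch), PSIZE):
        print("cl: %s" % cl)
        # apply() runs this chunk's chain in the current process and
        # returns only after all of its tasks have finished
        chain(ch[cl:cl + PSIZE]).apply()
        print("cl: %s END" % cl)
    return None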
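The slicing driven by range(0, len(ch), PSIZE) covers every signature exactly once, including a final partial chunk. A quick plain-Python check with the 39,999 items from the question (range(1, 40000)) and PSIZE = 1000; chunk_bounds is a hypothetical helper, not part of Celery:

```python
def chunk_bounds(n, psize=1000):
    # (start, end) slice bounds produced by range(0, n, psize), as in the answer.
    return [(cl, min(cl + psize, n)) for cl in range(0, n, psize)]

items = list(range(1, 40000))
bounds = chunk_bounds(len(items))
print(len(bounds))   # 40
print(bounds[-1])    # (39000, 39999): the last chunk holds 999 items
covered = [x for start, end in bounds for x in items[start:end]]
print(covered == items)  # True: every item appears once, in order
```

Unlike the while loop in the question, there is no early break, so the loop keeps going until the list is exhausted.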

3 Comments

Bravo! This seems to work with the test snippet I provided. I'll see what happens with my actual code and report back. Thanks, this is a good head start, especially the apply() part.
I'd like to point out that calling chain([t, t, t, ...]).apply() in a loop is the same as calling t() in a loop; in other words, Celery isn't needed in this case.
You're right, but the reason I use this approach is that in my actual code, test() calls other tasks and also does some CRUD in the database. Regardless, I think it would work without the chain.
