How do you run a chain task in a for loop since the signatures are generated dynamically. The following approach was used because defining the tester task as:
@task
def tester(items):
ch = []
for i in items:
ch.append(test.si(i))
return chain(ch)()
would raise an error of EncodeError(RuntimeError('maximum recursion depth exceeded while getting the str of an object',),) if the chains are too large which is os or system specific.
E.g calling the task as follows
item = range(1,40000) #40,000 raises exception but #3,000 doesn't after setting sys.setrecursionlimit(15000)
tester.delay(item)
raises the EcodeError. In the past I used to have this error when length of item is 5000 i.e range(1,5000). Which i fixed by importing sys and calling sys.setrecursionlimit(15000) at the top of the module. But there is a limitation to this so I decided to refactor a little and use the approach below. That is trying, to split the list and do it in chunks after chunks.The problem is it doesn't seem to continue after 2000 i.e test prints 2000 to screen.
@task
def test(i):
print i
@task
def tester(items):
ch = []
for i in items:
ch.append(test.si(i))
counter = 1
if len(ch) > 2000:
ch_length = len(ch) #4k
while ch_length >= 2000:
do = ch[0:2000] # 2k
print "Doing...NO#...{}".format(counter)
ret = chain(do)() #doing 2k
print "Ending...NO#...{}".format(counter)
ch = ch[2000:] #take all left i.e 2k
ch_length = len(ch) #2k
if ch_length <= 2000 and ch_length > 0:
print "DOING LAST {}".format(counter)
ret = chain(ch)()
print "ENDING LAST {}".format(counter)
break
else:
break
counter +=1
else:
ret = chain(ch)()
return ret
According to celery documentation, a chain basically executes task within it one after the other. I expect the while loop to continue only first iteration is conpleted in the chain before proceeding.
I hope someone has experience with this and could help.
Merry Xmas in advance!