
I am using Celery chords to structure parallel AI processing of large documents' page content. Because this is a single-use function with no public signature, I am pickling the objects to distribute and re-aggregate them. The task that processes a single page successfully reads its arguments and performs the needed work. However, it fails when trying to return its result to the queue for subsequent aggregation.

Does anyone know of a way to specify a result_serializer for a single task called via Chord?

chord generation:

```python
callback = processPageResults.subtask(
    kwargs={'cdd_id': cdoc.cdd_id, 'user_id': user.id},
    options={'serializer': 'pickle'},
)

res = chord(
    [processPage.s(useBold, docPages[i]).set(serializer='pickle')
     for i in range(len(docPages))],
    callback,
)()
```

called task:

```python
@shared_task(serializer='pickle', result_serializer='pickle', bind=True, max_retries=20)
def processPage(self, *args):
    useBold = args[0]
    page = args[1]
    page.processWords(useBold)
    return page
```

error:

```
kombu.exceptions.EncodeError: Object of type DocumentPage is not JSON serializable
```

2 Answers


Ideally you can set result_serializer in the signature, which works for a normal task.

In the case of a chord, that did not work for me. However, you can update the Celery global configuration instead, and that does work.

I am trying to report this as a bug; let's see.

```python
celery = Celery("app_name", backend=result_backend, broker=broker_url)

celery.conf.update(
    result_serializer='pickle',
)
```
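One caveat worth noting (an assumption based on Celery's security defaults, not stated in the answer): workers reject content types not listed in `accept_content`, and the default is JSON only, so pickle usually needs to be whitelisted as well. A minimal configuration sketch with hypothetical broker/backend URLs:

```python
from celery import Celery

# Hypothetical broker/backend URLs for illustration only.
celery = Celery("app_name",
                backend="redis://localhost:6379/0",
                broker="redis://localhost:6379/0")

celery.conf.update(
    result_serializer='pickle',
    # Workers refuse content types not listed here; 'json' is the default.
    accept_content=['json', 'pickle'],
)
```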

4 Comments

Looking at the current stable docs, result_serializer is not an option you can set on tasks/subtasks. I believe the serializer option refers to the global task_serializer setting: docs.celeryproject.org/en/v4.4.7/userguide/… Poor name choice, IMO.
Yes, it doesn't seem to accept it. serializer is the parameter that controls how the task's arguments are serialized; it does not affect serialization of the return result. It seems the only way to set the result serializer is via the global setting.
Has anyone figured out how to change this yet? It's been a year, and wanting tasks to be serialized differently must be a fairly common problem (rather than having a single result serializer for all tasks).
FYI, someone opened a GitHub issue for not being able to set the result_serializer at runtime with tasks.add.apply_async(..., result_serializer='pickle'): github.com/celery/celery/issues/6310

After revisiting this, I have a solution for my use case. result_serializer was the wrong concept for me. Because Celery automatically serializes the arguments passed to a task, I solved my problem simply by setting the task serializer for the callback to 'pickle'. It appears that for a set of chained tasks, result_serializer isn't really useful.
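A minimal sketch of that approach, reusing the names from the question (the exact signatures are assumptions based on the answer's description, not verified code from the author):

```python
from celery import chord

# Setting serializer='pickle' on each signature makes Celery pickle the
# task arguments, including the header results that feed the callback.
callback = processPageResults.s(
    cdd_id=cdoc.cdd_id, user_id=user.id,
).set(serializer='pickle')

header = [processPage.s(useBold, page).set(serializer='pickle')
          for page in docPages]

res = chord(header)(callback)
```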

Comments
