
I have some objects that I want to send to Celery tasks in my application. They are not JSON serializable with the default json library. Is there a way to make Celery serialize and deserialize them with a custom JSON encoder/decoder?

2 Answers


A bit late here, but you should be able to define a custom encoder and decoder by registering them in the kombu serializer registry, as in the docs: http://docs.celeryproject.org/en/latest/userguide/calling.html#serializers.

For example, the following is a custom datetime serializer/deserializer for Django, built on Python's built-in json module (the encoder subclasses json.JSONEncoder):


myjson.py (put it in the same folder as your settings.py file)

import json
from datetime import datetime
from time import mktime

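class MyEncoder(json.JSONEncoder):
    def default(self, obj):
        # Tag datetimes so the decoder can recognize and rebuild them.
        # mktime() treats the time tuple as local time; sub-second precision is lost.
        if isinstance(obj, datetime):
            return {
                '__type__': '__datetime__',
                'epoch': int(mktime(obj.timetuple()))
            }
        # Anything else goes to the base class, which raises TypeError.
        return json.JSONEncoder.default(self, obj)

def my_decoder(obj):
    # Used as object_hook, so it is called for every decoded JSON object (dict).
    if '__type__' in obj:
        if obj['__type__'] == '__datetime__':
            return datetime.fromtimestamp(obj['epoch'])
    return obj

# Encoder function
def my_dumps(obj):
    return json.dumps(obj, cls=MyEncoder)

# Decoder function
def my_loads(obj):
    return json.loads(obj, object_hook=my_decoder)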


settings.py

# Register your new serializer methods into kombu
from kombu.serialization import register
from .myjson import my_dumps, my_loads

register('myjson', my_dumps, my_loads, 
    content_type='application/x-myjson',
    content_encoding='utf-8') 

# Tell celery to use your new serializer:
CELERY_ACCEPT_CONTENT = ['myjson']
CELERY_TASK_SERIALIZER = 'myjson'
CELERY_RESULT_SERIALIZER = 'myjson'
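
The epoch-based encoder in myjson.py goes through mktime(), so it loses microseconds and any timezone information. If that matters, one alternative is to store an ISO 8601 string instead. The following is only a sketch of that variant, not part of the original answer: the names IsoEncoder, iso_decoder, iso_dumps and iso_loads are hypothetical, and datetime.fromisoformat() assumes Python 3.7 or later.

```python
import json
from datetime import datetime, timezone


class IsoEncoder(json.JSONEncoder):
    """Hypothetical variant of MyEncoder that stores an ISO 8601 string."""

    def default(self, obj):
        if isinstance(obj, datetime):
            return {'__type__': '__datetime__', 'iso': obj.isoformat()}
        # Anything else goes to the base class, which raises TypeError.
        return super().default(obj)


def iso_decoder(obj):
    # object_hook is called for every decoded JSON object (dict).
    if obj.get('__type__') == '__datetime__':
        return datetime.fromisoformat(obj['iso'])
    return obj


def iso_dumps(obj):
    return json.dumps(obj, cls=IsoEncoder)


def iso_loads(data):
    return json.loads(data, object_hook=iso_decoder)


# Round trip: microseconds and the UTC offset survive.
original = {'when': datetime(2020, 1, 2, 3, 4, 5, 678901, tzinfo=timezone.utc)}
restored = iso_loads(iso_dumps(original))
```

These two functions could be passed to register() in place of my_dumps and my_loads; the rest of the settings would stay the same.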

Comments

Where should we put this code? Where do we import it?
@EduardLuca are you using Django? Please see my edit
There is a typo: 'epoc' should be 'epoch' on lines 10 and 20
This used to work perfectly fine, but since celery 4.2 I cannot get it to work. Error when starting celery worker: Unrecoverable error: ContentDisallowed('Refusing to deserialize untrusted content of type json (application/json)',)
kombu.utils.json now provides register_type, which is pretty straightforward. See here for usage.

Today there is an easier way to do this, using kombu's register_type.

Here is an example implementation for dataclass serialization/deserialization.

from functools import partial
from dataclasses import asdict
from kombu.utils.json import register_type


def class_full_name(clz: type) -> str:
    return ".".join([clz.__module__, clz.__qualname__])


def _encoder(obj) -> dict:
    return asdict(obj)


def _decoder(clz: type, data: dict):
    return clz(**data)


def register_kombu_type(model):
    register_type(
        model,
        class_full_name(model),
        encoder=_encoder,
        decoder=partial(_decoder, model),
    )
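
To see what this encoder/decoder pair does to the data, here is a small standard-library-only sketch of the round trip, without kombu. Point and Segment are hypothetical dataclasses made up for illustration. It also shows a limitation: asdict() flattens nested dataclasses into plain dicts, and clz(**data) does not rebuild them.

```python
from dataclasses import asdict, dataclass
from functools import partial


@dataclass
class Point:  # hypothetical model, for illustration only
    x: int
    y: int


@dataclass
class Segment:  # hypothetical model with nested dataclass fields
    start: Point
    end: Point


def _encoder(obj) -> dict:
    return asdict(obj)


def _decoder(clz: type, data: dict):
    return clz(**data)


# Flat dataclass: the round trip gives back an equal instance.
decode_point = partial(_decoder, Point)
point = decode_point(_encoder(Point(1, 2)))

# Nested dataclass: the outer type is rebuilt, the inner fields stay dicts.
segment = _decoder(Segment, _encoder(Segment(Point(0, 0), Point(1, 1))))
```

With kombu, I would expect register_kombu_type(Point) to be called once at import time in both the process that sends the task and the worker, since each side needs the type registered. Nested models would need a decoder that rebuilds the inner fields itself.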

Inspired by zeroohub's solution for Pydantic. Ref
