7

I have a long running celery task which iterates over an array of items and performs some actions.

The task should somehow report back which item it is currently processing so the end user is aware of the task's progress.

At the moment my Django app and Celery sit together on one server, so I am able to use Django's models to report the status, but I am planning to add more workers on servers away from Django, so they can't reach the DB.

Right now I see a few solutions:

  • Store intermediate results manually using some storage, like Redis or MongoDB, making them available over the network. This worries me a little bit because if, for example, I use Redis, then I have to keep the Django-side code reading the status and the Celery task writing the status in sync, so they use the same keys.
  • Report status back to Django from Celery using REST calls, like PUT http://django.com/api/task/123/items_processed
  • Maybe use Celery's event system and create events like Item processed, on which Django updates the counter.
  • Create a separate worker which runs on the server with Django and holds a task that only increases the items processed count, so when the main task is done with an item it issues increase_messages_proceeded_count.delay(task_id).

Are there any other solutions, or hidden problems with the ones I mentioned?
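For the REST option, here is a minimal standard-library sketch of what the task-side reporting call could look like. The endpoint and payload names are hypothetical, mirroring the PUT example above:

```python
import json
import urllib.request

def build_progress_request(task_id, items_processed,
                           base_url="http://django.example.com"):
    # Hypothetical endpoint mirroring PUT /api/task/<id>/items_processed
    url = "%s/api/task/%s/items_processed" % (base_url, task_id)
    data = json.dumps({"items_processed": items_processed}).encode("utf-8")
    return urllib.request.Request(
        url,
        data=data,
        headers={"Content-Type": "application/json"},
        method="PUT",
    )

# inside the task, after each item:
# urllib.request.urlopen(build_progress_request(task_id, n), timeout=5)
```

The request is built separately from being sent, so the task can retry or batch updates without re-serializing.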

3
  • Why can't they reach the DB? Commented Sep 26, 2015 at 13:58
  • Because I want to reduce coupling and I do not want to expose DB. Commented Sep 26, 2015 at 16:59
  • I can understand that, and I design workers to be as decoupled as possible, but for the workloads I have designed for, a worker is more capable if it knows something (e.g. has access to the DB). I can see tasks like "produce a PDF" or "render this image" that don't strictly need the Django DB. I'm just wondering if you need it or you want it. I'll elaborate in my answer below. Commented Sep 26, 2015 at 17:32

5 Answers 5

9
+50

There are probably many ways to achieve your goal, but here is how I would do it.

Inside your long-running Celery task, set the progress using Django's caching framework:

from django.core.cache import cache

@app.task(bind=True)
def long_running_task(self, *args, **kwargs):
    key = "my_task:%s" % self.request.id
    ...
    # do whatever you need to do and set the progress
    # using cache:
    cache.set(key, progress, timeout=60 * 60)  # pick a timeout that works for you
    ...

Then all you have to do is make a recurring AJAX GET request with that key and retrieve the progress from the cache. Something along these lines:

import json

from django.core.cache import cache
from django.http import HttpResponse

def task_progress_view(request, *args, **kwargs):
    key = request.GET.get('task_key')
    progress = cache.get(key)
    return HttpResponse(content=json.dumps({'progress': progress}),
                        content_type="application/json; charset=utf-8")

Here is a caveat, though: if you are running your server as multiple processes, make sure you are using a shared backend like Memcached, because Django's local-memory cache is per-process and will be inconsistent among them. Also, I probably wouldn't use Celery's task_id as a key, but it is sufficient for demonstration purposes.
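For reference, a sketch of the shared-cache settings this caveat implies, assuming Django 3.2+ and a local memcached instance (host and port are placeholders):

```python
# settings.py: point Django's cache at memcached so every
# web process (and the view above) sees the same progress keys
CACHES = {
    "default": {
        "BACKEND": "django.core.cache.backends.memcached.PyMemcacheCache",
        "LOCATION": "127.0.0.1:11211",
    }
}
```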


Comments

4

Take a look at Flower, a real-time monitor and web admin for the Celery distributed task queue:

You need it for presentation, right? Flower works over WebSockets.

For instance, to receive task completion events in real time (taken from the official docs):

var ws = new WebSocket('ws://localhost:5555/api/task/events/task-succeeded/');
ws.onmessage = function (event) {
    console.log(event.data);
}

You would likely need to work with tasks ('ws://localhost:5555/api/tasks/').

I hope this helps.

2 Comments

It is limited to the events Celery provides. For task started/completed events I do not need Flower. I need custom events, like stage X of a task completed.
Flower doesn't even record custom statuses, e.g. the 'PROGRESS' status update isn't reflected in Flower. I suppose what the asker (and I) want is something that reports the execution of a task akin to the logger.
2

Simplest:

Your tasks and Django app already share access to one or two data stores: the broker and the results backend (if you're using one that is different from the broker).

You can simply put some data into one or other of these data stores that indicates which item the task is currently processing.

e.g. if using Redis, simply have a key 'task-currently-processing' and store the data relevant to the item currently being processed there.
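A sketch of that key convention, with the client passed in so the same helpers work from both the task and the Django side. The names here are made up for illustration, and `client` is anything with redis-py's `set`/`get` interface:

```python
def progress_key(task_id):
    # one key per task so concurrent tasks don't clobber each other
    return "task-progress:%s" % task_id

def set_progress(client, task_id, item_index, ttl=3600):
    # ex= expires the key so finished tasks clean up after themselves
    client.set(progress_key(task_id), item_index, ex=ttl)

def get_progress(client, task_id):
    value = client.get(progress_key(task_id))
    return int(value) if value is not None else None
```

The Django view and the Celery task then share only these helpers, which keeps the "same keys on both sides" concern from the question in one place.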

7 Comments

You are right, I can utilize redis/mongo/mysql by manually maintaining some counters. That's what I do right now. What I thought about is how to utilize the MQ Celery already uses. So each task would produce events, like item processed, which will be taken care of on the server where Django is located. Because counters are not the only thing I need to handle when the item processing is over. There could be more stuff to be taken care of which the task processing items is not aware of.
I thought you said that the database (mysql) was not available to the workers
It is now, but that's what I want to avoid.
At the moment I need a simple counter which shows the user how many items were processed. But in the near future I will need more. Celery already uses a pub/sub Redis, so I thought there is a way to make this work within Celery. It has an event system, but the number of event types is really limited and they are all related more to Celery monitoring. So otherwise I'll need to work with Redis manually, which is not that great.
As long as it is only a counter, yes. But then I need to create a pub/sub channel so anybody could subscribe to an "item processed" event, not just a function which only increases the counter. Therefore I need to create an abstraction like observer/observable, so one could subscribe to the item processed event, another could trigger it, and so on. So I'll have to create a proper infrastructure.
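A tiny in-process sketch of the observer/observable abstraction that comment describes. The names are hypothetical, and in practice the publish side would go over Redis pub/sub or the broker rather than an in-memory list:

```python
class ProgressEvents:
    """Minimal observable: subscribers register callbacks per event name."""
    def __init__(self):
        self._subscribers = {}

    def subscribe(self, event, callback):
        self._subscribers.setdefault(event, []).append(callback)

    def publish(self, event, **payload):
        # every subscriber of this event gets the payload
        for callback in self._subscribers.get(event, []):
            callback(**payload)

# usage: one subscriber bumps a counter; others could notify users, log, etc.
events = ProgressEvents()
counts = {}
events.subscribe("item_processed",
                 lambda task_id: counts.__setitem__(task_id, counts.get(task_id, 0) + 1))
events.publish("item_processed", task_id="123")
events.publish("item_processed", task_id="123")
```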
1

You can use something like SwampDragon to reach the user from the Celery instance (you have to be able to reach it from the client, though; take care not to run afoul of CORS). It can be latched onto the counter, not the model itself.

3 Comments

What sort of process worker? Why not? Workers receive task data as a JSON object. They are not aware of my DB schema. I want to have a proper separation of the Django + DB container and the workers, which are installed on many different servers. A worker which parses pages shouldn't be aware of what the user account model is, but it should have a proper way of reporting the task's progress.
Did you take a look at Swampdragon? It incurs the extra effort of running a tornado node on your worker but websockets are just the sort of thing to keep a client updated.
Yeah, I know about it and plan to dig into it, as it supports Python 3, unlike django-redis-websockets. But it seems to be quite young; not sure if it's ready for production. And I am not sure if its Django orientation will lead to some problems with code decoupling.
0

lehins' solution looks good if you don't mind your clients repeatedly polling your backend. That may be fine but it gets expensive as the number of clients grows.

Artur Barseghyan's solution is suitable if you only need the task lifecycle events generated by Celery's internal machinery.

Alternatively, you can use Django Channels and WebSockets to push updates to clients in real-time. Setup is pretty straightforward.

  1. Add channels to your INSTALLED_APPS and set up a channel layer. E.g., using a Redis backend:
CHANNEL_LAYERS = {
    "default": {
        "BACKEND": "channels_redis.core.RedisChannelLayer",
        "CONFIG": {
            "hosts": [("redis", 6379)]
        }
    }
}
  2. Create an event consumer. This will receive events from Channels and push them over WebSockets to the client. For instance:
import json
from asgiref.sync import async_to_sync
from channels.generic.websocket import WebsocketConsumer


class TaskConsumer(WebsocketConsumer):
    def connect(self):
        # your task's identifier, taken from the URL route
        self.task_id = self.scope['url_route']['kwargs']['task_id']
        async_to_sync(self.channel_layer.group_add)(f"tasks-{self.task_id}", self.channel_name)
        self.accept()

    def disconnect(self, code):
        async_to_sync(self.channel_layer.group_discard)(f"tasks-{self.task_id}", self.channel_name)

    def item_processed(self, event):
        item = event['item']
        self.send(text_data=json.dumps(item))
  3. Push events from your Celery tasks like this:
from asgiref.sync import async_to_sync
from channels.layers import get_channel_layer

...
async_to_sync(get_channel_layer().group_send)(f"tasks-{task.task_id}", {
    'type': 'item_processed',
    'item': item,
})

You can also write an async consumer and/or invoke group_send asynchronously. In either case you no longer need the async_to_sync wrapper.

  4. Add websocket_urlpatterns to your routing configuration (e.g. a routing.py referenced from your asgi.py):
websocket_urlpatterns = [
    path('ws/tasks/<task_id>/', TaskConsumer.as_asgi()),
]
  5. Finally, to consume events from JavaScript on the client, you can do something like this:
let task_id = 123;
let protocol = location.protocol === 'https:' ? 'wss://' : 'ws://';
let socket = new WebSocket(`${protocol}${window.location.host}/ws/tasks/${task_id}/`);

socket.onmessage = function(event) {
    let data = JSON.parse(event.data);
    let item = data.item;
    // do something with the item (e.g., push it into your state container)
}

Comments
