I have a lot of async code that handles websocket connections. The idea is simple: fetch data from the websocket, do some math, and then do a lot of I/O:
```python
data = await websocket.recv()
# do some math ...
await obj.save()
await redis.publish(data)
await api_client.send(data)
# and so on
```
This works perfectly in an independent container.
But now I've been given another task: build a Celery worker that "re-checks" data from a third-party API once every 10 minutes and runs the same logic. So I have to put all of this into a Celery task. Currently I do something like this:
```python
@app.task
def sync_with_api():
    for obj in objects_to_sync:
        asgiref.sync.async_to_sync(websocket_logic)(obj)
```
This works, but everything runs sequentially, and we expect thousands of objects to sync, so I'm worried about performance.
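One alternative I've been sketching (not sure it's the right approach) is to keep a single event loop per task invocation and run the whole batch concurrently with `asyncio.gather`, capped by a semaphore. Here `websocket_logic` is a stub standing in for my real per-object logic:

```python
import asyncio

CONCURRENCY = 50  # arbitrary cap; would need tuning for the real workload

# Stand-in for the real async logic (recv / save / publish / send).
async def websocket_logic(obj):
    await asyncio.sleep(0)  # placeholder for the real awaits
    return obj * 2

async def sync_all(objects):
    # One semaphore bounds how many websocket_logic calls run at once.
    sem = asyncio.Semaphore(CONCURRENCY)

    async def limited(obj):
        async with sem:
            return await websocket_logic(obj)

    # All objects share a single event loop and run concurrently;
    # gather preserves input order in its result list.
    return await asyncio.gather(*(limited(o) for o in objects))

results = asyncio.run(sync_all(range(1000)))
print(len(results))  # 1000
```

Inside the Celery task this would presumably become `asgiref.sync.async_to_sync(sync_all)(objects_to_sync)`, so only one loop is created per task run instead of one per object.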
I also thought about using a gevent pool and did this:
```python
@app.task
def sync_objects():
    for obj in objects_to_sync:
        app.send_task("core.sync_task", args=[obj])

@app.task
def sync_task(obj):
    asgiref.sync.async_to_sync(websocket_logic)(obj)
```
From my perspective, this should work better. But I'm not sure combining gevent with asyncio is a good idea, since they are two different concurrency models. Has anyone had experience with problems like this?
Or should I just refactor the original code and maintain two versions:
- Async to work with websockets
- Sync to work with gevent
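If I did go the two-versions route, I'd probably try to keep a single async core and expose a thin sync facade, so the logic itself is only written once. A rough sketch (`process` is a stub for my real logic; using `asyncio.run` as the sync entry point is my assumption and I haven't tested it under gevent monkey-patching):

```python
import asyncio

# Single async implementation of the per-object logic.
async def process(obj):
    await asyncio.sleep(0)  # placeholder for recv/save/publish/send
    return obj + 1

# Thin sync wrapper for the Celery/gevent side; spins up a
# fresh event loop per call, which may or may not be acceptable.
def process_sync(obj):
    return asyncio.run(process(obj))

print(process_sync(41))  # 42
```

That way the websocket container keeps calling `process` directly, and the worker calls `process_sync`, but I'd still like to know whether mixing the two models this way causes problems in practice.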