I have written an async function that collects multiple pieces of text data and processes them in a batch. After that, it returns the output, like this:

import sys
import asyncio

class ModelRunner:
    '''
    Collects incoming inputs into a batch of up to 10 items or 1 second,
    whichever comes first, then processes the batch and returns the outputs.
    '''
    # Implementation omitted.


loop = asyncio.get_event_loop()
model_obj = ModelRunner(loop)
loop.create_task(model_obj.model_runner())


async def process_text(text):
    out_ = await model_obj.process_input(text)
    return out_

To get the output, I am running the following code:

task1 = asyncio.ensure_future(process_text(text1))
task2 = asyncio.ensure_future(process_text(text2))
task3 = asyncio.ensure_future(process_text(text3))
task4 = asyncio.ensure_future(process_text(text4))
async_tasks = [task1, task2, task3, task4]
out1, out2, out3, out4 = loop.run_until_complete(asyncio.gather(*async_tasks))

Here, out1, out2, out3, and out4 are the outputs after processing the text data.

However, I do not want to collect the tasks in a list like [task1, task2, task3, task4] and then call loop.run_until_complete to get the outputs. Instead, I am looking for a function like this:

out1 = func(text1)
out2 = func(text2) 
etc.

But they should run in a non-blocking way, like asyncio.ensure_future. How can I do that? Thanks in advance.

3 Comments
  • Do you want func(text2) to only start once func(text1) is complete, etc., or do you want them both to run at the same time? Commented Oct 13, 2020 at 8:54
  • @ArthurTacca I want both functions to run at the same time, as I want to call them from threads that generate the text data. I do not want to create async_tasks. The ModelRunner will handle multiple inputs arriving at the same time. Basically, func(text) should work as an async API over the process_text(text) function. Commented Oct 13, 2020 at 9:16
  • @ArthurTacca The text data is generated continuously over multiple threads and each thread should be able to call process_text(text) at the same time. Commented Oct 13, 2020 at 9:41

1 Answer

Two obvious options:

  • If you already have multiple threads, why bother with asyncio at all? Just make process_text a regular blocking function and call it from those threads.
  • Conversely, if you're using asyncio, why use multiple threads at all? Make your top-level tasks async and run them all in one thread.
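The second option could look something like the sketch below, which keeps everything in one thread and lets each producer coroutine await its own results. Here process_text is a hypothetical stand-in for the asker's model_obj.process_input call, and producer and main are illustrative names that do not appear in the question.

```python
import asyncio


async def process_text(text):
    # Hypothetical stand-in for `await model_obj.process_input(text)`.
    await asyncio.sleep(0.01)
    return text.upper()


async def producer(name, texts):
    # Each producer awaits its own results, so the caller never has to
    # build a shared task list by hand.
    results = []
    for text in texts:
        results.append(await process_text(text))
    return name, results


async def main():
    # Both producers run concurrently on a single event loop and thread.
    return await asyncio.gather(
        producer("a", ["x", "y"]),
        producer("b", ["z"]),
    )


outputs = asyncio.run(main())
print(outputs)
```

While one producer is waiting on process_text, the loop is free to run the others, so a batching runner would still see inputs from several producers at once.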

If you really must use multiple threads and async functions:

  • Have a single thread running your asyncio loop alongside the worker threads you already mentioned, and use loop.call_soon_threadsafe in the worker threads to force the async function to run in the asyncio thread. If you want to get the result back to the worker thread, you can use a queue.Queue to send the result (or results) back.
  • This final option is the worst one possible and almost certainly not what you want, but I mention it for completeness: start a separate asyncio event loop from each thread that needs it and use those to run your async functions in the worker threads directly.
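As a rough sketch of the "one loop thread plus worker threads" option: instead of loop.call_soon_threadsafe with a queue.Queue, this version swaps in the standard library's asyncio.run_coroutine_threadsafe, which submits a coroutine to a loop running in another thread and hands back a concurrent.futures.Future. Again, process_text is a hypothetical stand-in for the real model call, and func, worker, and results are illustrative names.

```python
import asyncio
import threading


async def process_text(text):
    # Hypothetical stand-in for `await model_obj.process_input(text)`.
    await asyncio.sleep(0.01)
    return text.upper()


# One dedicated thread owns and runs the event loop.
loop = asyncio.new_event_loop()
loop_thread = threading.Thread(target=loop.run_forever, daemon=True)
loop_thread.start()


def func(text):
    # Schedules the coroutine on the loop thread and returns a
    # concurrent.futures.Future immediately, without blocking the caller.
    return asyncio.run_coroutine_threadsafe(process_text(text), loop)


results = {}


def worker(name, text):
    future = func(text)
    # result() blocks only this worker thread until the output is ready.
    results[name] = future.result(timeout=5)


workers = [
    threading.Thread(target=worker, args=(f"t{i}", f"text{i}"))
    for i in range(3)
]
for w in workers:
    w.start()
for w in workers:
    w.join()

# Stop the loop from outside its thread, then clean up.
loop.call_soon_threadsafe(loop.stop)
loop_thread.join()
loop.close()
print(results)
```

With this shape, out1 = func(text1) returns at once, and a thread calls out1.result() only when it actually needs the value.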

1 Comment

Thanks for the reply. Actually, I was working on the second method that you suggested, without using any threads. I am using loop.run_forever(), but now I am getting a memory leak. As my processing is done on the GPU, my memory usage keeps increasing while the operation is running. Can you suggest any methods to resolve this?

### starting asyncio operation
asyncio_list = []
for i in range(cam_run):
    print("cam ", i, " initialised")
    asyncio_list.append(asyncio.ensure_future(process_text(text)))
loop.run_forever()
loop.close()
