I want to execute tasks asynchronously and concurrently: if task1 is running when task2 arrives, task2 should start right away, without waiting for task1 to complete. I would also like to avoid callbacks by using coroutines.

Here's a concurrent solution with callbacks:

from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    # Deliberately slow, CPU-bound naive recursion
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


class FibonacciCalculatorFuture:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)

    @staticmethod
    def calculate(n):
        print(f"started n={n}")
        return fibonacci(n)

    def run(self, n):
        future = self.pool.submit(self.calculate, n)
        future.add_done_callback(lambda f: print(f.result()))


if __name__ == '__main__':
    calculator = FibonacciCalculatorFuture()
    calculator.run(35)
    calculator.run(32)
    print("initial thread can continue its work")

Its output:

started n=35
started n=32
initial thread can continue its work
3524578
14930352

And here's my effort to get rid of callbacks:

import asyncio
from concurrent.futures import ThreadPoolExecutor


class FibonacciCalculatorAsync:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        self.loop = asyncio.new_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)  # fibonacci() as defined above

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)

    def run(self, n):
        # Schedule the coroutine on our loop; it starts once the loop runs
        self.loop.create_task(self.calculate(n))


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.run(35)
    calculator.run(32)
    calculator.loop.run_forever()  # blocks here, even after both tasks finish
    print("initial thread can continue its work")  # never reached

Output:

started n=35
started n=32
3524578
14930352

In this case, the initial thread can't get past loop.run_forever() and therefore can't accept new tasks.

So, here's my question: is there a way to do all of the following at once:

  • execute tasks concurrently;
  • accept new tasks and schedule them for execution right away (alongside already running tasks);
  • use coroutines and write code without callbacks.

3 Answers

The second bullet from your question can be met by running the asyncio event loop in a dedicated thread and using asyncio.run_coroutine_threadsafe to schedule coroutines on it from the main thread. For example:

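import asyncio
import threading
from concurrent.futures import ThreadPoolExecutor

# fibonacci() as defined in the question


class FibonacciCalculatorAsync:
    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        # A new loop that the background thread in start_loop() will run
        self.loop = asyncio.new_event_loop()

    @staticmethod
    def calculate_sync(n):
        print(f"started n={n}")
        return fibonacci(n)

    async def calculate(self, n):
        result = await self.loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)
        return result

    def run(self, n):
        # Thread-safe scheduling onto the loop running in the background thread;
        # returns a concurrent.futures.Future
        return asyncio.run_coroutine_threadsafe(self.calculate(n), self.loop)

    def start_loop(self):
        thr = threading.Thread(target=self.loop.run_forever)
        thr.daemon = True
        thr.start()


if __name__ == '__main__':
    calculator = FibonacciCalculatorAsync()
    calculator.start_loop()
    fib_35 = calculator.run(35)
    fib_32 = calculator.run(32)
    print("initial thread can continue its work")
    fib_10 = calculator.run(10)
    # Wait for the results instead of sleeping for an arbitrary time
    for future in (fib_35, fib_32, fib_10):
        future.result()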
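Because asyncio.run_coroutine_threadsafe returns a concurrent.futures.Future, the calling thread can keep working and collect a result only when it needs it, still without callbacks. A minimal sketch, assuming a hypothetical slow_square coroutine in place of the Fibonacci computation:

```python
import asyncio
import threading


async def slow_square(x):
    # Hypothetical stand-in for real async work
    await asyncio.sleep(0.1)
    return x * x


def start_background_loop():
    # Create a fresh loop and drive it from a daemon thread
    loop = asyncio.new_event_loop()
    threading.Thread(target=loop.run_forever, daemon=True).start()
    return loop


loop = start_background_loop()

# Schedule from the main thread; each call returns a concurrent.futures.Future
futures = [asyncio.run_coroutine_threadsafe(slow_square(i), loop) for i in range(3)]
print("main thread is free to do other work")

# Block only when the results are actually needed (timeout in seconds)
results = [f.result(timeout=5) for f in futures]
print(results)  # [0, 1, 4]

# Stop the background loop in a thread-safe way
loop.call_soon_threadsafe(loop.stop)
```

The main thread never touches the loop directly; every interaction goes through the thread-safe entry points run_coroutine_threadsafe and call_soon_threadsafe.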

loop.run_forever() will indeed run forever, even when no tasks are left. The good news is that you don't need it: to wait for your computations to complete, use asyncio.gather:

import asyncio
from concurrent.futures import ThreadPoolExecutor


class FibonacciCalculatorAsync:

    def __init__(self):
        self.pool = ThreadPoolExecutor(max_workers=2)
        # self.loop = asyncio.get_event_loop()

    ...  # calculate_sync() as in the question

    async def calculate(self, n):
        # Ask for the loop that is actually running instead of storing one
        loop = asyncio.get_running_loop()
        result = await loop.run_in_executor(self.pool, self.calculate_sync, n)
        print(result)


async def main():
    calculator = FibonacciCalculatorAsync()
    # Schedule both computations; they start once main() yields to the loop
    fib_35 = asyncio.create_task(calculator.calculate(35))
    fib_32 = asyncio.create_task(calculator.calculate(32))

    print("initial thread can continue its work")
    ...

    # wait until both Fibonacci computations have finished
    await asyncio.gather(fib_35, fib_32)


if __name__ == '__main__':
    asyncio.run(main())

Please note how the loop is handled here: the loop is no longer stored in __init__; instead, calculate() fetches the running loop with asyncio.get_running_loop(), and asyncio.run() creates the loop and closes it at the end. If you start using asyncio, I'd recommend having one loop for everything instead of creating loops for more granular tasks. With this approach, you get all of asyncio's bells and whistles for handling and synchronizing tasks.
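One of those facilities is asyncio.as_completed, which lets a single coroutine handle each result as soon as it is ready, the job the done-callback did in the question. A sketch, assuming the question's fibonacci function (repeated so the block runs on its own) and small n values to keep it fast:

```python
import asyncio
from concurrent.futures import ThreadPoolExecutor


def fibonacci(n):
    # Same naive recursion as in the question
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


async def main(ns):
    loop = asyncio.get_running_loop()
    with ThreadPoolExecutor(max_workers=2) as pool:
        futures = [loop.run_in_executor(pool, fibonacci, n) for n in ns]
        results = []
        # Iterate in completion order, not submission order
        for next_done in asyncio.as_completed(futures):
            result = await next_done
            print(result)
            results.append(result)
    return results


results = asyncio.run(main([20, 10]))
```

Results are printed in whatever order the computations finish, so no callback is needed to react to the faster one.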

Also, pure Python CPU-bound (non-I/O) code cannot run in parallel in a ThreadPoolExecutor because of the GIL. Keep that in mind and prefer ProcessPoolExecutor in such cases.
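Since run_in_executor accepts any concurrent.futures executor, switching to processes is a small change. A minimal sketch; the function must live at module level so it can be pickled, and the `if __name__ == '__main__':` guard matters on platforms where worker processes re-import the main module (for example, the spawn start method used by default on Windows and macOS):

```python
import asyncio
from concurrent.futures import ProcessPoolExecutor


def fibonacci(n):
    if n <= 1:
        return 1
    return fibonacci(n - 1) + fibonacci(n - 2)


async def main():
    loop = asyncio.get_running_loop()
    # Each worker process has its own interpreter and GIL,
    # so CPU-bound calls can run in parallel on different cores
    with ProcessPoolExecutor(max_workers=2) as pool:
        return await asyncio.gather(
            loop.run_in_executor(pool, fibonacci, 25),
            loop.run_in_executor(pool, fibonacci, 20),
        )


if __name__ == '__main__':
    print(asyncio.run(main()))  # [121393, 10946]
```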

9 Comments

How can we call await not from a coroutine?
You need to create an async def main() function and use asyncio.run() instead.
Yes, indeed, I missed that this happens outside of any loop. I'll amend the answer.
So, do I understand correctly that if I want to use coroutines in my code, I have to wrap the entry point of my project in a coroutine? I cannot restrict their scope to the place where they are used.
@MaximBlumental Yes, that's the preferred approach to working with asyncio. Imagine you have separate loops for some smaller tasks in your code, and then you suddenly need two subtasks from two different loops to run concurrently: that's where it gets unbearably complicated, without any benefit compared to a single loop passing tasks down to executors.
Please note that because of CPython's global interpreter lock (GIL), only one thread executes Python bytecode at a time, so you cannot achieve true parallelism with threads for CPU-bound tasks like calculating Fibonacci numbers: the threads just take turns. However, you can achieve 'fake it till you make it' concurrency with I/O-bound tasks like reading from and writing to sockets. You can read about this in more depth in Python Concurrency with asyncio (O'Reilly). The book demonstrates these points with code examples and goes deeper into how asyncio leverages the operating system's underlying event notification API to achieve concurrency with I/O-bound tasks.
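To illustrate the I/O-bound case, here is a sketch in which asyncio.sleep stands in for waiting on a socket (an assumption for illustration; no real I/O is performed). Three waits of 0.2 s overlap on a single thread, so the total is roughly 0.2 s rather than 0.6 s:

```python
import asyncio
import time


async def fake_io(delay):
    # asyncio.sleep stands in for a non-blocking I/O wait
    await asyncio.sleep(delay)
    return delay


async def main():
    start = time.perf_counter()
    # All three waits are in flight at the same time on one thread
    await asyncio.gather(fake_io(0.2), fake_io(0.2), fake_io(0.2))
    return time.perf_counter() - start


elapsed = asyncio.run(main())
print(f"elapsed: {elapsed:.2f}s")
```

Replacing fake_io with a CPU-bound function that never awaits would serialize the work, which is the limitation described above.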

1 Comment

The GIL means that we cannot have true parallelism with threads, but we do have true concurrency with CPU-bound tasks in Python.
