
I need to make a worker queue for aiohttp.

Right now I'm using asyncio.gather, but it works the wrong way:

[diagram: all tasks are started at once and the program waits until every one of them has finished]

This is what I want to make:

[diagram: a fixed number of tasks running at any moment; as soon as one finishes, the next one starts]

The first one can be implemented with the following code:

import asyncio

async def do_stuff(_):
    pass

async def main():
    tasks = []
    for i in data:
        tasks.append(do_stuff(i))
    await asyncio.gather(*tasks)

asyncio.run(main())

I need an example of how to implement the second one.

  • Usually most of the delay with network requests comes from waiting, so if your tasks are good async code you shouldn't need any workers for that. If that's not the case, this question seems similar: stackoverflow.com/questions/55993833/… (see the semaphore sketch after these comments for one common way to cap concurrency). Commented Jan 25, 2022 at 15:28
  • @jaaq gather waits for every single task to end, and I don't need that. I need an exact number of tasks running at any moment, without waiting for, say, all 5 to finish before starting another 5. Commented Jan 25, 2022 at 16:54
  • What have you tried so far? What specific problem(s) are you having? Commented Jan 25, 2022 at 19:16
  • @dirn I just have no idea how to make the second option in Python. Commented Jan 25, 2022 at 20:34
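
For reference, the approach hinted at in the first comment can be sketched with an asyncio.Semaphore (a minimal illustration of my own, assuming a plain list of URLs; fetch and the placeholder URL are not from the linked question): at most 5 requests are in flight at once, and a new one starts as soon as a slot frees up.

import asyncio
import aiohttp

async def fetch(session, sem, url):
    async with sem:                      # holds one of the 5 slots
        async with session.get(url) as resp:
            return await resp.text()

async def main(urls):
    sem = asyncio.Semaphore(5)
    async with aiohttp.ClientSession() as session:
        return await asyncio.gather(*(fetch(session, sem, url) for url in urls))

if __name__ == "__main__":
    asyncio.run(main(["https://example.com/"] * 20))

The trade-off versus an explicit worker queue is that gather still waits for everything at the end, but no more than 5 requests ever run concurrently.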

1 Answer


As I understand it, you want to run exactly 5 tasks in parallel. When one of those tasks finishes, you want to start a new task immediately. For this purpose, asyncio.gather doesn't work since it waits for all of its tasks to finish before proceeding.

I suggest something along these lines:

from collections import deque
import random
import asyncio

class RunSome:
    """Run at most `task_count` tasks at a time; extra coroutines wait in a queue."""

    def __init__(self, task_count=5):
        self.task_count = task_count
        self.running = set()        # coroutines currently being awaited
        self.waiting = deque()      # coroutines queued until a slot frees up

    @property
    def running_task_count(self):
        return len(self.running)

    def add_task(self, coro):
        if len(self.running) >= self.task_count:
            self.waiting.append(coro)
        else:
            self._start_task(coro)

    def _start_task(self, coro):
        self.running.add(coro)
        asyncio.create_task(self._task(coro))

    async def _task(self, coro):
        try:
            return await coro
        finally:
            # When this task ends, start the next queued coroutine (if any).
            self.running.remove(coro)
            if self.waiting:
                coro2 = self.waiting.popleft()
                self._start_task(coro2)
            
async def main():
    runner = RunSome()
    async def rand_delay():
        rnd = random.random() + 0.5
        print("Task started", asyncio.current_task().get_name(),
              runner.running_task_count)
        await asyncio.sleep(rnd)
        print("Task ended", asyncio.current_task().get_name(),
              runner.running_task_count)
    for _ in range(50):
        runner.add_task(rand_delay())
    # keep the program alive until all the tasks are done
    while runner.running_task_count > 0:
        await asyncio.sleep(0.1)
        
if __name__ == "__main__":
    asyncio.run(main())
        

Output:

Task started Task-2 5
Task started Task-3 5
Task started Task-4 5
Task started Task-5 5
Task started Task-6 5
Task ended Task-6 5
Task started Task-7 5
Task ended Task-4 5
Task ended Task-2 5
Task started Task-8 5
Task started Task-9 5
Task ended Task-5 5
Task started Task-10 5
Task ended Task-3 5
.....
Task started Task-51 5
Task ended Task-48 5
Task ended Task-47 4
Task ended Task-49 3
Task ended Task-51 2
Task ended Task-50 1

Coroutines are first-class objects in Python, so they can be put into lists and sets.

All of the task creation is handled by RunSome. You pass it coroutines to be executed. It knows how many tasks are currently running, and it decides either to create a new task immediately or add the coroutine to a queue of pending tasks. When a task finishes, it grabs a new coroutine out of the queue, if one is available. The number of running tasks never exceeds the threshold count that was passed to the constructor (default is 5). The tasks are wrappers around the passed coroutines.
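
Since the question is about aiohttp, a minimal sketch of feeding RunSome real requests might look like this (fetch, the placeholder URL list, and the shutdown loop are my own illustration, not part of the answer's code):

import aiohttp

async def fetch(session, url):
    # one download; RunSome decides when this actually starts
    async with session.get(url) as resp:
        body = await resp.text()
        print(url, resp.status, len(body))

async def main():
    runner = RunSome(task_count=5)
    async with aiohttp.ClientSession() as session:
        for url in ["https://example.com/"] * 20:
            runner.add_task(fetch(session, url))
        # keep the session open until every queued request has run
        while runner.running_task_count > 0 or runner.waiting:
            await asyncio.sleep(0.1)

if __name__ == "__main__":
    asyncio.run(main())

Checking runner.waiting as well as running_task_count ensures the session is not closed while queued requests are still pending.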

You will have to figure out what to do with the returned values, if any. The error handling here is rudimentary, but it does maintain the correct number of running tasks because of the try:finally: block.


5 Comments

Thanks, this is what I wanted <3
Could you edit this to get the returned value?
There is no simple, general way to do that. The returned values become available asynchronously. add_task cannot return a result because the result isn't available. The best it could do would be to return a Future, but then how would your application handle those? In general, a Future should get awaited at some point and its result method called after the await. It might be simplest to write your coroutines to do something useful with the result just before they end.
How about making another queue to receive the results, and returning a list of them once everything completes?
You are not saving the return value of create_task anywhere. Could this not be dangerous? The documentation states that tasks to which no hard references exist (event loop maintains only weak references to them) can be garbage collected at any time, even before finishing. docs.python.org/3/library/asyncio-task.html#creating-tasks
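
Tying the last three comments together, one possible variant (a sketch of my own, not the answer author's code) has add_task return an asyncio.Future that resolves with the coroutine's result, and keeps a strong reference to every created task so it cannot be garbage-collected mid-flight:

from collections import deque
import asyncio

class RunSomeWithResults:
    def __init__(self, task_count=5):
        self.task_count = task_count
        self.running = set()
        self.waiting = deque()
        self._tasks = set()          # strong references to created tasks

    def add_task(self, coro):
        # Must be called from inside a running event loop.
        fut = asyncio.get_running_loop().create_future()
        if len(self.running) >= self.task_count:
            self.waiting.append((coro, fut))
        else:
            self._start_task(coro, fut)
        return fut

    def _start_task(self, coro, fut):
        self.running.add(coro)
        task = asyncio.create_task(self._task(coro, fut))
        self._tasks.add(task)
        task.add_done_callback(self._tasks.discard)

    async def _task(self, coro, fut):
        try:
            fut.set_result(await coro)
        except Exception as exc:
            fut.set_exception(exc)
        finally:
            self.running.remove(coro)
            if self.waiting:
                self._start_task(*self.waiting.popleft())

async def main():
    runner = RunSomeWithResults()

    async def work(i):
        await asyncio.sleep(0.1)
        return i * i

    futures = [runner.add_task(work(i)) for i in range(20)]
    # gather now also keeps the program alive until everything is done
    print(await asyncio.gather(*futures))

if __name__ == "__main__":
    asyncio.run(main())

With this variant, await asyncio.gather(*futures) both collects the results and keeps the program alive until the queue drains, so the polling loop from the original example is no longer needed.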
