10

Below I have attached a test program to demonstrate a problem I am having with asyncio.gather throwing a TypeError.

My objective: To make multiple concurrent asynchronous calls to capture camera images to files from an array of USB cameras attached to my computer. When all cameras have completed their async captures, I want then resume processing.

The async coroutine take_image() shown here makes a system call to the "ffmpeg" application that captures an image from the specified camera to a specified file.

import asyncio
import os
import subprocess
import time

async def take_image(camera_id, camera_name, image_file_path, image_counter):
    image_capture_tic = time.perf_counter()
    try:
        run_cmd = subprocess.run( ["ffmpeg", '-y', '-hide_banner', '-f', 'avfoundation', '-i', camera_id,
                                   '-frames:v', '1', '-f', 'image2', image_file_path], universal_newlines=True,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)  # Note, ffmpeg writes to stderr, not stdout!
    except Exception as e:
        print("Error: Unable to capture image for", image_file_path)
        return "NO IMAGE!"

    image_capture_toc = time.perf_counter()
    print(f"{image_counter}: Captured {camera_name} image in: {image_capture_toc - image_capture_tic:0.0f} seconds")
    return camera_name

The main() routine shown below takes a list of multiple cameras, and iterating over each camera in the list, main() makes creates an asyncio task for each camera using asyncio.create_task(). Each task is added to a list of tasks.

Once all image capture tasks have been started, I await their completion using await asyncio.gather(tasks).

async def main():
    tic = time.perf_counter()
    camera_list = [('0', 'FHD Camera #1'),  ('1', 'FHD Camera #2'), ('2', 'FHD Camera #3'), ]
    image_counter = 1
    tasks = []
    for camera_pair in camera_list:
        camera_id, camera_name = camera_pair
        image_file_name = 'img' + str(image_counter) + "-cam" + str(camera_id)  + "-" + camera_name + '.jpg'
        image_file_path = os.path.join("/tmp/test1/img", image_file_name)

        # schedule all image captures calls *concurrently*:
        tasks.append(asyncio.create_task(take_image(camera_id, camera_name, image_file_path, image_counter),
                     name=image_file_name))
        image_counter = image_counter + 1

    await asyncio.gather(tasks) # <-- This line throws a TypeError!
    toc = time.perf_counter()
    print(f"Captured list of {image_counter - 1} cameras in: {toc - tic:0.0f} seconds")

asyncio.run(main())

Unfortunately, when I attempt to run this program, I am getting this error:

TypeError: unhashable type: 'list'

and the following Traceback:

Traceback (most recent call last):
  File "scratch_10.py", line 41, in <module>
    asyncio.run(main())
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "scratch_10.py", line 36, in main
    await asyncio.gather(tasks)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 805, in gather
    if arg not in arg_to_fut:
TypeError: unhashable type: 'list'

I have been trying to puzzle through the 3.8 documentation on asyncio, but I don't understand what is wrong.

How can I have each take_image request run asynchronously, and then resume processing in my calling routine once each task is complete?

2 Answers 2

18

gather takes positional arguments, not a single, iterable argument. You need to unpack your list.

await asyncio.gather(*tasks)
Sign up to request clarification or add additional context in comments.

3 Comments

This line seems to work. But isn't 'asyncio.create_task()' already scheduling these coroutines in the eventloop? I would expect, that if I got a list of tasks/futures/coroutines, they should all start/being scheduled when I execute 'asyncio.gather(*task_list)'.
@zonk I'm not sure I fully understand what you're asking. Yes create_task will schedule execution. It's common, though, to want to wait until a collection of tasks are complete. Awaiting gather is the way to do that.
If I create a task and try to gather it again, wouldn't it occure errors? Example: async def sleeper(): // await asyncio.sleep(1)and now in main g = asyncio.create_task(sleeper) // task = [g] // res = await asyncio.gather(*task) // print(res) wouldn't this cause errors? Because g is already scheduled anyway? And what if I want to start them in gatherat the same time?
0

You should try: (Task1, Task2, Task3) That is what i did for tasks an it worked

1 Comment

Although in my example I gave a fixed list of 3 cameras, in my production system, I will have a variable length list of cameras with names and ids not known until run time. So hard coding it seems impractical.

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.