How to build list of tasks for asyncio.gather in Python 3.8

Below I have attached a test program to demonstrate a problem I am having with asyncio.gather throwing a TypeError.

My objective: to make multiple concurrent asynchronous calls that capture camera images to files from an array of USB cameras attached to my computer. When all cameras have completed their async captures, I then want to resume processing.

The async coroutine take_image() shown here makes a system call to the "ffmpeg" application that captures an image from the specified camera to a specified file.

import asyncio
import os
import subprocess
import time

async def take_image(camera_id, camera_name, image_file_path, image_counter):
    image_capture_tic = time.perf_counter()
    try:
        run_cmd = subprocess.run( ["ffmpeg", '-y', '-hide_banner', '-f', 'avfoundation', '-i', camera_id,
                                   '-frames:v', '1', '-f', 'image2', image_file_path], universal_newlines=True,
                                 stdout=subprocess.PIPE, stderr=subprocess.PIPE)  # Note, ffmpeg writes to stderr, not stdout!
    except Exception as e:
        print("Error: Unable to capture image for", image_file_path)
        return "NO IMAGE!"

    image_capture_toc = time.perf_counter()
    print(f"{image_counter}: Captured {camera_name} image in: {image_capture_toc - image_capture_tic:0.0f} seconds")
    return camera_name

The main() routine shown below takes a list of cameras and, iterating over each camera in the list, creates an asyncio task for it using asyncio.create_task(). Each task is appended to a list of tasks.

Once all image capture tasks have been started, I await their completion using await asyncio.gather(tasks).

async def main():
    tic = time.perf_counter()
    camera_list = [('0', 'FHD Camera #1'),  ('1', 'FHD Camera #2'), ('2', 'FHD Camera #3'), ]
    image_counter = 1
    tasks = []
    for camera_pair in camera_list:
        camera_id, camera_name = camera_pair
        image_file_name = 'img' + str(image_counter) + "-cam" + str(camera_id)  + "-" + camera_name + '.jpg'
        image_file_path = os.path.join("/tmp/test1/img", image_file_name)

        # schedule all image capture calls *concurrently*:
        tasks.append(asyncio.create_task(take_image(camera_id, camera_name, image_file_path, image_counter),
                     name=image_file_name))
        image_counter = image_counter + 1

    await asyncio.gather(tasks) # <-- This line throws a TypeError!
    toc = time.perf_counter()
    print(f"Captured list of {image_counter - 1} cameras in: {toc - tic:0.0f} seconds")

asyncio.run(main())

Unfortunately, when I attempt to run this program, I am getting this error:

TypeError: unhashable type: 'list'

and the following Traceback:

Traceback (most recent call last):
  File "scratch_10.py", line 41, in <module>
    asyncio.run(main())
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/runners.py", line 43, in run
    return loop.run_until_complete(main)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/base_events.py", line 608, in run_until_complete
    return future.result()
  File "scratch_10.py", line 36, in main
    await asyncio.gather(tasks)
  File "/Library/Frameworks/Python.framework/Versions/3.8/lib/python3.8/asyncio/tasks.py", line 805, in gather
    if arg not in arg_to_fut:
TypeError: unhashable type: 'list'

I have been trying to puzzle through the 3.8 documentation on asyncio, but I don't understand what is wrong.

How can I have each take_image request run asynchronously, and then resume processing in my calling routine once each task is complete?

Acculturize answered 21/7, 2020 at 23:34 Comment(0)

gather takes positional arguments, not a single, iterable argument. You need to unpack your list.

await asyncio.gather(*tasks)
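
To illustrate the difference, here is a minimal sketch. It assumes a hypothetical fake_capture() coroutine that sleeps instead of calling ffmpeg, so it can run without any cameras attached. Passing the list itself raises a TypeError, while unpacking it hands each task to gather as a separate positional argument.

```python
import asyncio


async def fake_capture(camera_name: str) -> str:
    # Hypothetical stand-in for take_image(): sleeps instead of calling ffmpeg.
    await asyncio.sleep(0.01)
    return camera_name


async def main() -> list:
    camera_names = ["FHD Camera #1", "FHD Camera #2", "FHD Camera #3"]
    tasks = [asyncio.create_task(fake_capture(name)) for name in camera_names]

    # Passing the list as one argument fails: gather() expects each
    # awaitable as its own positional argument.
    try:
        await asyncio.gather(tasks)
    except TypeError as exc:
        print("gather(tasks) failed:", type(exc).__name__)

    # Unpacking the list passes every task separately. The results come
    # back in the same order as the arguments.
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Because gather returns results in argument order, the returned list lines up with camera_names regardless of which capture finishes first.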
Washin answered 22/7, 2020 at 4:42 Comment(3)
This line seems to work. But isn't asyncio.create_task() already scheduling these coroutines in the event loop? I would expect that, given a list of tasks/futures/coroutines, they would all be started or scheduled only when I execute asyncio.gather(*task_list). - Novelistic
@Novelistic I'm not sure I fully understand what you're asking. Yes, create_task will schedule execution. It's common, though, to want to wait until a collection of tasks is complete. Awaiting gather is the way to do that. - Washin
If I create a task and then gather it, wouldn't that cause errors? Example: async def sleeper(): // await asyncio.sleep(1) and now in main: g = asyncio.create_task(sleeper()) // task = [g] // res = await asyncio.gather(*task) // print(res). Wouldn't this cause errors, because g is already scheduled anyway? And what if I want to start them all at the same time in gather? - Novelistic
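
As a rough sketch of the situation raised in the comments above: gathering a task that create_task has already scheduled is not an error, because gather only waits for it and does not start it a second time. The sleeper() coroutine and its labels are illustrative assumptions, not code from the thread. Bare coroutines can also be passed directly, in which case gather wraps them in tasks itself.

```python
import asyncio


async def sleeper(label: str) -> str:
    # Illustrative coroutine: waits briefly, then returns its label.
    await asyncio.sleep(0.01)
    return label


async def main() -> list:
    # create_task() schedules the coroutine on the running event loop now.
    task = asyncio.create_task(sleeper("scheduled early"))

    # Yield once so the already-scheduled task gets a chance to start.
    await asyncio.sleep(0)

    # gather() accepts the existing task as-is and simply waits for it.
    # The bare coroutine is wrapped in a new task by gather() itself.
    return await asyncio.gather(task, sleeper("wrapped by gather"))


if __name__ == "__main__":
    print(asyncio.run(main()))
```

Either way the work runs concurrently; the choice mainly affects when scheduling happens, at create_task time or at the gather call.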

You should try: (Task1, Task2, Task3). That is what I did for my tasks, and it worked.

Guan answered 21/7, 2020 at 23:55 Comment(1)
Although my example uses a fixed list of 3 cameras, my production system will have a variable-length list of cameras whose names and ids are not known until run time, so hard-coding the tasks seems impractical. - Acculturize
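
For a camera list whose length is only known at run time, one possible sketch is to build the awaitables in a list comprehension and unpack them into gather. The helpers run_command() and capture_all() are hypothetical names, and a child Python process stands in for the real ffmpeg command. Note one deliberate change from the question's code: subprocess.run blocks the event loop until the command exits, so the captures would run one after another. This sketch uses asyncio.create_subprocess_exec instead, which lets the commands overlap.

```python
import asyncio
import sys


async def run_command(label: str, *cmd: str) -> tuple:
    # Unlike subprocess.run(), this does not block the event loop while
    # the child process is running.
    proc = await asyncio.create_subprocess_exec(
        *cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE,
    )
    stdout, _stderr = await proc.communicate()
    return label, proc.returncode, stdout.decode().strip()


async def capture_all(cameras: list) -> list:
    # One coroutine per camera, however many there are at run time.
    # The child Python process is a placeholder for the ffmpeg command.
    jobs = [
        run_command(name, sys.executable, "-c", f"print('camera {cam_id}')")
        for cam_id, name in cameras
    ]
    return await asyncio.gather(*jobs)


if __name__ == "__main__":
    cameras = [("0", "FHD Camera #1"), ("1", "FHD Camera #2")]
    for row in asyncio.run(capture_all(cameras)):
        print(row)
```

In a real capture routine the placeholder command would be replaced by the ffmpeg argument list from take_image(), and the return code checked to detect failed captures.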
