
My Python script contains a loop that uses subprocess to run commands outside the script. Each subprocess is independent. I listen for the returned message in case there's an error; I can't ignore the result of the subprocess. Here's the script without asyncio (I've replaced my computationally expensive call with sleep):

from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess

def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    for index in range(val):
        print("index = "+str(index))
        go_do_something(index)

# run the script
my_long_func(3) # launch three tasks

I think I could use asyncio to speed up this activity since the Python script is waiting on the external subprocess to complete. I think threading or multiprocessing are not necessary, though they could also result in faster execution. Using a task queue (e.g., Celery) is another option.
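For comparison, the same overlap can be achieved with threads alone, since each call just blocks on its own external process; here is a minimal sketch using `concurrent.futures`, reusing the names from the script above:

```python
from concurrent.futures import ThreadPoolExecutor
from subprocess import PIPE
import subprocess

def go_do_something(index: int) -> None:
    # each worker thread blocks on its own subprocess independently
    process = subprocess.run(["sleep", "2"], stdout=PIPE, stderr=PIPE, timeout=20)
    if "error" in process.stderr.decode("utf-8"):
        print("error for " + str(index))

def my_long_func(val: int) -> None:
    with ThreadPoolExecutor(max_workers=val) as pool:
        # consuming the iterator waits for all tasks and surfaces any exceptions
        list(pool.map(go_do_something, range(val)))

my_long_func(3)  # three overlapping 2-second sleeps, roughly 2 seconds total
```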

I tried implementing the asyncio approach, but am missing something since the following attempt doesn't change the overall execution time:

import asyncio
from subprocess import PIPE  # https://docs.python.org/3/library/subprocess.html
import subprocess


async def go_do_something(index: int) -> None:
    """
    This function takes a long time
    Nothing is returned
    Each instance is independent
    """
    process = subprocess.run(["sleep","2"],stdout=PIPE,stderr=PIPE,timeout=20)
    stdout = process.stdout.decode("utf-8")
    stderr = process.stderr.decode("utf-8")
    if "error" in stderr:
        print("error for "+str(index))
    return

def my_long_func(val: int) -> None:
    """
    This function contains a loop
    Each iteration of the loop calls a function
    Nothing is returned
    """
    # https://docs.python.org/3/library/asyncio-eventloop.html
    loop = asyncio.get_event_loop()
    tasks = []
    for index in range(val):
        task = go_do_something(index)
        tasks.append(task)
    # https://docs.python.org/3/library/asyncio-task.html
    tasks = asyncio.gather(*tasks)
    loop.run_until_complete(tasks)
    loop.close()
    return

my_long_func(3) # launch three tasks

If I want to monitor the output of each subprocess but not wait while each subprocess runs, can I benefit from asyncio? Or does this situation require something like multiprocessing or Celery?

2 Answers


Try executing the commands using asyncio instead of subprocess.

Define a run() function:

import asyncio

async def run(cmd: str):
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stderr=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE
    )

    stdout, stderr = await proc.communicate()

    print(f'[{cmd!r} exited with {proc.returncode}]')
    if stdout:
        print(f'[stdout]\n{stdout.decode()}')
    if stderr:
        print(f'[stderr]\n{stderr.decode()}')

Then you may call it as you would call any async function:

asyncio.run(run('sleep 2'))

#=>

['sleep 2' exited with 0]

The example was taken from the official asyncio subprocess documentation.
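To launch several commands at once rather than one after another, the same `run()` coroutine can be fanned out with `asyncio.gather`; a sketch along those lines (the three concurrent sleeps finish in roughly 2 seconds, not 6):

```python
import asyncio

async def run(cmd: str) -> None:
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    stdout, stderr = await proc.communicate()
    print(f'[{cmd!r} exited with {proc.returncode}]')

async def main() -> None:
    # gather runs all three coroutines concurrently on one event loop
    await asyncio.gather(*(run('sleep 2') for _ in range(3)))

asyncio.run(main())
```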


2 Comments

Thanks for pointing me to that documentation; I wasn't aware of that integration. When I try to adapt the example to my case, I'm failing. Specifically, I tried adding the async def run(cmd) to my first example above and placing await run("sleep 2") inside go_do_something(index). That results in SyntaxError: invalid syntax on the await run("sleep 2") line.
Here's what I tried based on your suggestion -- pastebin.com/AU9YgbGG
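For reference, the SyntaxError described above is what Python raises when `await` appears outside an `async def`; a minimal sketch of the fix, making every caller in the chain a coroutine (names taken from the question):

```python
import asyncio

async def run(cmd: str) -> None:
    proc = await asyncio.create_subprocess_shell(
        cmd,
        stdout=asyncio.subprocess.PIPE,
        stderr=asyncio.subprocess.PIPE)
    await proc.communicate()

async def go_do_something(index: int) -> None:
    # 'await' is only legal inside an 'async def',
    # so go_do_something itself must be declared async
    await run("sleep 2")

asyncio.run(go_do_something(0))
```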

@ronginat pointed me to https://asyncio.readthedocs.io/en/latest/subprocess.html, which I was able to adapt to my situation:

import asyncio

async def run_command(*args):
    # Create subprocess
    process = await asyncio.create_subprocess_exec(
        *args,
        # stdout must be a pipe to be accessible as process.stdout
        stdout=asyncio.subprocess.PIPE)
    # Wait for the subprocess to finish
    stdout, stderr = await process.communicate()
    # Return stdout
    return stdout.decode().strip()

async def go_do_something(index: int) -> str:
    print('index=', index)
    res = await run_command('sleep', '2')
    return res

def my_long_func(val: int) -> None:
    task_list = []
    for indx in range(val):
        task_list.append( go_do_something(indx) )
    loop = asyncio.get_event_loop()
    commands = asyncio.gather(*task_list)
    reslt = loop.run_until_complete(commands)
    print(reslt)
    loop.close()

my_long_func(3) # launch three tasks

The total time of execution is just over 2 seconds even though there are three sleeps of duration 2 seconds. And I get the stdout from each subprocess.
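On Python 3.7 and later, the manual loop handling above (get_event_loop / run_until_complete / close) can be replaced by asyncio.run; a sketch of the same program in that style:

```python
import asyncio

async def run_command(*args) -> str:
    # create the subprocess with stdout piped so it can be read back
    process = await asyncio.create_subprocess_exec(
        *args,
        stdout=asyncio.subprocess.PIPE)
    stdout, _ = await process.communicate()
    return stdout.decode().strip()

async def go_do_something(index: int) -> str:
    print('index =', index)
    return await run_command('sleep', '2')

async def my_long_func(val: int) -> list:
    # gather schedules every coroutine concurrently and collects results in order
    return await asyncio.gather(*(go_do_something(i) for i in range(val)))

print(asyncio.run(my_long_func(3)))  # three sleeps overlap, ~2 seconds total
```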

3 Comments

How would you handle long-lived processes? For example, I create a few processes in advance (to load a chunk of massive data) and wait for them to send messages via a Pipe. My system doesn't seem to use asyncio properly (I am not "awaiting" the process response, so it seems to be a synchronous call).
This is a very nice approach: hand the subprocess off to asyncio and then just retrieve the results, similar to pool.map. Thanks for the example =)
Any particular reason why there are two functions, go_do_something and run_command? Could this all be done inside go_do_something?
