I have a process that reads input and writes output. In reality it is a black box whose input and output are completely independent, but this doubler illustrates it:

#!/bin/bash
while read -r i; do
    sleep 0.$RANDOM
    echo $((i*2))
done

I also have a few functions in my Python code that feed this process asynchronously:

import asyncio
import subprocess
import random

class Feeder:
    def __init__(self):
        self.process = subprocess.Popen(['doubler.sh'],
                                        stdin=subprocess.PIPE)

    def feed(self, value):
        self.process.stdin.write(str(value).encode() + b'\n')
        self.process.stdin.flush()

feeder = Feeder()

async def feed_random():
    while True:
        feeder.feed(random.randint(0, 100))
        await asyncio.sleep(1)

async def feed_tens():
    while True:
        feeder.feed(10)
        await asyncio.sleep(3.14)

async def main():
    await asyncio.gather(
        feed_random(),
        feed_tens(),
    )

if __name__ == '__main__':
    asyncio.run(main())

This works well. But I would like to read the output of the process too, like this:

...
stdout=subprocess.PIPE
...
for line in feeder.process.stdout:
    print("The answer is " + line.decode())

but that is blocking, so the feeding won't happen. Can this be done in the same asyncio loop? Or do I need another thread?

  • Do you want everything printed, or just what ends up in process.stdout? Commented Mar 26, 2020 at 18:20
  • @gold_cy stdout is sufficient, I don't need stderr. Commented Mar 27, 2020 at 7:45

1 Answer

Something like this should work. To read from stdout without blocking the event loop, switch from subprocess.Popen to asyncio.subprocess, whose stdin and stdout streams can be awaited.

import asyncio
import random

class Feeder:
    def __init__(self):
        self.process = None

    async def start_process(self):
        self.process = await asyncio.create_subprocess_exec('./doubler.sh',
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE)

    async def feed(self, value):
        self.process.stdin.write(str(value).encode() + b'\n')
        await self.process.stdin.drain()

feeder = Feeder()

async def feed_random():
    while True:
        asyncio.ensure_future(feeder.feed(random.randint(0, 100)))
        await asyncio.sleep(1)

async def feed_tens():
    while True:
        asyncio.ensure_future(feeder.feed(10))
        await asyncio.sleep(3.14)

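async def read_feed():
    while True:
        line = await feeder.process.stdout.readline()
        if not line:  # b'' means the process closed stdout (EOF); stop instead of spinning
            break
        print("The answer is " + line.decode('utf-8').rstrip())  # drop the trailing newline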


async def main():
    await feeder.start_process()
    await asyncio.gather(
        feed_random(),
        feed_tens(),
        read_feed()
    )

if __name__ == '__main__':
    asyncio.run(main())
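
For reference, the same feed-and-read pattern can be sketched in a form that runs without doubler.sh and terminates on its own. This is only a minimal sketch under stated assumptions, not the exact code above: the inline Python child (CHILD_CODE) is a stand-in for the black-box process, and run_doubler is a hypothetical helper name. It awaits drain() directly, closes stdin to signal the end of input, and stops reading at EOF.

```python
import asyncio
import sys

# Hypothetical stand-in for doubler.sh: doubles each integer read from stdin.
CHILD_CODE = (
    "import sys\n"
    "for line in sys.stdin:\n"
    "    print(int(line) * 2, flush=True)\n"
)

async def run_doubler(values):
    """Feed values to the child and collect its answers concurrently."""
    process = await asyncio.create_subprocess_exec(
        sys.executable, "-c", CHILD_CODE,
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    async def feed():
        for value in values:
            process.stdin.write(str(value).encode() + b"\n")
            # Wait until the write buffer has drained enough to continue.
            await process.stdin.drain()
        # Closing stdin sends EOF, which lets the child leave its read loop.
        process.stdin.close()

    async def read():
        answers = []
        while True:
            line = await process.stdout.readline()
            if not line:  # b"" means the child closed stdout
                break
            answers.append(int(line))
        return answers

    # Writer and reader run concurrently on the same event loop.
    _, answers = await asyncio.gather(feed(), read())
    await process.wait()
    return answers

if __name__ == "__main__":
    print(asyncio.run(run_doubler([1, 2, 3])))
```

Keeping the writer and the reader as two coroutines under one asyncio.gather call mirrors the structure of the answer, so no extra thread is needed.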

2 Comments

Interesting. Could you please explain what drain is for and why you use ensure_future instead of awaiting?
Sure. drain is the closest equivalent of flush: write only buffers the data, and awaiting drain waits until the stream's write buffer has emptied enough to accept more. ensure_future wraps each feed call in an asyncio.Task, so the write is scheduled without the feeding loop waiting for it. Awaiting feeder.feed(...) directly would suspend only that one coroutine until the write completes; the other coroutines passed to asyncio.gather keep running, so that works too. Hope that makes sense. On Python 3.7+ you can replace ensure_future with asyncio.create_task; I wasn't sure which version you are using, and ensure_future also works on older versions.
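
To see how await behaves inside asyncio.gather, here is a minimal sketch (worker and demo are hypothetical names, not part of the answer). Each coroutine awaits directly, with no ensure_future, and the one with the shorter sleep still finishes first, which suggests that await suspends only the calling coroutine and not the whole event loop.

```python
import asyncio

async def worker(name, delay, log):
    # await suspends only this coroutine; the event loop runs the others meanwhile.
    await asyncio.sleep(delay)
    log.append(name)

async def demo():
    log = []
    # "slow" is started first, but "fast" is not held up behind it.
    await asyncio.gather(
        worker("slow", 0.2, log),
        worker("fast", 0.1, log),
    )
    return log

if __name__ == "__main__":
    print(asyncio.run(demo()))
```

If the awaits ran one after another, "slow" would be logged before "fast"; the completion order shows the two coroutines overlapping.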
