I have a websocket server (Python 3.x) that takes requests, each of which is a URL variable. It runs fine, except that it executes the requests serially, one after another. While the function is running, it also blocks other clients trying to connect. Non-blocking is what I want!

  • Asynchronous, multiprocessed threading of both the websocket and the subprocess function.
  • The ability to set the number of cores to use. This is not obligatory though.

Here's what I've got:


ANSWER (illustration and asyncio.subprocess in accepted answer)

I didn't get very far with this, so I reverted to my original code. As it turns out, the function needs to yield to the event loop with await asyncio.sleep(.001) (the code below uses an even shorter delay). Now it runs fine: I tested with multiple clients at the same time and it handles them asynchronously.

import asyncio, json, time
import websockets
async def handler(websocket, path):
    print("New client connected.")
    await websocket.send('CONNECTED')
    try:
        while True:
            inbound = await websocket.recv()
            if inbound is None:
                break
            while inbound is not None:
                for line in range(10):
                    time.sleep(1)
                    data = {}
                    data['blah'] = line
                    await asyncio.sleep(.000001) # THIS
                    print(data)
                    await websocket.send(json.dumps(data))
                await websocket.send(json.dumps({'progress': 'DONE'}))
                break
    except websockets.exceptions.ConnectionClosed:
        print("Client disconnected.")
if __name__ == "__main__":
    server = websockets.serve(handler, '0.0.0.0', 8080)
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

Update: as suggested by @udi, if you want to run a slow external process, the way to go is asyncio.subprocess and not subprocess. Reading from a pipe with a blocking call stalls the event loop, and with it every other client's coroutine; asyncio.subprocess avoids this by making the read awaitable.

  • I don't know enough about asyncio to put together a working example, but I think you want to use concurrent.futures.ThreadPoolExecutor or concurrent.futures.ProcessPoolExecutor to handle your blocking jobs. The asyncio event loop has a method, run_in_executor, to which you pass an executor and a callable. That is probably what you want to do in your ws coroutine instead of directly yielding from download. Commented Jan 23, 2017 at 5:42
  • I tried your suggestion but couldn't get it to work. As it turns out, I went back to my original code, made a small edit, and now it works asynchronously. Although I can't figure out how to set the number of cores, I'm fine with this. Thanks anyway. Commented Jan 24, 2017 at 0:43
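For reference, the run_in_executor approach from the first comment can be sketched as follows. This is a minimal sketch rather than the asker's code: blocking_job and run_jobs are hypothetical names, and it assumes Python 3.7+ for asyncio.run and asyncio.get_running_loop. The max_workers argument is the knob that limits how many blocking jobs run at once.

```python
import asyncio
import time
from concurrent.futures import ThreadPoolExecutor


def blocking_job(n):
    """Hypothetical stand-in for a slow, blocking call."""
    time.sleep(0.2)
    return n * 2


async def run_jobs(values, max_workers=4):
    loop = asyncio.get_running_loop()
    # max_workers caps how many blocking jobs run at the same time.
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # Each call returns an awaitable; the event loop stays free meanwhile.
        futures = [loop.run_in_executor(pool, blocking_job, v) for v in values]
        return await asyncio.gather(*futures)


if __name__ == "__main__":
    t0 = time.perf_counter()
    results = asyncio.run(run_jobs([1, 2, 3, 4]))
    print(results, "in {:.2f} seconds".format(time.perf_counter() - t0))
```

For CPU-bound work, concurrent.futures.ProcessPoolExecutor can be swapped in for the thread pool; in that case the job must be a picklable, module-level function, and max_workers then corresponds more closely to the number of cores used.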

1 Answer

time.sleep() is blocking.

Try:

# blocking_server.py
import asyncio
import time

import websockets

x = 0


async def handler(websocket, path):
    global x
    x += 1
    client_id = x

    try:
        print("[#{}] Connected.".format(client_id))

        n = int(await websocket.recv())
        print("[#{}] Got: {}".format(client_id, n))
        for i in range(1, n + 1):
            print("[#{}] zzz...".format(client_id))
            time.sleep(1)
            print("[#{}] woke up!".format(client_id))
            await asyncio.sleep(.001)
            msg = "*" * i
            print("[#{}] sending: {}".format(client_id, msg))
            await websocket.send(msg)

        msg = "bye!"
        print("[#{}] sending: {}".format(client_id, msg))
        await websocket.send(msg)

        print("[#{}] Done.".format(client_id))

    except websockets.exceptions.ConnectionClosed:
        print("[#{}] Disconnected!.".format(client_id))


if __name__ == "__main__":
    port = 8080
    server = websockets.serve(handler, '0.0.0.0', port)
    print("Started server on port {}".format(port))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

With this test client:

# test_client.py
import asyncio
import time

import websockets


async def client(client_id, n):
    t0 = time.time()
    async with websockets.connect('ws://localhost:8080') as websocket:
        print("[#{}] > {}".format(client_id, n))
        await websocket.send(str(n))
        while True:
            resp = await websocket.recv()
            print("[#{}] < {}".format(client_id, resp))
            if resp == "bye!":
                break

    print("[#{}] Done in {:.2f} seconds".format(client_id, time.time() - t0))


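async def main():
    # Run four clients concurrently, each asking for three messages.
    # asyncio.gather is used because recent Python versions no longer
    # accept bare coroutines in asyncio.wait.
    await asyncio.gather(*(client(i + 1, 3) for i in range(4)))


asyncio.run(main())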

Now compare the result when time.sleep(x) is replaced with await asyncio.sleep(x)!
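The difference can also be seen without a websocket server. The following is a minimal sketch, assuming Python 3.7+; worker and timed are hypothetical names introduced only for this illustration. It runs the same number of tasks twice, once with each kind of sleep, and measures the total wall-clock time.

```python
import asyncio
import time


async def worker(blocking, delay):
    if blocking:
        time.sleep(delay)           # holds the event loop; no other task runs
    else:
        await asyncio.sleep(delay)  # yields to the event loop while waiting


async def timed(blocking, n_tasks=4, delay=0.2):
    """Return the wall-clock time needed to finish n_tasks workers."""
    t0 = time.perf_counter()
    await asyncio.gather(*(worker(blocking, delay) for _ in range(n_tasks)))
    return time.perf_counter() - t0


if __name__ == "__main__":
    print("time.sleep:    {:.2f} s".format(asyncio.run(timed(True))))
    print("asyncio.sleep: {:.2f} s".format(asyncio.run(timed(False))))
```

With time.sleep the tasks run one after another, so the total should be close to n_tasks × delay; with asyncio.sleep they overlap, so the total should be close to a single delay.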

If you need to run a slow external process via asyncio, try asyncio.subprocess:

An example external program:

# I am `slow_writer.py`
import sys
import time

n = int(sys.argv[1])

for i in range(1, n + 1):
    time.sleep(1)
    print("*" * i)

with this server:

# nonblocking_server.py

import asyncio
import sys

import websockets

x = 0


async def handler(websocket, path):
    global x
    x += 1
    client_id = x

    try:
        print("[#{}] Connected.".format(client_id))

        n = int(await websocket.recv())

        print("[#{}] Got: {}. Running subprocess..".format(client_id, n))

        cmd = (sys.executable, 'slow_writer.py', str(n))
        proc = await asyncio.create_subprocess_exec(
            *cmd, stdout=asyncio.subprocess.PIPE)

        async for data in proc.stdout:
            print("[#{}] got from subprocess, sending: {}".format(
                client_id, data))
            await websocket.send(data.decode().strip())

        return_value = await proc.wait()
        print("[#{}] Subprocess done.".format(client_id))

        msg = "bye!"
        print("[#{}] sending: {}".format(client_id, msg))
        await websocket.send(msg)

        print("[#{}] Done.".format(client_id))

    except websockets.exceptions.ConnectionClosed:
        print("[#{}] Disconnected!.".format(client_id))


if __name__ == "__main__":

    if sys.platform == 'win32':
        loop = asyncio.ProactorEventLoop()
        asyncio.set_event_loop(loop)

    port = 8080
    server = websockets.serve(handler, '0.0.0.0', port)
    print("Started server on port {}".format(port))
    loop = asyncio.get_event_loop()
    loop.run_until_complete(server)
    loop.run_forever()

4 Comments

Yes, this is a great example. I was not using sleep() from either time or asyncio. asyncio.subprocess looks interesting, but I need universal_newlines, which asyncio.subprocess doesn't support. There seems to be no noticeable difference when using subprocess instead, though I have no explanation for this. I guess asyncio.sleep() allows the waiting thread to pass the GIL?
asyncio.sleep schedules the task for future execution; meanwhile, other tasks can take control.
async for data in proc.stdout: text = data.decode() should convert your data to str, in place of universal_newlines.
Oh, so by doing that other tasks utilize the GIL? Yeah, I did data.decode() with no luck in parsing a progress bar in the output, but with a split('\r') in the for loop it was no problem. Great, much better! I read up on asyncio.subprocess and it seems to be the way to go with asyncio. Thank you.
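The decode-and-split step described in the last two comments might look like the sketch below. split_progress is a hypothetical helper, not part of asyncio, and the sample bytes are made up for illustration. It decodes a chunk of subprocess output and treats both "\r" and "\n" as separators, which is roughly what universal_newlines would do in the blocking subprocess module.

```python
def split_progress(chunk):
    """Decode a chunk of subprocess output and split it into progress updates."""
    text = chunk.decode(errors="replace")
    # Normalise Windows line endings first, then treat a bare "\r" as a separator.
    text = text.replace("\r\n", "\n").replace("\r", "\n")
    # Drop empty fragments and surrounding whitespace.
    return [part.strip() for part in text.split("\n") if part.strip()]


if __name__ == "__main__":
    for update in split_progress(b" 10%\r 50%\r100%\n"):
        print(update)
```

Inside the handler this would be called on each data item from proc.stdout. Note that async for data in proc.stdout yields data only when a "\n" arrives, so a progress bar that uses "\r" alone may show up late; reading with await proc.stdout.read(n) is one way to get chunks sooner, at the cost of chunks that may end in the middle of an update.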
