
I think my question is very easy and probably naive, but I have read a lot of material and still can't figure out how to do what I want.

I use the websockets library, and I have this algorithm:

1. Get the connection and start handling it:
async def main_request_handler(ws, path):
    proxy = Proxy()
    try:
        await proxy.start(ws, path)

2. Inside start, I create a second websocket (ws_server) to forward requests from ws and to receive answers that are sent back to ws:

while True:
    request_raw = await self.ws_server.recv()
    await self.process_request_from_server(request_raw)

The problem is that I need to use one websocket server connection for multiple ws clients, and I need to pass the same answer from ws_server to every one of them. Right now each response reaches only one client, because .recv() returns a value to only one of the 'subscribers'. How can I fix this? Please note that I use while True and async.

3
  • Can you post a minimal example? So I can try your code? Commented Jan 23, 2018 at 4:47
  • It will be a quite big example Commented Jan 23, 2018 at 4:50
  • You will probably have to abstract the problem to the specific part you are struggling with. If it's extremely domain specific, others won't be able to comprehend the intention of the code. It can be a challenge... Commented Jan 23, 2018 at 4:52

4 Answers

7

Here is a very simplistic example of a pub/sub websockets server:

import asyncio
import websockets

connections = set()
n = 0


async def handler(websocket, path):
    global n

    if path == "/sub":
        n = n + 1
        i = n
        connections.add(websocket)
        print("adding subscriber #", i)
        try:
            async for msg in websocket:
                pass  # ignore
        except websockets.ConnectionClosed:
            pass
        finally:
            print("removing subscriber #", i)
            connections.remove(websocket)

    elif path == "/pub":
        async for msg in websocket:
            print("<", msg)
            for ws in connections:
                asyncio.ensure_future(ws.send(msg))


start_server = websockets.serve(handler, 'localhost', 8765)

asyncio.get_event_loop().run_until_complete(start_server)
asyncio.get_event_loop().run_forever()

And an example subscriber client (run a few of those):

import asyncio
import websockets


async def listen():
    async with websockets.connect('ws://localhost:8765/sub') as websocket:
        while True:
            greeting = await websocket.recv()
            print("< {}".format(greeting))


asyncio.get_event_loop().run_until_complete(listen())

and a publisher:

import asyncio
import websockets


async def say():
    async with websockets.connect('ws://localhost:8765/pub') as websocket:
        while True:
            msg = input("Enter message:")
            if not msg:
                break
            await websocket.send(msg)


asyncio.get_event_loop().run_until_complete(say())

1

In other words, I need to run .recv in the same loop and thread with multiple consumers. In RxPy I could just call stream.emit(recv_result) and consume items like that: stream.subscribe(callback_fn), but this is the callback way, and I need async.

Your subscribe method can accept coroutine functions, i.e. functions created with async def. Once something is emitted, they can be instantiated and their coroutines spawned using create_task:

def __init__(self, ...):
    self._subscribers = []

def subscribe(self, corofn):
    self._subscribers.append(corofn)

def emit(self, obj):
    loop = asyncio.get_event_loop()
    for corofn in self._subscribers:
        coro = corofn(obj)
        loop.create_task(coro)

async def main(self):
    while True:
        request_raw = await self.ws_server.recv()
        self.emit(request_raw)
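
To try the idea without a real websocket, here is a minimal self-contained sketch. The names Emitter, drain, and demo are made up for illustration, and the list of strings stands in for the values that `await self.ws_server.recv()` would return. It also keeps references to the spawned tasks, which is an addition on my part so they are not garbage-collected before they finish.

```python
import asyncio


class Emitter:
    """Hypothetical helper: passes each emitted object to every subscribed coroutine function."""

    def __init__(self):
        self._subscribers = []
        self._tasks = set()  # strong references to running tasks

    def subscribe(self, corofn):
        self._subscribers.append(corofn)

    def emit(self, obj):
        # Must be called while the event loop is running.
        for corofn in self._subscribers:
            task = asyncio.ensure_future(corofn(obj))
            self._tasks.add(task)
            task.add_done_callback(self._tasks.discard)

    async def drain(self):
        # Wait for every task spawned so far (only needed for this demo).
        if self._tasks:
            await asyncio.gather(*self._tasks)


async def demo():
    received = []

    async def consumer_a(msg):
        await asyncio.sleep(0)  # pretend to do some async work
        received.append(("a", msg))

    async def consumer_b(msg):
        received.append(("b", msg))

    emitter = Emitter()
    emitter.subscribe(consumer_a)
    emitter.subscribe(consumer_b)
    for msg in ["one", "two"]:  # stands in for the recv() loop
        emitter.emit(msg)
    await emitter.drain()
    return sorted(received)
```

Running `asyncio.run(demo())` shows that both consumers receive both messages. Note that emit does not wait for the consumers, so a slow consumer does not delay the next recv().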

0

I'm not sure I understood correctly, but isn't gathering multiple coroutines what you want?

while True:
    request_raw = await self.ws_server.recv()

    # process by multiple clients concurrently:
    await asyncio.gather(
        self.process_by_client_1(request_raw),
        self.process_by_client_2(request_raw),
        self.process_by_client_3(request_raw),
    )
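
If the set of handlers has to change at runtime, the same gather call can be built from a list. This is only a sketch under that assumption: Dispatcher, add_handler, and dispatch are hypothetical names, and the handlers here just return values so the result is easy to inspect.

```python
import asyncio


class Dispatcher:
    """Hypothetical helper: awaits all currently registered handlers for each message."""

    def __init__(self):
        self.handlers = []  # coroutine functions, may be added at any time

    def add_handler(self, corofn):
        self.handlers.append(corofn)

    async def dispatch(self, message):
        # Copy the list so handlers added during the await do not affect this round.
        # return_exceptions=True keeps one failing handler from cancelling the rest.
        return await asyncio.gather(
            *(handler(message) for handler in list(self.handlers)),
            return_exceptions=True,
        )


async def demo():
    dispatcher = Dispatcher()

    async def upper(msg):
        return msg.upper()

    async def length(msg):
        return len(msg)

    dispatcher.add_handler(upper)
    first = await dispatcher.dispatch("ping")
    dispatcher.add_handler(length)  # registered later, picked up by the next dispatch
    second = await dispatcher.dispatch("pong")
    return first, second
```

Unlike spawning tasks, this waits for every handler to finish before the loop can call recv() again, so the slowest handler sets the pace.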

2 Comments

Yes, I want something like that, but I need to add process_by_client_ dynamically (like adding a callback, but I need to run it with the await keyword).
In other words, I need to run .recv in the same loop and thread with multiple consumers. In RxPy I could just call stream.emit(recv_result) and consume items like that: stream.subscribe(callback_fn), but this is the callback way, and I need async.

0

Thanks for the suggestions, they might work. I solved it with queues.

import asyncio

import websockets
from websockets import WebSocketServerProtocol


class SWebsocket(object):

    def __init__(self, websocket: WebSocketServerProtocol):
        self.ws = websocket
        self.queues = {}
        self.subscribe()

    def subscribe(self):
        # fire and forget function
        asyncio.ensure_future(self.recv_mess())

    async def recv_mess(self):
        while True:
            try:
                data = await self.ws.recv()
            except websockets.ConnectionClosed as e:
                for _, q in self.queues.items():
                    await q.put(e)
                return
            for _, q in self.queues.items():
                await q.put(data)

    async def recv(self, id):
        # read value from queue
        if id not in self.queues:
            self.queues[id] = asyncio.Queue()
        data = await self.queues[id].get()
        if isinstance(data, websockets.ConnectionClosed):
            raise data
        return data
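
The same queue idea can be tried without a websocket. The sketch below is an illustration, not the code above: Broadcaster, register, pump, and fake_source are hypothetical names, and the async generator stands in for the recv() loop. One difference is that queues are created in an explicit register step, because in the class above a queue only exists after the first recv(id) call, so a consumer that starts late would miss earlier messages.

```python
import asyncio


class Broadcaster:
    """Hypothetical helper: copies every item from one source into per-consumer queues."""

    _CLOSED = object()  # sentinel that marks the end of the stream

    def __init__(self):
        self._queues = {}

    def register(self, consumer_id):
        # Create the queue up front so no message is missed.
        self._queues[consumer_id] = asyncio.Queue()

    async def pump(self, source):
        # `source` is any async iterator; it stands in for the recv() loop.
        async for item in source:
            for q in self._queues.values():
                q.put_nowait(item)
        for q in self._queues.values():
            q.put_nowait(self._CLOSED)

    async def recv(self, consumer_id):
        item = await self._queues[consumer_id].get()
        if item is self._CLOSED:
            raise EOFError("source closed")
        return item


async def fake_source():
    for msg in ("a", "b"):
        await asyncio.sleep(0)
        yield msg


async def consume(broadcaster, consumer_id):
    got = []
    try:
        while True:
            got.append(await broadcaster.recv(consumer_id))
    except EOFError:
        return got


async def demo():
    b = Broadcaster()
    b.register(1)
    b.register(2)
    results = await asyncio.gather(
        consume(b, 1), consume(b, 2), b.pump(fake_source())
    )
    return results[0], results[1]
```

With `asyncio.run(demo())`, each consumer gets its own copy of every message in order. The queues are unbounded here, so a consumer that never reads will make its queue grow.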
