1

I'm using websockets in an python project i'm working on. The websocket is being run in an thread and given 2 queue's from the parent thread. I'm using javascript to connect to the websocket server.

I'm able to get messages from the parent thread throughself.ssi.get(True) and passed them through to the javascript websocket client.

But i'm unable to receive messages from the client. When i use zaproxy i can see the messages going through. On the websocket server i'm also able to see the packets arrive on the interface. Python throws no error and logger.setLevel(logging.DEBUG) does not show messages arriving the same way as i'm able to see messages being send.

I've been trying to solve this but i've run out of ideas to find the problem, any help is welcome.

Python websocket server:

import websockets
import logging
import asyncio
import ssl

class websocket:
    def __init__(self,ssi,sso):

        self.ssi = ssi
        self.sso = sso

        logger = logging.getLogger('websockets')
        logger.setLevel(logging.DEBUG)
        # logger.addHandler(logging.FileHandler('debug.log'))
        logger.addHandler(logging.StreamHandler())

        sslc = ssl.SSLContext(ssl.PROTOCOL_TLSv1_2)
        sslc.load_cert_chain(
            'keys/wss.server.crt',
            'keys/wss.server.key')

        loop = asyncio.new_event_loop()
        wsrv = websockets.serve(
            self.handler,
            host='0.0.0.0',
            port=9000,
            ssl=sslc,
            loop=loop)

        loop.run_until_complete(wsrv)
        loop.run_forever()

    async def handler(self, wss, path):
        consumer_task = asyncio.ensure_future(self.consumerHandler(wss, path))
        producer_task = asyncio.ensure_future(self.producerHandler(wss, path))
        done, pending = await asyncio.wait(
            [consumer_task, producer_task],
            return_when=asyncio.FIRST_COMPLETED,)
        for task in pending:
            task.cancel()

    async def producerHandler(self, wss, path):
        while True:
            msg = await self.producer()
            await wss.send(str(msg))

    async def consumerHandler(self, wss, path):
        async for msg in wss:
            await self.consumer(msg)

    async def producer(self):
        return self.ssi.get(True)

    async def consumer(self, msg):
        self.sso.put(msg.data)

Javascript client:

var ws;

function ws_init() {

    ws = new WebSocket("wss://pri.local:9000/");

    ws.onopen = function(e) {
        output("connected");
    };

    ws.onmessage = function(e) {
        output("i: " + e.data);
    };

    ws.onclose = function() {
        output("disconnect");
    };

    ws.onerror = function(e) {
        output("onerror");
        console.log(e)
    };

}

function onSubmit() {
    var input = document.getElementById("input");
    ws.send(input.value);
    output("o: " + input.value);
    input.value = "";
    input.focus();
}

function onCloseClick() {
    ws.close();
}

function output(str) {
    var log = document.getElementById("log");
    var escaped = str.replace(/&/, "&amp;").replace(/</, "&lt;").
        replace(/>/, "&gt;").replace(/"/, "&quot;"); // "
        log.innerHTML = escaped + "<br>" + log.innerHTML;
}
2
  • Maybe check your python version - the async for msg in wss: syntax is only available in 3.6 according to websockets.readthedocs.io/en/stable/intro.html#consumer Commented Feb 14, 2018 at 0:58
  • Yes you are correct and i made sure i was using the correct version, thanks. python --version Python 3.6.4 Commented Feb 14, 2018 at 1:09

1 Answer 1

1

I think the issue is that you're mixing the usage of the queue library and asyncio.queue.

queue is thread safe and so is a good mechanism for communicating between threads, but it doesn't have an async API, so you're blocking the websocket thread when you call self.ssi.get(True) which prevents any of the other websocket code from running.

asyncio.queue has the API you want (you can await queue.get()), but unfortunately isn't thread safe (it's designed for use within single threaded async applications).

You may be able to use loop.run_in_executor to await the blocking queue.get(True) call. See here for an example https://carlosmaniero.github.io/asyncio-handle-blocking-functions.html

Sign up to request clarification or add additional context in comments.

3 Comments

Thanks! This, now for me to, seems to be the reason for the code to lock up. I've tried to use loop.run_in_executor but gives me an error, i think the websockets library does not work in combination. I have found Janus github.com/aio-libs/janus i think this can potentially solve the issue.
@FUBARnl Janus sounds perfect - thanks for the followup
I've used Janus to solve to problem and it works very smooth. Thanks for the insight it really helped me along!

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.