1

N-proxy-N Pub-Sub

Similar to the question N to N async pattern in ZeroMQ?, but which unfortunately never received an answer with working code.

I'm trying to implement Pub-Sub network as described in the guide: http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem (a small message broker in the style of N-proxy-N). Unfortunately, the guide doesn't provide any code examples.

I've tried to implement an Hello World example using PyZMQ, I think I'm close, but I'm facing some errors I don't know how to handle. Sorry for the use of asyncio (I'm more comfortable with this then threads).

Code

"""Example using zmq to create a PubSub node_topic similar to a ROS topic"""
# Copyright (c) Stef van der Struijk <[email protected]>.
# This example is in the public domain (CC-0)
# http://zguide.zeromq.org/py:all#The-Dynamic-Discovery-Problem

import asyncio
import zmq.asyncio
from zmq.asyncio import Context
import traceback
import logging

# N-proxy-M pattern: a subscriber which passes messages through a proxy through a publisher
class PubSubTopic:
    def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
        # get ZeroMQ version
        print("Current libzmq version is %s" % zmq.zmq_version())
        print("Current  pyzmq version is %s" % zmq.pyzmq_version())

        self.context = Context.instance()
        # 2 sockets, because we can only bind once to a socket (as opposed to connect)
        self.url1 = "tcp://{}:{}".format(address, port1)
        self.url2 = "tcp://{}:{}".format(address, port2)

        # start proxy, pubs and subs async; demonstration purpose only, probably better in separate threads
        asyncio.get_event_loop().run_until_complete(asyncio.wait([
            self.xpub_xsub_proxy(),  # commented out for different error
            self.pub_hello_world(),
            self.pub_hello_world(lang='jp'),
            self.sub_hello_world(),
            self.sub_hello_world(lang='jp'),
        ]))

    # N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
    async def xpub_xsub_proxy(self):
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init proxy")

            # Socket subscribing to publishers
            frontend_pubs = self.context.socket(zmq.XSUB)
            frontend_pubs.bind(self.url1)

            # Socket publishing to subscribers
            backend_subs = self.context.socket(zmq.XPUB)
            backend_subs.bind(self.url2)

            print("Try: Proxy... CONNECT!")
            zmq.proxy(frontend_pubs, backend_subs)
            print("CONNECT successful!")

        except Exception as e:
            print("Error with proxy :(")
            # print(e)
            logging.error(traceback.format_exc())
            print()

    # test case: 2 pubs to 1 topic
    async def pub_hello_world(self, lang='en'):
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init pub {}".format(lang))

            # connect, because many publishers - 1 subscriber
            pub = self.context.socket(zmq.PUB)
            pub.connect(self.url1)

            if lang == 'en':
                message = "Hello World"
                sleep = 1
            else:
                message = "Hello Sekai"  # Japanese
                sleep = 2

            # wait proxy and subs to b ready
            await asyncio.sleep(.5)

            # keep publishing "Hello World" / "Hello Sekai" messages
            print("Pub {}: Going to pub messages!".format(lang))
            while True:
                # publish message to topic 'world'
                # multipart: topic, message; async always needs `send_multipart()`?
                await pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
                print("Pub {}: Have send msg".format(lang))

                # slow down message publication
                await asyncio.sleep(sleep)

        except Exception as e:
            print("Error with pub {}".format(lang))
            # print(e)
            logging.error(traceback.format_exc())
            print()

    # test case: 2 subs to 1 topic
    async def sub_hello_world(self, lang='en'):
        # no traceback with zmq.asyncio and no try statement
        try:
            print("Init sub {}".format(lang))

            # connect, because many subscribers - 1 (proxy) pub
            sub = self.context.socket(zmq.SUB)
            sub.connect(self.url2)
            # subscribe to topic 'en' or 'jp'
            sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))

            # wait proxy to be ready; necessary?
            await asyncio.sleep(.2)

            # keep listening to all published message, filtered on topic
            print("Sub {}: Going to wait for messages!".format(lang))
            while True:
                msg_received = await sub.recv_multipart()
                print("sub {}: {}".format(lang, msg_received))

        except Exception as e:
            print("Error with sub {}".format(lang))
            # print(e)
            logging.error(traceback.format_exc())
            print()


if __name__ == '__main__':
    PubSubTopic()

Errors

Proxy error

When I don't comment out the proxy function, I get the following traceback

python pub_sub_topic.py 
Current libzmq version is 4.2.2
Current  pyzmq version is 16.0.2
Init proxy
Try: Proxy... CONNECT!
^CTraceback (most recent call last):
  File "pub_sub_topic.py", line 139, in <module>
    PubSubTopic()
  File "pub_sub_topic.py", line 43, in __init__
    self.sub_hello_world(lang='jp'),
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1426, in _run_once
    handle._run()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/events.py", line 127, in _run
    self._callback(*self._args)
  File "pub_sub_topic.py", line 62, in xpub_xsub_proxy
    zmq.proxy(frontend_pubs, backend_subs)
  File "zmq/backend/cython/_device.pyx", line 95, in zmq.backend.cython._device.proxy (zmq/backend/cython/_device.c:1824)
  File "zmq/backend/cython/checkrc.pxd", line 12, in zmq.backend.cython.checkrc._check_rc (zmq/backend/cython/_device.c:1991)
KeyboardInterrupt

Subscriber error

If I do comment out the proxy function (# self.xpub_xsub_proxy(),), I get the following traceback

python pub_sub_topic.py 
Current libzmq version is 4.2.2
Current  pyzmq version is 16.0.2
Init sub en
Init sub jp
Init pub en
Init pub jp
Sub en: Going to wait for messages!
Error with sub en
ERROR:root:Traceback (most recent call last):
  File "pub_sub_topic.py", line 128, in sub_hello_world
    msg_received = await sub.recv_multipart()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 170, in recv_multipart
    dict(flags=flags, copy=copy, track=track)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 321, in _add_recv_event
    self._add_io_state(self._READ)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 294, in _add_io_state
    self.io_loop.add_reader(self, self._handle_recv)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 337, in add_reader
    return self._add_reader(fd, callback, *args)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 264, in _add_reader
    key = self._selector.get_key(fd)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
    return mapping[fileobj]
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
    "{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>


Exception ignored in: <bound method Socket.__del__ of <zmq.asyncio.Socket object at 0x7fa90a4a7528>>
Traceback (most recent call last):
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/sugar/socket.py", line 70, in __del__
    self.close()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/eventloop/future.py", line 160, in close
    self._clear_io_state()
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 316, in _clear_io_state
    self._drop_io_state(self._state)
  File "/home/*user*/anaconda3/lib/python3.6/site-packages/zmq/asyncio/__init__.py", line 303, in _drop_io_state
    self.io_loop.remove_reader(self)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 342, in remove_reader
    return self._remove_reader(fd)
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/selector_events.py", line 279, in _remove_reader
    key = self._selector.get_key(fd)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 189, in get_key
    return mapping[fileobj]
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 70, in __getitem__
    fd = self._selector._fileobj_lookup(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 224, in _fileobj_lookup
    return _fileobj_to_fd(fileobj)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 39, in _fileobj_to_fd
    "{!r}".format(fileobj)) from None
ValueError: Invalid file object: <zmq.asyncio.Socket object at 0x7fa90a4a7528>
Sub jp: Going to wait for messages!

*snip* Same error as 'Sub en' *snip*

Pub en: Going to pub messages!
Pub en: Have send msg
Pub jp: Going to pub messages!
Pub jp: Have send msg
Pub en: Have send msg
Pub jp: Have send msg
Pub en: Have send msg
^CTraceback (most recent call last):
  File "pub_sub_topic.py", line 139, in <module>
    PubSubTopic()
  File "pub_sub_topic.py", line 43, in __init__
    self.sub_hello_world(lang='jp'),
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 454, in run_until_complete
    self.run_forever()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 421, in run_forever
    self._run_once()
  File "/home/*user*/anaconda3/lib/python3.6/asyncio/base_events.py", line 1390, in _run_once
    event_list = self._selector.select(timeout)
  File "/home/*user*/anaconda3/lib/python3.6/selectors.py", line 445, in select
    fd_event_list = self._epoll.poll(timeout, max_ev)
KeyboardInterrupt

System info

  • Ubuntu 16.04
  • Python 3.6 (through Anaconda)
  • libzmq version 4.2.2
  • pyzmq version 16.0.2

2 Answers 2

1

Absolutely you should not comment proxy function. The problem is because zmq.proxy function block forever and you ran it with "run_until_complete" event loop. You should change event loop execution type to run_forever.

Sign up to request clarification or add additional context in comments.

2 Comments

Thank you for your help. I thought zmq.proxy was a 1 time trigger, not a blocking call. Probably putting this proxy in a separate thread would make a single example script possible. However, I don't think switching run_until_complete with run_forever would change anything, because it will still be a blocking call, meaning asyncio doesn't have a chance to execute other code in an asynchronous fashion.
Yes you are right, I'm not familiar with multithreading in python so don't get my python suggestion serious ;)
0

As always, the answer is in simplicity. By separating it into 3 scripts, we don't have to work with threads and asynchronous programming, so this should help more people.

Open 6 terminals and run the following commands in a terminal each:

  1. python proxy_topic.py # proxy / ROS topic
  2. python proxy_pub.py # publish "Hello World"
  3. python proxy_pub.py jp # publish "Hello Sekai"
  4. python proxy_sub.py # receive all messages
  5. python proxy_sub.py en # receive "Hello World" only; not necessary
  6. python proxy_sub.py jp # receive "Hello Sekai" only; not necessary

proxy_topic.py

import sys
import zmq
from zmq import Context


class ProxyPub:
    def __init__(self, address='127.0.0.1', port1='5566', port2='5567'):
        # get ZeroMQ version
        print("Current libzmq version is %s" % zmq.zmq_version())
        print("Current  pyzmq version is %s" % zmq.pyzmq_version())

        self.context = Context.instance()
        # 2 sockets, because we can only bind once to a socket (as opposed to connect)
        self.url1 = "tcp://{}:{}".format(address, port1)
        self.url2 = "tcp://{}:{}".format(address, port2)

        self.xpub_xsub_proxy()

    # N publishers to 1 sub; proxy 1 sub to 1 pub; publish to M subscribers
    def xpub_xsub_proxy(self):
        print("Init proxy")

        # Socket subscribing to publishers
        frontend_pubs = self.context.socket(zmq.XSUB)
        frontend_pubs.bind(self.url1)

        # Socket publishing to subscribers
        backend_subs = self.context.socket(zmq.XPUB)
        backend_subs.bind(self.url2)

        print("Try: Proxy... CONNECT!")
        zmq.proxy(frontend_pubs, backend_subs)
        print("CONNECT successful!")


if __name__ == '__main__':
    print("Arguments given: {}".format(sys.argv))
    ProxyPub()

proxy_pub.py

import sys
import zmq
from zmq import Context
import time


class ProxyPub:
    def __init__(self, lang='en', address='127.0.0.1', port='5566'):
        # get ZeroMQ version
        print("Current libzmq version is %s" % zmq.zmq_version())
        print("Current  pyzmq version is %s" % zmq.pyzmq_version())

        self.context = Context.instance()
        self.url = "tcp://{}:{}".format(address, port)

        self.pub_hello_world(lang)

    def pub_hello_world(self, lang):
        print("Init pub {}".format(lang))

        # connect, because many publishers - 1 subscriber
        pub = self.context.socket(zmq.PUB)
        pub.connect(self.url)

        if lang == 'en':
            message = "Hello World"
            sleep = 1
        else:
            message = "Hello Sekai"  # Japanese
            sleep = 2

        # wait proxy and subs to b ready
        time.sleep(.5)

        # keep publishing "Hello World" / "Hello Sekai" messages
        print("Pub {}: Going to pub messages!".format(lang))
        while True:
            # publish message to topic 'world'
            # multipart: topic, message; async always needs `send_multipart()`?
            pub.send_multipart([lang.encode('ascii'), message.encode('ascii')])
            print("Pub {}: Have send msg".format(lang))

            # slow down message publication
            time.sleep(sleep)


if __name__ == '__main__':
    print("Arguments given: {}".format(sys.argv))
    if len(sys.argv) == 1:
        ProxyPub()
    elif len(sys.argv) == 2:
        ProxyPub(lang=sys.argv[1])
    else:
        print("Too many arguments")

proxy_sub.py

import sys
import zmq
from zmq import Context
import time


class ProxyPub:
    def __init__(self, lang='', address='127.0.0.1', port='5567'):
        # get ZeroMQ version
        print("Current libzmq version is %s" % zmq.zmq_version())
        print("Current  pyzmq version is %s" % zmq.pyzmq_version())

        self.context = Context.instance()
        self.url = "tcp://{}:{}".format(address, port)

        self.sub_hello_world(lang)

    def sub_hello_world(self, lang):
        print("Init sub {}".format(lang))

        # connect, because many subscribers - 1 (proxy) pub
        sub = self.context.socket(zmq.SUB)
        sub.connect(self.url)
        # subscribe to topic 'en' or 'jp'
        sub.setsockopt(zmq.SUBSCRIBE, lang.encode('ascii'))

        # wait proxy to be ready; necessary?
        time.sleep(.2)

        # keep listening to all published message, filtered on topic
        print("Sub {}: Going to wait for messages!".format(lang))
        while True:
            msg_received = sub.recv_multipart()
            print("sub {}: {}".format(lang, msg_received))


if __name__ == '__main__':
    print("Arguments given: {}".format(sys.argv))
    if len(sys.argv) == 1:
        ProxyPub()
    elif len(sys.argv) == 2:
        ProxyPub(lang=sys.argv[1])
    else:
        print("Too many arguments")

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.