
On a single process I have a task running on a thread that produces values and broadcasts them, and several consumer async tasks that run concurrently in an asyncio loop.

I found this issue on PyZMQ's GitHub asking about async <-> sync communication with inproc sockets, which is what I also wanted, and the answer was to use .shadow(ctx.underlying) when creating the async ZMQ Context.

I prepared this example and it seems to be working fine:

import signal
import asyncio
import zmq
import threading
import zmq.asyncio
import sys
import time
import json


def producer(ctrl):
    # delay the first push to give the asyncio loop
    # (and the SUB socket) time to start, so the
    # PUB/SUB slow-joiner behaviour loses no messages
    time.sleep(1)

    ctx = ctrl["ctx"]

    s = ctx.socket(zmq.PUB)

    s.bind(ctrl["endpoint"])

    v = 0
    while ctrl["run"]:
        payload = {"value": v, "timestamp": time.time()}

        msg = json.dumps(payload).encode("utf-8")

        s.send(msg)
        v += 1
        time.sleep(5)

    print("Bye")


def main():
    endpoint = "inproc://testendpoint"
    ctx = zmq.Context()
    actx = zmq.asyncio.Context.shadow(ctx.underlying)

    ctrl = {"run": True, "ctx": ctx, "endpoint": endpoint}

    th = threading.Thread(target=producer, args=(ctrl,))
    th.start()

    try:
        asyncio.run(amain(actx, endpoint))
    except KeyboardInterrupt:
        pass

    print("Stopping thread")
    ctrl["run"] = False
    th.join()


async def amain(ctx, endpoint):
    s = ctx.socket(zmq.SUB)
    s.subscribe("")
    s.connect(endpoint)

    loop = asyncio.get_running_loop()

    def stop():
        try:
            print("Closing zmq async socket")
            s.close()
        except Exception:
            pass

        # raising KeyboardInterrupt from the handler propagates out of
        # the event loop and terminates asyncio.run() in main()
        raise KeyboardInterrupt

    loop.add_signal_handler(signal.SIGINT, stop)

    while True:
        event = await s.poll(1000)
        if event & zmq.POLLIN:
            msg = await s.recv()
            payload = json.loads(msg.decode("utf-8"))

            print("%f: %d" % (payload["timestamp"], payload["value"]))


if __name__ == "__main__":
    sys.exit(main())

Is it safe to use inproc://* between a thread and an asyncio task in this way? The 0MQ context is thread safe and I'm not sharing sockets between the thread and the asyncio task, so I would say that in general this is thread safe, right? Or am I missing something that I should consider?

1 Answer


Q :
" Is it safe to use inproc://* between a thread and asyncio task in this way? "

A :
First and foremost, I might be awfully wrong ( not only here ), yet having worked with ZeroMQ since the native API 2.1.1+, I dare claim that unless the newer "improvements" have lost the core principles ( the ZeroMQ ZMTP/RFC-documented properties for building a legal implementation of the still valid ZMTP-arsenal ), the answer here shall be YES, as much as the newer releases of the pyzmq-binding have kept all mandatory properties of the inproc:-Transport-Class without a compromise.

Q :
" The 0MQ context is thread safe and I'm not sharing sockets between the thread and the asyncio task, so I would say in general that this is thread safe, right? "

A :
Here my troubles start - ZeroMQ implementations were since ever developed based on Martin SUSTRIK's & Pieter HINTJENS' Zen-of-Zero -- i.e. also as Zero-sharing -- so never-sharing was the principle ( though zmq.Context-instances were no problem to "share" & use from different threads, to the contrary of zmq.Socket-instances, which must never be shared ).
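
A minimal sketch of that Zero-sharing discipline, matching the pattern used in the question ( the endpoint & names below are mine, purely illustrative ) - one shared zmq.Context-instance, yet each thread creates, uses and closes its own zmq.Socket-instance:

import threading
import zmq

ctx = zmq.Context()  # one Context-instance may safely be shared across threads


def worker(endpoint):
    # each thread owns its own socket -- zmq.Socket-instances are NOT thread safe
    s = ctx.socket(zmq.PULL)
    s.connect(endpoint)
    print("worker got:", s.recv())
    s.close()


push = ctx.socket(zmq.PUSH)          # this socket stays in the main thread only
push.bind("inproc://zero-sharing")   # bind before the worker thread connects

th = threading.Thread(target=worker, args=("inproc://zero-sharing",))
th.start()
push.send(b"hello")
th.join()
push.close()
ctx.term()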

Python ( since ever & still valid in 2022-Q1 ) has used & still uses a total [CONCURRENT]-code-execution avoider -- the GIL-lock -- which principally prevents any & all kinds of problems arising from [CONCURRENT]-code-execution from ever happening inside the Python GIL-lock re-[SERIAL]-ised flow of code-execution. So even though the asyncio-part is built as a pythonic ( non-destructive ) part of the ecosystem, your code shall never "meet" any kind of concurrency-related issue: unless a thread gains the GIL-lock, it does nothing but "hang in NOP-s cracking" ( nuts-cracking in an idle loop ).

Being inside the same process, there seems to be no advantage in spawning another Context-instance at all ( this used to be a rock-solid certainty since ever: never increase any kind of overheads - the Zen-of-Zero's ( almost )Zero-overhead ... ). The Sig/Msg core engine could, if performance or latency needs required it, be powered with more zmq.Context( io_threads ) upon instantiation, yet these were zmq.Context-owned threads, not Python-GIL-governed/(b)locked threads, so the performance was pretty well scalable, without wasting any RAM/HWM/buffers/...-resources and without growing any overheads - and very efficient, as the I/O-threads do only indeed I/O-work, so they are not needed for the inproc:-( protocol-less )-Transport-Class at all.
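
For illustration only ( a sketch of the standard pyzmq instantiation signature, not a recommendation for the inproc:-only use-case above ):

import zmq

# more I/O-threads for heavy tcp:// or ipc:// traffic -- these are
# libzmq-owned threads, invisible to & unthrottled by the Python GIL-lock
ctx = zmq.Context(io_threads=4)

# inproc:// transfers never touch the I/O-threads at all, so for the
# question's use-case the default io_threads=1 is already more than enough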

Q :
" Or am I missing something that I should consider? "

A :
Mixing asyncio, O/S-signals ( which are well documented in how they interact with the native ZeroMQ API ) and other layers of complexity is for sure possible, yet it comes at a cost - it makes the use-case less and less readable and more and more prone to conceptual gaps and similar hard-to-decode "errors".

I remember using the Tkinter-mainloop() as a cost-wise very cheap and super-stable framework for rapid-prototyping the MVC-{ M-odel, V-isual, C-ontroller }-parts of many-actors' applications in Python. There were Zero-problems using ZeroMQ with a single Context-instance, passing the references of the respective AccessNodes into whatever amount of event-handlers, supposing we kept the ZeroMQ Zen-of-Zero, i.e. not to "share" ( meaning no two parts "use" ( compete to use ) one and the same AccessPoint "one-over-another" ).
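
A minimal sketch of that pattern ( all endpoint & widget names here are hypothetical ) - the Tkinter after()-timer polls a single ZeroMQ AccessPoint non-blockingly, so one Context-instance serves the whole GUI without any extra threads:

import tkinter as tk
import zmq

ctx = zmq.Context()                   # the one-and-only Context-instance
sub = ctx.socket(zmq.SUB)
sub.subscribe("")
sub.connect("tcp://localhost:5556")   # hypothetical publisher endpoint

root = tk.Tk()
label = tk.Label(root, text="waiting...")
label.pack()


def poll_zmq():
    # non-blocking receive: only this one event-handler "uses" the socket,
    # nothing else competes for this AccessPoint
    try:
        msg = sub.recv(zmq.NOBLOCK)
        label.config(text=msg.decode("utf-8"))
    except zmq.Again:
        pass
    root.after(100, poll_zmq)         # re-arm the timer, ~10 Hz polling


root.after(100, poll_zmq)
root.mainloop()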

This all was designed-in, at "Zero-cost", by ZeroMQ by-definition, so unless spoilt in some later phase of re-wrapping a re-wrapped native API, all this ought to still work in 2022-Q1, ought it not?
