I am trying to adapt the ZeroMQ asynchronous client-server pattern described here to Python multiprocessing. Briefly, as described in the ZeroMQ guide:
It uses DEALER/ROUTER for communication between the client and the server frontend, and DEALER/DEALER for communication between the server backend and the server workers. The server frontend and backend are connected by a zmq.proxy() instance.
Instead of threads, I want to use multiprocessing on the server. However, requests from the client never reach the server workers. They do reach the server frontend and the backend, but the backend does not appear to be able to connect to the server workers.
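For reference, the thread-based topology described above can be sketched roughly as follows. This is a minimal sketch, not the guide's exact code: the inproc endpoint names, the single echo worker, and the `roundtrip` helper are assumptions made for illustration.

```python
import threading

import zmq

# Assumed endpoint names, for illustration only.
FRONTEND = "inproc://frontend"
BACKEND = "inproc://backend"


def worker(context):
    """Echo worker: a DEALER socket connected to the server backend."""
    sock = context.socket(zmq.DEALER)
    sock.connect(BACKEND)
    try:
        while True:
            # The frontend ROUTER prepends the client identity.
            ident, msg = sock.recv_multipart()
            sock.send_multipart([ident, msg])
    except zmq.ContextTerminated:
        pass  # context.term() was called; fall through to cleanup
    finally:
        sock.close()


def proxy(frontend, backend):
    """Shuttle messages between the ROUTER frontend and DEALER backend."""
    try:
        zmq.proxy(frontend, backend)
    except zmq.ContextTerminated:
        pass
    finally:
        frontend.close()
        backend.close()


def roundtrip(payload=b"ping", timeout_ms=2000):
    """Send one request through the proxy and return the echoed reply."""
    context = zmq.Context()
    frontend = context.socket(zmq.ROUTER)
    frontend.bind(FRONTEND)
    backend = context.socket(zmq.DEALER)
    backend.bind(BACKEND)

    threads = [
        threading.Thread(target=proxy, args=(frontend, backend)),
        threading.Thread(target=worker, args=(context,)),
    ]
    for t in threads:
        t.start()

    client = context.socket(zmq.DEALER)
    client.connect(FRONTEND)
    client.send(payload)
    reply = client.recv() if client.poll(timeout_ms) else None
    client.close(linger=0)

    context.term()  # unblocks both threads with ContextTerminated
    for t in threads:
        t.join()
    return reply


if __name__ == "__main__":
    print(roundtrip(b"hello"))
```

Here all sockets are created from one context and stay within a single process, which is the setup the multiprocessing version below departs from.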
How do we generally debug these issues in pyzmq?
How can I turn on verbose logging for the sockets?
The Python code I am using:
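One avenue that may be relevant to the logging question is pyzmq's socket monitor, which reports connection-level events (listening, accepted, connected, disconnected, and so on) for a single socket. The following is only a sketch under assumptions: the `collect_events` and `demo` helpers, the loopback TCP endpoint, and the 500 ms timeout are hypothetical choices, not part of the code in this question.

```python
import zmq
from zmq.utils.monitor import recv_monitor_message

# Map numeric event codes to their names (e.g. EVENT_ACCEPTED).
EVENT_NAMES = {
    int(getattr(zmq, name)): name
    for name in dir(zmq)
    if name.startswith("EVENT_") and name != "EVENT_ALL"
}


def collect_events(monitor, timeout_ms=500):
    """Drain monitor events until none arrive within timeout_ms."""
    names = []
    while monitor.poll(timeout_ms):
        evt = recv_monitor_message(monitor)
        code = int(evt["event"])
        names.append(EVENT_NAMES.get(code, str(code)))
    return names


def demo():
    context = zmq.Context()
    server = context.socket(zmq.ROUTER)
    # Attach the monitor before bind so the listening event is captured.
    monitor = server.get_monitor_socket()
    port = server.bind_to_random_port("tcp://127.0.0.1")

    client = context.socket(zmq.DEALER)
    client.connect(f"tcp://127.0.0.1:{port}")

    events = collect_events(monitor)
    for name in events:
        print("server socket event:", name)

    # Clean up: stop monitoring, then close every socket before term().
    client.close(linger=0)
    server.disable_monitor()
    monitor.close(linger=0)
    server.close(linger=0)
    context.term()
    return events


if __name__ == "__main__":
    demo()
```

If no accepted or connected events show up for a given socket, that would suggest the peers never actually connected, as opposed to messages being lost after a connection was made.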
server.py
import time
from multiprocessing import Process

import zmq


def run(context, worker_id):
    # Each worker connects a DEALER socket to the server backend.
    socket = context.socket(zmq.DEALER)
    socket.connect("ipc://backend.ipc")
    print(f"Worker {worker_id} started")
    try:
        while True:
            ident, msg = socket.recv_multipart()
            print("Worker received %s from %s" % (msg, ident))
            time.sleep(5)
            socket.send_multipart([ident, msg])
            print("Worker sent %s from %s" % (msg, ident))
    except:
        socket.close()


if __name__ == "__main__":
    context = zmq.Context()
    # Clients connect to the ROUTER frontend over TCP.
    frontend = context.socket(zmq.ROUTER)
    frontend.bind("tcp://*:5570")
    # Workers connect to the DEALER backend over IPC.
    backend = context.socket(zmq.DEALER)
    backend.bind("ipc://backend.ipc")
    N_WORKERS = 7
    jobs = []
    try:
        for worker_id in range(N_WORKERS):
            # The parent's context is passed to each worker process.
            job = Process(target=run, args=(context, worker_id))
            jobs.append(job)
            job.start()
        # Blocks while forwarding messages between frontend and backend.
        zmq.proxy(frontend, backend)
        for job in jobs:
            job.join()
    except:
        frontend.close()
        backend.close()
        context.term()
client.py
import json
from uuid import uuid4

import zmq

if __name__ == "__main__":
    context = zmq.Context()
    socket = context.socket(zmq.DEALER)
    # A UUID object has no encode(); convert it to str first.
    identity = uuid4()
    socket.identity = str(identity).encode("ascii")
    socket.connect("tcp://localhost:5570")
    poll = zmq.Poller()
    poll.register(socket, zmq.POLLIN)
    request = {
        "body": "Some request body.",
    }
    socket.send_string(json.dumps(request))
    # Poll for replies in short intervals and print whatever arrives.
    while True:
        for i in range(5):
            sockets = dict(poll.poll(10))
            if socket in sockets:
                msg = socket.recv()
                print(msg)