I have 3 programs written in Python, which need to be connected. 2 programs X and Y gather some information, which are sent by them to program Z. Program Z analyzes the data and send to program X and Y some decisions. Number of programs similar to X and Y will be expanded in the future. Initially I used named pipe to allow communication from X, Y to Z. But as you can see, I need bidirectional relation. My boss told me to use ZeroMQ. I have just found pattern for my use case, which is called Asynchronous Client/Server. Please see code from ZMQ book (http://zguide.zeromq.org/py:all) below.
The problem is my boss does not want to use any threads, forks etc. I moved client and server tasks to separate programs, but I am not sure what to do with ServerWorker class. Can this be somehow used without threads? Also, I am wondering, how to establish optimal workers amount.
import zmq
import sys
import threading
import time
from random import randint, random
__author__ = "Felipe Cruz <[email protected]>"
__license__ = "MIT/X11"
def tprint(msg):
"""like print, but won't get newlines confused with multiple threads"""
sys.stdout.write(msg + '\n')
sys.stdout.flush()
class ClientTask(threading.Thread):
"""ClientTask"""
def __init__(self, id):
self.id = id
threading.Thread.__init__ (self)
def run(self):
context = zmq.Context()
socket = context.socket(zmq.DEALER)
identity = u'worker-%d' % self.id
socket.identity = identity.encode('ascii')
socket.connect('tcp://localhost:5570')
print('Client %s started' % (identity))
poll = zmq.Poller()
poll.register(socket, zmq.POLLIN)
reqs = 0
while True:
reqs = reqs + 1
print('Req #%d sent..' % (reqs))
socket.send_string(u'request #%d' % (reqs))
for i in range(5):
sockets = dict(poll.poll(1000))
if socket in sockets:
msg = socket.recv()
tprint('Client %s received: %s' % (identity, msg))
socket.close()
context.term()
class ServerTask(threading.Thread):
"""ServerTask"""
def __init__(self):
threading.Thread.__init__ (self)
def run(self):
context = zmq.Context()
frontend = context.socket(zmq.ROUTER)
frontend.bind('tcp://*:5570')
backend = context.socket(zmq.DEALER)
backend.bind('inproc://backend')
workers = []
for i in range(5):
worker = ServerWorker(context)
worker.start()
workers.append(worker)
poll = zmq.Poller()
poll.register(frontend, zmq.POLLIN)
poll.register(backend, zmq.POLLIN)
while True:
sockets = dict(poll.poll())
if frontend in sockets:
ident, msg = frontend.recv_multipart()
tprint('Server received %s id %s' % (msg, ident))
backend.send_multipart([ident, msg])
if backend in sockets:
ident, msg = backend.recv_multipart()
tprint('Sending to frontend %s id %s' % (msg, ident))
frontend.send_multipart([ident, msg])
frontend.close()
backend.close()
context.term()
class ServerWorker(threading.Thread):
"""ServerWorker"""
def __init__(self, context):
threading.Thread.__init__ (self)
self.context = context
def run(self):
worker = self.context.socket(zmq.DEALER)
worker.connect('inproc://backend')
tprint('Worker started')
while True:
ident, msg = worker.recv_multipart()
tprint('Worker received %s from %s' % (msg, ident))
replies = randint(0,4)
for i in range(replies):
time.sleep(1. / (randint(1,10)))
worker.send_multipart([ident, msg])
worker.close()
def main():
"""main function"""
server = ServerTask()
server.start()
for i in range(3):
client = ClientTask(i)
client.start()
server.join()
if __name__ == "__main__":
main()