2

I'd like to send Connection object via ØMQ. There are too sides: producer (who sends Connection) and receiver (who receives this Connection).

producer.py

import zmq
import time
from multiprocessing import Pipe
from multiprocessing.reduction import reduce_connection

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

time.sleep(1)

a, b = Pipe()
reduced_a = reduce_connection(a)
socket.send_pyobj(reduced_a)

print b.recv()

receiver.py

import zmq

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect ("tcp://localhost:5556")
socket.setsockopt(zmq.SUBSCRIBE, '')

obj = socket.recv_pyobj()
answer_conn = obj[0](obj[1][0],obj[1][1],obj[1][2])
# answer_conn = obj[0](*obj[1])

answer_conn.send('All is OK!')

And while connection rebuliding:

    answer_conn = obj[0](obj[1][0],obj[1][1],obj[1][2])
  File "/usr/lib/python2.7/multiprocessing/reduction.py", line 170, in rebuild_connection
    handle = rebuild_handle(reduced_handle)
  File "/usr/lib/python2.7/multiprocessing/reduction.py", line 155, in rebuild_handle
    conn = Client(address, authkey=current_process().authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 175, in Client
    answer_challenge(c, authkey)
  File "/usr/lib/python2.7/multiprocessing/connection.py", line 420, in answer_challenge
    raise AuthenticationError('digest sent was rejected')
multiprocessing.AuthenticationError: digest sent was rejected

Does anyone know something that can help to pass a Connection object?

Thank you

1 Answer 1

3

You have to use connection owner's process (producer) authkey to rebuild the handle on the receiver. You may send producer's authkey along with the pickled connection over ØMQ.

producer.py

import zmq
import time
from multiprocessing import Pipe, current_process
from multiprocessing.reduction import reduce_connection

context = zmq.Context()
socket = context.socket(zmq.PUB)
socket.bind("tcp://*:5556")

time.sleep(1)

a, b = Pipe()
reduced_a = reduce_connection(a)
socket.send_pyobj(reduced_a)
socket.send(current_process().authkey)

print b.recv()

receiver.py

import zmq
from multiprocessing import reduction

def rebuild_handle_with_key(pickled_data, authkey=None):
    from multiprocessing import current_process
    from multiprocessing.util import sub_debug
    from multiprocessing.connection import Client
    from multiprocessing.reduction import recv_handle
    import os
    address, handle, inherited = pickled_data
    if inherited:
        return handle
    sub_debug('rebuilding handle %d', handle)
    conn = Client(address, authkey=authkey or current_process().authkey)
    conn.send((handle, os.getpid()))
    new_handle = recv_handle(conn)
    conn.close()
    return new_handle

def rebuild_connection_with_key(reduced_handle, readable, writable, authkey=None):
    import _multiprocessing
    handle = rebuild_handle_with_key(reduced_handle, authkey)
    return _multiprocessing.Connection(
        handle, readable=readable, writable=writable
        )

reduction.rebuild_connection = rebuild_connection_with_key

context = zmq.Context()
socket = context.socket(zmq.SUB)
socket.connect ("tcp://localhost:5556")
socket.setsockopt(zmq.SUBSCRIBE, '')

obj = socket.recv_pyobj()
authkey = socket.recv()
answer_conn = obj[0](obj[1][0],obj[1][1],obj[1][2], authkey)
# answer_conn = obj[0](*obj[1])

answer_conn.send('All is OK!')

I couldn't find a better way to make multiprocessing.reduction use a different authkey so here I have patched it.

Sign up to request clarification or add additional context in comments.

1 Comment

It works well when I rebuild connection only one time. But on second time it stops on: ` File "/usr/lib/python2.7/multiprocessing/reduction.py", line 83, in recv_handle return _multiprocessing.recvfd(conn.fileno()) KeyboardInterrupt `

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.