
I am working with both the asyncio and the multiprocessing library to run two processes, each with one server instance listening on different ports for incoming messages.

To identify each client, I want to share a dict between the two processes to keep the set of known clients up to date. To achieve this, I decided to use a Tuple[StreamReader, StreamWriter] lookup key that maps to a Client object for the connection.

However, as soon as I insert or simply access the shared dict, the program crashes with the following error message:

Task exception was never retrieved
future: <Task finished name='Task-5' coro=<GossipServer.handle_client() done, defined at /home/croemheld/Documents/network/server.py:119> exception=AttributeError("Can't pickle local object 'WeakSet.__init__.<locals>._remove'")>
Traceback (most recent call last):
  File "/home/croemheld/Documents/network/server.py", line 128, in handle_client
    if not await self.handle_message(reader, writer, buffer):
  File "/home/croemheld/Documents/network/server.py", line 160, in handle_message
    client = self.syncmanager.get_api_client((reader, writer))
  File "<string>", line 2, in get_api_client
  File "/usr/lib/python3.9/multiprocessing/managers.py", line 808, in _callmethod
    conn.send((self._id, methodname, args, kwds))
  File "/usr/lib/python3.9/multiprocessing/connection.py", line 211, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/usr/lib/python3.9/multiprocessing/reduction.py", line 51, in dumps
    cls(buf, protocol).dump(obj)
AttributeError: Can't pickle local object 'WeakSet.__init__.<locals>._remove'

Naturally I looked up the error message and found this question, but I don't really understand what the reason is here. As far as I understand, the reason for this crash is that StreamReader and StreamWriter cannot be pickled/serialized in order to be shared between processes. If that is in fact the reason, is there a way to pickle them, maybe by patching the reducer function to instead use a different pickler?
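The traceback supports that reading: the manager proxy's `_callmethod` pickles its arguments before sending them to the manager process, and an asyncio stream holds a reference to the event loop, whose internals (selectors, `WeakSet`s) cannot be pickled. A minimal, illustrative check that even a bare `StreamReader` refuses to pickle (`try_pickle_reader` is just a name for this demo):

```python
import asyncio
import pickle

async def try_pickle_reader():
    reader = asyncio.StreamReader()  # holds a reference to the running event loop
    try:
        pickle.dumps(reader)
        return None
    except Exception as exc:
        # pickling recurses into the event loop, whose selector and weak
        # references are not serializable
        return type(exc).__name__

if __name__ == "__main__":
    print(asyncio.run(try_pickle_reader()))
```

This fails regardless of which pickler does the dumping, which is why swapping in a different pickler is unlikely to help.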

2 Answers


You might be interested in using SyncManager instead. Just be sure to close the manager by calling shutdown() at the end so no zombie process is left behind.

from multiprocessing.managers import SyncManager
from multiprocessing import Process
import signal

my_manager = SyncManager()

# Ignore Ctrl+C in the manager process so it is not killed before shutdown();
# catch KeyboardInterrupt in the main process and close the manager there.
def manager_init():
    signal.signal(signal.SIGINT, signal.SIG_IGN)

my_manager.start(manager_init)

my_dict = my_manager.dict()
my_dict["clients"] = my_manager.list()

def my_process(my_id, the_dict):
    for i in range(3):
        the_dict["clients"].append(f"{my_id}_{i}")

processes = []
for j in range(4):
    processes.append(Process(target=my_process, args=(j, my_dict)))

for p in processes:
    p.start()

for p in processes:
    p.join()

print(my_dict["clients"])
# e.g. ['0_0', '2_0', '0_1', '3_0', '1_0', '0_2', '1_1', '2_1', '3_1', '1_2', '2_2', '3_2'] (interleaving varies)

my_manager.shutdown()


Comments

This does not help, since your code only stores picklable values in the dict and list you created with SyncManager. I am identifying each connection by a (StreamReader, StreamWriter) tuple, and those objects are not picklable. So my options are limited to either finding a way to make them picklable (unlikely) or using surrogates (picklable keys in the dict) that allow me to obtain the original streams from the asyncio.start_server() callback function.
I'm guessing the reason behind you using StreamReader, StreamWriter is to share a dict. With SyncManager you can share the dict you need. I'm not sure how the StreamReader and StreamWriter are used, but you could consider keying on something other than these two classes. Maybe a socket is better for the job, since socket objects are picklable.
The shared dict is not the problem; it's just that the StreamReader and StreamWriter objects are not picklable (regardless of the pickler used). I was considering whether this would work with sockets as well, and while I found out that sockets themselves are also not picklable, the file descriptor returned by socket.fileno() is (since it's just an int). From the file descriptor it should be possible to recreate the original objects, or is the file descriptor unique to the process?
socketpair is picklable. The file descriptor will change after pickling, since it will be in a new process.
How so? socket.socketpair() just returns two sockets, which are not picklable. It's not a different class.
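For what it's worth, this point in the comment thread is easy to check: with the plain pickle module, a socket returned by socketpair() refuses to serialize. (multiprocessing can transfer sockets between related processes by duplicating the file descriptor, but that is a separate mechanism from ordinary pickling.) A quick check, with `can_pickle_socket` being just a name for this demo:

```python
import pickle
import socket

def can_pickle_socket():
    a, b = socket.socketpair()
    try:
        pickle.dumps(a)
        return True
    except Exception:
        # socket objects deliberately refuse pickling (a TypeError
        # "cannot pickle 'socket' object" in CPython)
        return False
    finally:
        a.close()
        b.close()

if __name__ == "__main__":
    print(can_pickle_socket())  # False
```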

I managed to find a workaround while keeping only the asyncio and multiprocessing libraries, without any third-party dependencies.

First, since the StreamReader and StreamWriter objects are not picklable, I am forced to fall back on the underlying socket. This is easily achieved with a simple function:

import socket

from asyncio import StreamWriter

def get_socket(writer: StreamWriter) -> socket.socket:
    fileno = writer.get_extra_info('socket').fileno()
    # fromfd() duplicates the descriptor, so this returns a new socket
    # object wrapping a copy of the connection's file descriptor
    return socket.fromfd(fileno, socket.AF_INET, socket.SOCK_STREAM)
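One caveat worth knowing here: socket.fromfd() duplicates the file descriptor rather than adopting it, so the rebuilt socket must be closed independently of the original. A single-process sanity check using a socketpair() (`fromfd_roundtrip` is just a name for this sketch; socketpair() yields AF_UNIX sockets on POSIX systems):

```python
import socket

def fromfd_roundtrip():
    a, b = socket.socketpair()  # AF_UNIX on POSIX systems
    # rebuild a socket object from b's descriptor; the fd is duplicated,
    # so b and rebuilt refer to the same connection via separate fds
    rebuilt = socket.fromfd(b.fileno(), socket.AF_UNIX, socket.SOCK_STREAM)
    try:
        a.send(b'ping')
        return rebuilt.recv(4)
    finally:
        for s in (a, b, rebuilt):
            s.close()

if __name__ == "__main__":
    print(fromfd_roundtrip())  # b'ping'
```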

The socket is inserted into the shared object (e.g. a Manager().dict(), or even a custom class, which you have to register via a custom BaseManager instance). Since the application is built on asyncio and makes use of the streams provided by the library, we can easily convert the socket back to a pair of StreamReader and StreamWriter via:

node_reader, node_writer = await asyncio.open_connection(sock=self.node_sock)
node_writer.write(mesg_text)
await node_writer.drain()

Where self.node_sock is the socket instance that was passed through the shared object.
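The custom-class route mentioned above can be sketched like this. ClientRegistry and its methods are hypothetical names for this illustration, but the register() call is the standard BaseManager mechanism: method calls on the proxy execute in the manager process, so only the arguments and return values need to be picklable, not the registry itself.

```python
from multiprocessing.managers import BaseManager

class ClientRegistry:
    """Hypothetical server-side container; lives in the manager process."""
    def __init__(self):
        self._clients = {}

    def add(self, key, value):
        self._clients[key] = value

    def get(self, key):
        return self._clients.get(key)

class RegistryManager(BaseManager):
    pass

# Registering the class exposes its public methods through a proxy.
RegistryManager.register('ClientRegistry', ClientRegistry)

def demo():
    manager = RegistryManager()
    manager.start()
    try:
        registry = manager.ClientRegistry()
        # keys and values sent through the proxy must be picklable,
        # e.g. a peer address or a file descriptor, not a StreamWriter
        registry.add(('127.0.0.1', 4242), 'client-42')
        return registry.get(('127.0.0.1', 4242))
    finally:
        manager.shutdown()

if __name__ == '__main__':
    print(demo())  # client-42
```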

