
I just learnt Python multiprocessing, and I want to build a model that simulates sending and receiving messages in a network. A directed graph describes the relation between nodes, and a dictionary describes the communication between them; the values of this dictionary are queues. But I ran into some errors:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

PoolGroup=[('R1','R2','R3'),('N1','N2','N3'),('E1','E2','E3')]
PoolElement=['R1','R2','R3','N1','N2','N3','E1','E2','E3']
graph={'R1':['N1','N2','N3'],
       'R2':['N1','N2','N3'],
       'R3':['N1','N2','N3'],
       'N1':['E1','E2','E3'],
       'N2':['E1','E2','E3'],
       'N3':['E1','E2','E3'],
       'E1':[],
       'E2':[],
       'E3':[]}

def addSigal(target,information):
    AllQueue[target].put(information)
    print("Succeed in sending msg to "+target)
    print(target+' now has ',AllQueue[target].qsize(),' signals')


def pool1function(name,information):
    targetlist=list(graph[name])
    print(name+" send information to "+str(targetlist))            
    with ProcessPoolExecutor() as pool1:
        pool1.map(addSigal,targetlist,[information]*3)


if __name__=='__main__':
    m=Manager()
    AllQueue=m.dict()
    AllQueue.update({PE:m.Queue() for PE in PoolElement})
    with ProcessPoolExecutor() as pool:       
        pool.map(pool1function,PoolGroup[0],[1,2,3])

Unfortunately, the result just showed:

R1 send information to ['N1', 'N2', 'N3']
R2 send information to ['N1', 'N2', 'N3']
R3 send information to ['N1', 'N2', 'N3']

It seems the information was never sent to the corresponding nodes. So I checked AllQueue and found something strange: when I printed AllQueue['R1'], it showed:

RemoteError: 
---------------------------------------------------------------------------
Unserializable message: ('#RETURN', <queue.Queue object at 0x10edd8dd8>)
---------------------------------------------------------------------------

I also failed to put or get elements from AllQueue['R1']. What's the problem?

  • I'm not sure that AllQueue is defined. Also please edit your question to remove redundant elements: are all the imports needed? Commented Apr 13, 2017 at 12:26
  • I tried moving the definition of AllQueue out of main and making it a global variable, but the result was the same... Commented Apr 13, 2017 at 13:18
  • It seems that AllQueue goes out of scope when you use ProcessPoolExecutor or multiprocessing.Pool. You will need to pass AllQueue as a parameter through the map calls. Commented Apr 13, 2017 at 13:47

1 Answer


This is an example of passing the dictionary to a task:

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def addSigal(target, information, q):
    # q is the managed dict, passed in explicitly from the parent process
    print(target, information, q)
    q[target] = information
    print("Succeed in sending msg to " + target)
    print(target + ' now has ', q[target])


if __name__ == '__main__':
    m = Manager()
    AllQueue = m.dict()
    AllQueue.update({'A': 0, 'B': 1})
    with ProcessPoolExecutor() as pool:
        # the managed dict travels to every worker as a normal argument
        pool.map(addSigal, 'AB', [1, 2], [AllQueue, AllQueue])
    print(AllQueue)
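
If the values really need to be queues, as in the question, a sketch of the same idea is below. It assumes (this is not from the original post) that each per-target Manager().Queue() proxy is kept in an ordinary dict in the parent and handed to the worker directly as an argument, rather than being looked up through a shared AllQueue inside the worker; the trimmed graph and the list() around pool.map are also only illustrative.

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager

# same fan-out as the question: R1 sends to N1, N2, N3
graph = {'R1': ['N1', 'N2', 'N3']}


def addSigal(target, information, q):
    # q is a single Manager().Queue() proxy handed in as an argument
    q.put(information)
    print("Succeed in sending msg to " + target)
    print(target + ' now has', q.qsize(), 'signals')


if __name__ == '__main__':
    m = Manager()
    # plain dict of queue proxies, kept only in the parent process
    AllQueue = {PE: m.Queue() for PE in ['N1', 'N2', 'N3']}
    targets = graph['R1']
    with ProcessPoolExecutor() as pool:
        # each task gets the message plus its own target's queue proxy;
        # wrapping in list() forces any worker exception to surface here
        list(pool.map(addSigal, targets, [1] * len(targets),
                      [AllQueue[t] for t in targets]))
    for t in targets:
        print(t, 'received', AllQueue[t].get())

Passing the queue proxy itself as a parameter also avoids fetching a queue back out of a managed dict inside a worker, which appears to be what produced the "Unserializable message" error in the question.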

2 Comments

Yes, you are right. This works for one-to-one, but what about all-to-one? The receiving node gets a sequence of numbers rather than a single number, and a dictionary value that holds just one number isn't enough, I'm afraid...
Well, I think it's down to you to work on this. I've shown that the principal problem is that different processes have their own copies of variables, and the work-around is to pass anything global through as parameters. You just need to keep going with this.
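
As a follow-up to the comment above, here is one possible sketch of the all-to-one case: the single receiver owns one Manager queue, and that same proxy is passed to every sending task, so the receiver ends up holding the whole sequence of messages. The names send, senders and receiver_q are illustrative, not taken from the post.

from concurrent.futures import ProcessPoolExecutor
from multiprocessing import Manager


def send(sender, information, receiver_q):
    # every sender puts into the same receiver queue proxy
    receiver_q.put((sender, information))
    print(sender + ' sent ' + str(information))


if __name__ == '__main__':
    m = Manager()
    receiver_q = m.Queue()          # one queue for the single receiver, e.g. 'E1'
    senders = ['N1', 'N2', 'N3']
    with ProcessPoolExecutor() as pool:
        # list() forces any worker exception to surface here
        list(pool.map(send, senders, [1, 2, 3], [receiver_q] * len(senders)))
    # the receiver drains its queue and sees every message
    while not receiver_q.empty():
        print('E1 received', receiver_q.get())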
