Python 3.1.2

I have a problem sharing a variable between two threads spawned inside a multiprocessing.Process. It is a simple bool that determines whether the threads should keep running or stop. Below is simplified code (using the same mechanisms as my original code) for three cases:

  1. The main class is a threading.Thread and self.is_running is a bool [works fine].
  2. The main class is a multiprocessing.Process and self.is_running is a bool [does not work: the child threads have local copies of self.is_running instead of sharing it].
  3. The main class is a multiprocessing.Process and self.is_running is a multiprocessing.Value("b", True) [works fine].

What I'd like is to understand WHY it works this way and not the other (i.e. why point 2 doesn't work the way I assume it should).

Testing is done from the Python interpreter:

from testclass import *

d = TestClass()
d.start()
d.stop()

Below is the example for point 1:

import threading
import time
import queue
import multiprocessing

class TestClass(threading.Thread):
    def __init__(self):
        threading.Thread.__init__(self)
        self.q = queue.Queue(10)
        self.is_running = True
        self.sema = threading.Semaphore()  # guards access to is_running

    def isRunning(self):
        self.sema.acquire()
        print("Am I running?", self.is_running)
        z = self.is_running
        self.sema.release()
        return z

    def stop(self):
        self.sema.acquire()
        self.is_running = False
        print("STOPPING")
        self.sema.release()

    def reader(self):
        while self.isRunning():
            print("R] Reading!")
            try:
                data = self.q.get(timeout=1)
            except queue.Empty:
                print("R] NO DATA!")
            else:
                print("R] Read: ", data)

    def writer(self):
        while self.isRunning():
            print("W] Writing!")
            self.q.put(time.time())
            time.sleep(2)

    def run(self):
        # Start the reader and writer threads and wait for both to finish.
        tr = threading.Thread(target=self.reader)
        tw = threading.Thread(target=self.writer)
        tr.start()
        tw.start()
        tr.join()
        tw.join()

The example for point 2:

import threading
import time
import queue
import multiprocessing


class TestClass(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.q = queue.Queue(10)
        self.is_running = True
        self.sema = threading.Semaphore()  # guards access to is_running

    def isRunning(self):
        self.sema.acquire()
        print("Am I running?", self.is_running)
        z = self.is_running
        self.sema.release()
        return z

    def stop(self):
        self.sema.acquire()
        self.is_running = False
        print("STOPPING")
        self.sema.release()

    def reader(self):
        while self.isRunning():
            print("R] Reading!")
            try:
                data = self.q.get(timeout=1)
            except queue.Empty:
                print("R] NO DATA!")
            else:
                print("R] Read: ", data)

    def writer(self):
        while self.isRunning():
            print("W] Writing!")
            self.q.put(time.time())
            time.sleep(2)

    def run(self):
        # Start the reader and writer threads and wait for both to finish.
        tr = threading.Thread(target=self.reader)
        tw = threading.Thread(target=self.writer)
        tr.start()
        tw.start()
        tr.join()
        tw.join()

The example for point 3:

import threading
import time
import queue
import multiprocessing

class TestClass(multiprocessing.Process):
    def __init__(self):
        multiprocessing.Process.__init__(self)
        self.q = queue.Queue(10)
        self.is_running = multiprocessing.Value("b", True)
        self.sema = threading.Semaphore()  # guards access to is_running

    def isRunning(self):
        self.sema.acquire()
        print("Am I running?", self.is_running.value)
        z = self.is_running.value
        self.sema.release()
        return z

    def stop(self):
        self.sema.acquire()
        self.is_running.value = False
        print("STOPPING")
        self.sema.release()

    def reader(self):
        while self.isRunning():
            print("R] Reading!")
            try:
                data = self.q.get(timeout=1)
            except queue.Empty:
                print("R] NO DATA!")
            else:
                print("R] Read: ", data)

    def writer(self):
        while self.isRunning():
            print("W] Writing!")
            self.q.put(time.time())
            time.sleep(2)

    def run(self):
        # Start the reader and writer threads and wait for both to finish.
        tr = threading.Thread(target=self.reader)
        tw = threading.Thread(target=self.writer)
        tr.start()
        tw.start()
        tr.join()
        tw.join()

2 Answers

1

Threads are all part of the same process, so they share memory. In CPython, the global interpreter lock (GIL) additionally means that only one thread of a process executes Python bytecode at any given moment, so threads do not run Python code simultaneously on different CPUs.

Processes have separate memory spaces. One CPU can run one process while another CPU runs a different process at the same time. Special constructs are needed to let processes cooperate.
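To make the difference concrete, here is a minimal sketch (not taken from the question's code). The names `flag`, `clear_flag`, `flag_after_thread` and `flag_after_process` are made up for illustration, and the sketch assumes a POSIX system where the "fork" start method is available, which the question's code appears to rely on as well.

```python
import multiprocessing
import threading

# Assumption: a POSIX system where the "fork" start method is available.
ctx = multiprocessing.get_context("fork")

# An ordinary Python object, playing the role of self.is_running.
flag = {"is_running": True}


def clear_flag():
    """Set the flag to False in whichever thread or process runs this."""
    flag["is_running"] = False


def flag_after_thread():
    """Clear the flag from a thread, then read it back in the caller."""
    flag["is_running"] = True
    t = threading.Thread(target=clear_flag)
    t.start()
    t.join()
    return flag["is_running"]


def flag_after_process():
    """Clear the flag from a child process, then read it back in the parent."""
    flag["is_running"] = True
    p = ctx.Process(target=clear_flag)
    p.start()
    p.join()
    return flag["is_running"]


if __name__ == "__main__":
    print("after thread: ", flag_after_thread())   # False: same memory
    print("after process:", flag_after_process())  # True: child changed its own copy
```

The thread changes the one and only `flag` object, while the child process changes a copy that the parent never sees. The question's point 2 is the same effect in the other direction: the parent changes its copy and the child never sees it.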


1 Comment

I know that. But here (point 2) the threads are created in the Process's memory space. At least that is how it should be: the Process (TestClass) is a wrapper for the threads created in it. I'm not creating new processes. Yet it looks as if threads spawned from multiprocessing.Process do not share their memory space. Why? It looks as if the threads are being created parallel to TestClass and not under it (if you know what I mean).
0

In point 2, both the parent process and the child process have their own copy of is_running. When you call stop() in the parent process, it only modifies is_running in the parent process and not in the child process. The reason multiprocessing.Value works is that its memory is shared between both processes.

If you want a process-aware queue, use multiprocessing.Queue.
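As a rough sketch of both points, the following puts a plain flag next to a multiprocessing.Value and reports back through a multiprocessing.Queue. The names `worker`, `stop_from_parent`, `plain_flag` and `shared_flag` are invented for this example, and it assumes the "fork" start method on a POSIX system.

```python
import multiprocessing
import time

# Assumption: POSIX with the "fork" start method, as the question's code implies.
ctx = multiprocessing.get_context("fork")


def worker(plain_flag, shared_flag, results):
    """Runs in the child: wait for the shared flag, then report both flags."""
    deadline = time.monotonic() + 5  # safety net so the sketch cannot hang
    while shared_flag.value and time.monotonic() < deadline:
        time.sleep(0.05)
    # Send what the child sees back to the parent.
    results.put((plain_flag["is_running"], bool(shared_flag.value)))


def stop_from_parent():
    plain_flag = {"is_running": True}   # ordinary object: copied at fork
    shared_flag = ctx.Value("b", True)  # lives in shared memory
    results = ctx.Queue()               # process-aware queue
    child = ctx.Process(target=worker, args=(plain_flag, shared_flag, results))
    child.start()

    # The equivalent of stop(), called in the parent: clear both flags.
    plain_flag["is_running"] = False
    shared_flag.value = False

    seen = results.get(timeout=10)  # read before join() so the child can flush
    child.join()
    return seen


if __name__ == "__main__":
    print(stop_from_parent())  # (True, False)
```

The child still sees its plain flag as True because it only ever had a copy, while the change to the Value is visible on both sides.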

2 Comments

Can you explain why the parent process and the child processes each have their own copy of is_running instead of a shared one? And why isn't that the case when the parent is a threading.Thread? If TestClass weren't of type multiprocessing.Process or threading.Thread, then it would share is_running with its child processes, right? If so, why?
As Marco says, processes don't share memory but threads do. With processes, is_running lives in a separate address space in the parent and in the child; with threads there is only one copy, at one address. Though Process and Thread look similar, they aren't completely interchangeable. If TestClass weren't a subclass of Process, its members still wouldn't be magically shared across processes. You must use one of the messaging or sharing primitives from multiprocessing, such as multiprocessing.Value, to do that.
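Another such primitive is multiprocessing.Event. The sketch below is only an illustration: `StoppableProcess`, `_loop`, `pids` and `demo` are hypothetical names, and it assumes the "fork" start method on a POSIX system. It also records os.getpid() from each thread to show that the threads started in run() live inside the child process, not next to it.

```python
import multiprocessing
import os
import threading

# Assumption: POSIX with the "fork" start method available.
ctx = multiprocessing.get_context("fork")


class StoppableProcess(ctx.Process):
    def __init__(self):
        super().__init__()
        self.stop_event = ctx.Event()  # shared between parent and child
        self.pids = ctx.Queue()        # lets the child's threads report back

    def stop(self):
        # Called in the parent; the child sees it because Event is shared.
        self.stop_event.set()

    def _loop(self, name):
        self.pids.put((name, os.getpid()))
        # wait() returns True once the event is set, which ends the loop.
        while not self.stop_event.wait(timeout=0.1):
            pass

    def run(self):
        # run() executes in the child, so these threads belong to the child.
        threads = [threading.Thread(target=self._loop, args=(n,))
                   for n in ("reader", "writer")]
        for t in threads:
            t.start()
        for t in threads:
            t.join()


def demo():
    p = StoppableProcess()
    p.start()
    reports = [p.pids.get(timeout=5) for _ in range(2)]
    p.stop()
    p.join(timeout=5)
    return os.getpid(), p.pid, reports, p.exitcode


if __name__ == "__main__":
    parent_pid, child_pid, reports, exitcode = demo()
    print("parent:", parent_pid, "child:", child_pid)
    print("threads reported:", reports, "exit code:", exitcode)
```

Both threads report the child's PID, which differs from the PID of the interpreter where stop() is called. A plain attribute set in stop() would therefore change the parent's object only.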
