0

I found a Python tutorial on queue and threading. Here is the code:

#!/usr/bin/python

import Queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    """Thread subclass whose body is a call to ``process_data``.

    threadID -- value stored on the instance as ``threadID``.
    name     -- thread name; also used in the start/exit messages.
    q        -- queue object handed on to ``process_data``.
    """

    def __init__(self, threadID, name, q):
        super(myThread, self).__init__()
        self.threadID, self.name, self.q = threadID, name, q

    def run(self):
        """Print a start message, run ``process_data``, print an exit message."""
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)

def process_data(threadName, q):
    """Poll ``q`` for work until the module-level ``exitFlag`` becomes true.

    threadName -- label used in the progress messages.
    q          -- queue to take work items from.

    Relies on two module-level names: ``exitFlag`` (loop condition) and
    ``queueLock`` (held while the queue is checked and read).
    """
    while not exitFlag:
        item_taken = False
        # Check and read the queue that was passed in (not a global), and
        # let the ``with`` block release the lock even if get() raises.
        with queueLock:
            if not q.empty():
                data = q.get()
                item_taken = True
        if item_taken:
            print("%s processing %s" % (threadName, data))
            time.sleep(3)  # placeholder delay between the two messages
            print("%s finished processing %s" % (threadName, data))
        time.sleep(1)  # pause before polling again


threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)
threads = []

# Create new threads (IDs start at 1)
for threadID, tName in enumerate(threadList, 1):
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)

# Fill the queue.
# queueLock is deliberately NOT held here: Queue.put() does its own locking,
# and holding queueLock across a put() that blocks on a full queue would
# deadlock, because the workers need queueLock before they can take an item.
for word in nameList:
    workQueue.put(word)

# Wait for queue to empty; sleep between checks instead of spinning.
while not workQueue.empty():
    time.sleep(0.1)

# Notify threads it's time to exit
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")

I am super new to Python and threading + queuing so pardon me.

I intend to write a few thread classes (e.g. myThread1, myThread2, etc.). main() will receive command-line arguments and decide which thread class to create.

So I'm thinking of splitting the myThread classes and main into separate Python files. I also intend to move the process_data method into the myThread classes, so that each thread class can apply its own set of rules. Think of it as encapsulation.

This is what I have tried:

mythread.py:

#!/usr/bin/python

import Queue
import threading
import time

exitFlag = 0

class myThread(threading.Thread):
    """Thread subclass whose body is a call to ``process_data``.

    threadID -- value stored on the instance as ``threadID``.
    name     -- thread name; also used in the start/exit messages.
    q        -- queue object handed on to ``process_data``.
    """

    def __init__(self, threadID, name, q):
        super(myThread, self).__init__()
        self.threadID, self.name, self.q = threadID, name, q

    def run(self):
        """Print a start message, run ``process_data``, print an exit message."""
        print("Starting " + self.name)
        process_data(self.name, self.q)
        print("Exiting " + self.name)

def process_data(threadName, q):
    """Poll ``q`` for work until this module's ``exitFlag`` becomes true.

    threadName -- label used in the progress messages.
    q          -- queue to take work items from.

    ``exitFlag`` and ``queueLock`` are looked up as globals of the module
    this function lives in.
    """
    while not exitFlag:
        item_taken = False
        # Check and read the queue that was passed in (not a global), and
        # let the ``with`` block release the lock even if get() raises.
        with queueLock:
            if not q.empty():
                data = q.get()
                item_taken = True
        if item_taken:
            print("%s processing %s" % (threadName, data))
            time.sleep(3)  # placeholder delay between the two messages
            print("%s finished processing %s" % (threadName, data))
        time.sleep(1)  # pause before polling again

main.py

# Driver script: start the workers, enqueue the work items, wait for the
# queue to drain, signal the workers to exit and join them.
# NOTE(review): as shown, this file has no import lines although it uses
# threading, Queue and myThread -- presumably they still need to be imported.
threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
queueLock = threading.Lock()
workQueue = Queue.Queue(10)  # queue created with maxsize 10
threads = []
threadID = 1

# Create new threads
for tName in threadList:
    thread = myThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)
    threadID += 1


# Fill the queue
# The lock is held for the whole fill, so nothing else that acquires
# queueLock can run its critical section until every item has been put.
queueLock.acquire()
for word in nameList:
    workQueue.put(word)
queueLock.release()

# Wait for queue to empty
# This loop spins without sleeping until empty() reports True.
while not workQueue.empty():
    pass

# Notify threads it's time to exit
# This binds a name called exitFlag in THIS module.
# NOTE(review): process_data in mythread.py reads the exitFlag global of its
# own module, so this assignment presumably never reaches the workers --
# confirm.
exitFlag = 1

# Wait for all threads to complete
for t in threads:
    t.join()
print "Exiting Main Thread"

There are a few problems that i'm facing now:

  1. How can I pass exitFlag to the myThread class? I tried making it a class variable, but the "while not exitFlag" loop never ends, even after I set exitFlag = 1 in main.
  2. How can I pass queueLock into the class? same thing here. Right now it is declared as global variable? and it also didn't work if I set it as a member variable for myThread.

3 Answers 3

1

An answer to #1 is: don't use a global variable. Instead, add a flag to your myThread subclass.

As for question #2, the Queue class was designed for multi-threaded programming, so its methods automatically handle all required locking details for you, thereby preventing simultaneous access problems. This means you don't really need the queueLock.

Incorporating both of these suggestions into your code would result in something like this (untested):

main.py

from mythread import MyThread
import Queue
import threading

import time  # only needed for the polling delay below

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
workQueue = Queue.Queue(10)
threads = []

# Create new threads (IDs start at 1)
for threadID, tName in enumerate(threadList, 1):
    thread = MyThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)

# Fill the queue
for word in nameList:
    workQueue.put(word)

# Wait for queue to empty; sleep between checks instead of spinning,
# so this thread does not burn CPU while the workers run.
while not workQueue.empty():
    time.sleep(0.1)

# Notify threads it's time to exit
for t in threads:
    t.stop()

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")

mythread.py

import threading
import time

class MyThread(threading.Thread):  # note capitalization change
    """Worker thread that takes items from a shared queue until stopped.

    threadID   -- value stored on the instance as ``threadID``.
    name       -- thread name; also used in the progress messages.
    q          -- queue to take work items from.
    work_delay -- seconds to sleep between the "processing" and
                  "finished processing" messages (default 3).
    idle_delay -- seconds to wait between polls of the queue (default 1).
    """

    def __init__(self, threadID, name, q, work_delay=3, idle_delay=1):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.name = name
        self.q = q
        self.work_delay = work_delay
        self.idle_delay = idle_delay
        # Single underscore (not name-mangled) so subclasses that override
        # process_data() can still check the stop signal.
        self._exit_event = threading.Event()

    def run(self):
        """Print a start message, process the queue, print an exit message."""
        print("Starting " + self.name)
        self.process_data()
        print("Exiting " + self.name)

    def stop(self):
        """Ask the worker to leave its loop; safe to call from any thread."""
        self._exit_event.set()

    def process_data(self):
        """Take and handle queue items until ``stop()`` has been called."""
        try:
            from Queue import Empty  # Python 2 module name
        except ImportError:
            from queue import Empty  # Python 3 module name

        while not self._exit_event.is_set():
            # get_nowait() never blocks. Checking empty() and then calling a
            # blocking get() can hang forever when another worker takes the
            # last item in between, which keeps join() from returning.
            try:
                data = self.q.get_nowait()
            except Empty:
                pass
            else:
                print("%s processing %s" % (self.name, data))
                time.sleep(self.work_delay)
                print("%s finished processing %s" % (self.name, data))
            # Returns early once stop() is called.
            self._exit_event.wait(self.idle_delay)
Sign up to request clarification or add additional context in comments.

4 Comments

Hi, thanks for the solution. It works like a charm. One correction to your code though: in MyThread, replace workQueue with self.q.
Oops, missed one -- glad to have helped.
Hello martineau, I'm back. I discovered that it's necessary to add queueLock. Without it, my program will not exit the main thread if I have more threads than queue items.
It's unclear why you'd have more threads than queue items (not counting the main thread). Regardless, you can make a started thread object not prevent the main thread from exiting by setting its daemon property beforehand (its default initial value is inherited from the creating thread). In the code in my answer that could be accomplished by adding a self.daemon = True statement to the MyThread.__init__() constructor method.
1

Import your thread class in main.py and run everything from main.py; don't execute mythread.py directly. Then let the threads check exitFlag, and alter it from main.py.

Comments

0

This is the working solution;

mythread_class.py

import threading
import time

class MyThread(threading.Thread):
    """Worker thread that takes items from a shared queue until stopped.

    threadID   -- value stored on the instance as ``threadID``.
    threadname -- label stored as ``threadname`` and used in the messages.
    q          -- queue to take work items from (stored as ``queue``).
    work_delay -- seconds to sleep between the "processing" and
                  "finished processing" messages (default 3).
    idle_delay -- seconds to wait between polls of the queue (default 1).
    """

    def __init__(self, threadID, threadname, q, work_delay=3, idle_delay=1):
        threading.Thread.__init__(self)
        self.threadID = threadID
        self.threadname = threadname
        self.queue = q
        self.work_delay = work_delay
        self.idle_delay = idle_delay
        # Single underscore (not name-mangled) so subclasses that override
        # process_data() can still check the stop signal.
        self._exit_event = threading.Event()

    def run(self):
        """Print a start message, process the queue, print an exit message."""
        print("Starting " + self.threadname)
        self.process_data()
        print("Exiting " + self.threadname)

    def stop(self):
        """Ask the worker to leave its loop; safe to call from any thread."""
        self._exit_event.set()

    def process_data(self):
        """Take and handle queue items until ``stop()`` has been called."""
        try:
            from Queue import Empty  # Python 2 module name
        except ImportError:
            from queue import Empty  # Python 3 module name

        while not self._exit_event.is_set():
            # get_nowait() never blocks. Checking empty() and then calling a
            # blocking get() can hang forever when another worker takes the
            # last item in between, which keeps join() from returning.
            try:
                data = self.queue.get_nowait()
            except Empty:
                pass
            else:
                print("%s processing %s" % (self.threadname, data))
                time.sleep(self.work_delay)
                print("%s finished processing %s" % (self.threadname, data))
            # Returns early once stop() is called.
            self._exit_event.wait(self.idle_delay)

main.py

from mythread_class import MyThread
import Queue
import threading

import time  # only needed for the polling delay below

threadList = ["Thread-1", "Thread-2", "Thread-3"]
nameList = ["One", "Two", "Three", "Four", "Five"]
workQueue = Queue.Queue(10)
threads = []

# Create new threads (IDs start at 1)
for threadID, tName in enumerate(threadList, 1):
    thread = MyThread(threadID, tName, workQueue)
    thread.start()
    threads.append(thread)

# Fill the queue
for word in nameList:
    workQueue.put(word)

# Wait for queue to empty; sleep between checks instead of spinning,
# so this thread does not burn CPU while the workers run.
while not workQueue.empty():
    time.sleep(0.1)

# Notify threads it's time to exit
for t in threads:
    t.stop()

# Wait for all threads to complete
for t in threads:
    t.join()
print("Exiting Main Thread")

If you have multiple MyThread classes, simply replace this line with one that uses your other class:

thread = MyThread(threadID, tName, workQueue)

--OR --

thread = MyThread2(threadID, tName, workQueue)

Comments

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.