OK, so I have an issue with creating a bunch of threads that all use the same object. The idea is that I have a "queue" of items (really just a list), and the items should get processed one by one until all of them have been processed. Currently this works just fine with one thread (when I set threadcount = 1), but when I set threadcount = 2 and the threads compete, it all goes to... a bad place.

Here are a few quick classes I made to give a detailed example of what I'm trying to accomplish. I have a pretty good hunch that the fix involves the "lock" keyword, but I'm not 100% sure how it is used.

In your answer please give an example in code of a solution to make your answer clear. Thanks!

The Code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;

namespace MyNamespace
{
    class Class1
    {
        static void Main()
        {
            Debug.WriteLine("starting application...");

            int threadcount = 2;
            List<Task> tasks = new List<Task>();

            List<Class2> myObjs = new List<Class2>();
            myObjs.Add(new Class2("list item 1"));
            myObjs.Add(new Class2("list item 2"));
            myObjs.Add(new Class2("list item 3"));
            myObjs.Add(new Class2("list item 4"));
            myObjs.Add(new Class2("list item 5"));
            myObjs.Add(new Class2("list item 6"));
            myObjs.Add(new Class2("list item 7"));
            myObjs.Add(new Class2("list item 8"));
            myObjs.Add(new Class2("list item 9"));

            Debug.WriteLine("about to create " + threadcount + " task(s)...");

            int t = 0;
            do
            {
                t++;
                Debug.WriteLine("creating task " + t);
                Class3 starter = new Class3();
                tasks.Add(starter.StartNewThread(myObjs));
            } while (t < threadcount);

            Task.WaitAll(tasks.ToArray());
            Debug.WriteLine("all tasks have completed");
        }
    }

    class Class2
    {
        private string m_status;
        public string status
        {
            get { return m_status; }
            set { m_status = value; }
        }

        private string m_text;
        public string text
        {
            get { return m_text; }
            set { m_text = value; }
        }

        private int m_threadid;
        public int threadid
        {
            get { return m_threadid; }
            set { m_threadid = value; }
        }

        public Class2()
        {
            m_status = "created";
            m_text = "";
            m_threadid = 0;
        }
        public Class2(string intext)
        {
            m_status = "created";
            m_text = intext;
            m_threadid = 0;
        }
    }

    class Class3
    {
        public Task StartNewThread(List<Class2> taskObjs)
        {
            Task<List<Class2>> task = Task.Factory
                .StartNew(() => threadTaskWorker(taskObjs),
                CancellationToken.None,
                TaskCreationOptions.None,
                TaskScheduler.Default)
                .ContinueWith(completed_task => threadTaskComplete(completed_task.Result));

            return task;
        }
        private List<Class2> threadTaskWorker(List<Class2> taskObjs)
        {
            Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");

            //Process all items in the list that need processing
            Class2 nextObj;
            do
            {
                //Look for next item in list that needs processing
                nextObj = null;
                foreach (Class2 taskObj in taskObjs)
                {
                    if (taskObj.status == "created")
                    {
                        nextObj = taskObj;
                        break;
                    }
                }

                if (nextObj != null)
                {
                    Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
                        " is handling " + nextObj.text);

                    nextObj.status = "processing";
                    nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
                    nextObj.text += "(handled)";

                    Random rnd = new Random();
                    Thread.Sleep(rnd.Next(300, 3000));

                    nextObj.status = "completed";
                }
            } while (nextObj != null);

            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");

            //Return the task object
            return taskObjs;
        }
        private List<Class2> threadTaskComplete(List<Class2> taskObjs)
        {
            Debug.WriteLine("a thread has finished, here are the current item's status...");

            foreach (Class2 taskObj in taskObjs)
            {
                Debug.WriteLine(taskObj.text +
                    " thread:" + taskObj.threadid +
                    " status:" + taskObj.status);
            }

            //Return the task object
            return taskObjs;
        }
    }
}

The Results:

/*
starting application...
about to create 2 task(s)...
creating task 1
creating task 2
thread #10 created.
thread #11 created.
thread #10 is handling list item 1
thread #11 is handling list item 1
thread #10 is handling list item 2
thread #11 is handling list item 2
thread #10 is handling list item 3
thread #11 is handling list item 4
thread #10 is handling list item 5
thread #11 is handling list item 5
thread #10 is handling list item 6
thread #11 is handling list item 6
thread #10 is handling list item 7
thread #11 is handling list item 8
thread #10 is handling list item 9
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled)(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled)(handled) thread:11 status:completed
list item 6(handled)(handled) thread:11 status:completed
list item 7(handled) thread:10 status:completed
list item 8(handled) thread:11 status:completed
list item 9(handled) thread:10 status:processing
thread #10 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled)(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled)(handled) thread:11 status:completed
list item 6(handled)(handled) thread:11 status:completed
list item 7(handled) thread:10 status:completed
list item 8(handled) thread:11 status:completed
list item 9(handled) thread:10 status:completed
all tasks have completed
*/

The Expected Results:

/*
starting application...
about to create 2 task(s)...
creating task 1
creating task 2
thread #10 created.
thread #11 created.
thread #10 is handling list item 1
thread #11 is handling list item 2
thread #10 is handling list item 3
thread #11 is handling list item 4
thread #10 is handling list item 5
thread #10 is handling list item 6
thread #11 is handling list item 7
thread #10 is handling list item 8
thread #11 is handling list item 9
thread #10 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:10 status:completed
list item 2(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled) thread:10 status:completed
list item 6(handled) thread:10 status:completed
list item 7(handled) thread:11 status:completed
list item 8(handled) thread:10 status:completed
list item 9(handled) thread:11 status:processing
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:10 status:completed
list item 2(handled) thread:11 status:completed
list item 3(handled) thread:10 status:completed
list item 4(handled) thread:11 status:completed
list item 5(handled) thread:10 status:completed
list item 6(handled) thread:10 status:completed
list item 7(handled) thread:11 status:completed
list item 8(handled) thread:10 status:completed
list item 9(handled) thread:11 status:completed
all tasks have completed
*/

4 Answers

2

If you don't want to use a ConcurrentQueue, or if you're using other shared resources which are not thread-safe, use the option you mentioned yourself: the lock keyword.

From MSDN:

The lock keyword marks a statement block as a critical section by obtaining the mutual-exclusion lock for a given object, executing a statement, and then releasing the lock.

When a thread obtains the lock for a given object, other threads that encounter a lock(object) statement must wait for the lock to become available before continuing.

// any resource shared between threads
private List<int> sharedResource = new List<int>();

// best practice is to use a private object to synchronise threads
// see: https://msdn.microsoft.com/en-us/library/c5kehkcz.aspx
private object resourceLock = new object();

void MethodAccessingSharedResource()
{
    // Only one thread can acquire the lock on resourceLock at a time.
    lock (resourceLock)
    {
        // The thread can safely access the shared resource here.
        // Other threads will wait at lock(resourceLock) until
        // this thread gives up the lock.
    }

    // The thread has released the lock on resourceLock.
    // Another thread can now enter the lock(){} code block.
}
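
Applied to the list from the question, one way this might look is a single shared lock around the "find the next created item and mark it as processing" step, so that the check and the update happen as one unit. This is only a minimal sketch: WorkItem, LockDemo, TryClaimNext, and Run are hypothetical names that do not appear in the original code, and the short Thread.Sleep stands in for the real work.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for the question's Class2.
class WorkItem
{
    public string Text;
    public string Status = "created";
    public int HandledCount; // how many times a worker processed this item
}

static class LockDemo
{
    // One private lock object shared by every worker.
    private static readonly object claimLock = new object();

    // Find the next "created" item and mark it "processing" in one critical section.
    static WorkItem TryClaimNext(List<WorkItem> items)
    {
        lock (claimLock)
        {
            foreach (WorkItem item in items)
            {
                if (item.Status == "created")
                {
                    item.Status = "processing";
                    return item;
                }
            }
            return null; // nothing left to claim
        }
    }

    public static List<WorkItem> Run(int workerCount, int itemCount)
    {
        List<WorkItem> items = Enumerable.Range(1, itemCount)
            .Select(i => new WorkItem { Text = "list item " + i })
            .ToList();

        var tasks = new List<Task>();
        for (int w = 0; w < workerCount; w++)
        {
            tasks.Add(Task.Run(() =>
            {
                WorkItem item;
                while ((item = TryClaimNext(items)) != null)
                {
                    // The slow work happens outside the lock,
                    // so other workers can claim items in the meantime.
                    Thread.Sleep(10);
                    item.HandledCount++;
                    lock (claimLock) { item.Status = "completed"; }
                }
            }));
        }

        Task.WaitAll(tasks.ToArray());
        return items;
    }

    static void Main()
    {
        foreach (WorkItem item in Run(4, 9))
            Console.WriteLine(item.Text + " handled " + item.HandledCount +
                " time(s), status: " + item.Status);
    }
}
```

Holding the lock only while claiming keeps the critical section short; the sleep (the "work") runs in parallel across workers.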

3 Comments

I'm torn... Both methods (lock and ConcurrentQueue) are acceptable solutions I would think. Is there a reason to choose one over the other? I think since I have more going on (2 sets of threads doing different things based on the status) that I need to use lock instead. From what it sounds like ConcurrentQueue is good for processing an item one time. Thoughts?
You would only use a Queue for items that need processing one time. That is the nature of queues. Whether you use a queue or some other type of collection depends on your application. ConcurrentQueue is quite handy in that it handles synchronisation internally. Using lock is good when you need fine-grained control over access to shared resources, and of course it's not limited to Queues and can be used for any type of thread synchronisation work, which you indicated. There's also no reason you can't use both in parallel, but take the time to think about your threading.
Thanks! I think I'm sticking with locks. I seem to have it working. +1 from me on both the response and the answer. ;)
1

First of all, thanks to @khargoosh and @interceptwind for their input! It was key in helping me understand locks and come up with this solution, which works reliably. It has been tested, and the results below are ACTUAL results. In this answer I decided to use 4 threads to show the results.

The Code:

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Threading;
using System.Diagnostics;

namespace MyNamespace
{
    class Class1
    {
        static void Main()
        {
            Debug.WriteLine("starting application...");

            int threadcount = 4;
            List<Task> tasks = new List<Task>();

            List<Class2> myObjs = new List<Class2>();
            myObjs.Add(new Class2("list item 1"));
            myObjs.Add(new Class2("list item 2"));
            myObjs.Add(new Class2("list item 3"));
            myObjs.Add(new Class2("list item 4"));
            myObjs.Add(new Class2("list item 5"));
            myObjs.Add(new Class2("list item 6"));
            myObjs.Add(new Class2("list item 7"));
            myObjs.Add(new Class2("list item 8"));
            myObjs.Add(new Class2("list item 9"));

            Debug.WriteLine("about to create " + threadcount + " task(s)...");

            int t = 0;
            do
            {
                t++;
                Debug.WriteLine("creating task " + t);
                Class3 starter = new Class3();
                tasks.Add(starter.StartNewThread(myObjs));
            } while (t < threadcount);

            Task.WaitAll(tasks.ToArray());
            Debug.WriteLine("all tasks have completed");
        }
    }

    class Class2
    {
        private object m_locker = new object();
        public object locker
        {
            get { return m_locker; }
            set { m_locker = value; }
        }

        private string m_status;
        public string status
        {
            get { return m_status; }
            set { m_status = value; }
        }

        private string m_text;
        public string text
        {
            get { return m_text; }
            set { m_text = value; }
        }

        private int m_threadid;
        public int threadid
        {
            get { return m_threadid; }
            set { m_threadid = value; }
        }

        public Class2()
        {
            m_status = "created";
            m_text = "";
            m_threadid = 0;
        }
        public Class2(string intext)
        {
            m_status = "created";
            m_text = intext;
            m_threadid = 0;
        }
    }

    class Class3
    {
        public Task StartNewThread(List<Class2> taskObjs)
        {
            Task<List<Class2>> task = Task.Factory
                .StartNew(() => threadTaskWorker(taskObjs),
                CancellationToken.None,
                TaskCreationOptions.None,
                TaskScheduler.Default)
                .ContinueWith(completed_task => threadTaskComplete(completed_task.Result));

            return task;
        }
        private List<Class2> threadTaskWorker(List<Class2> taskObjs)
        {
            Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");

            //Process all items in the list that need processing
            Class2 nextObj;
            do
            {
                //Look for next item in list that needs processing
                nextObj = null;
                foreach (Class2 taskObj in taskObjs)
                {
                    nextObj = taskObj;

                    lock (nextObj.locker)
                    {
                        if (taskObj.status == "created")
                        {
                            nextObj.status = "processing";
                            break;
                        }
                        else nextObj = null;
                    }
                }

                if (nextObj != null)
                {
                    Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
                        " is handling " + nextObj.text);

                    nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
                    nextObj.text += "(handled)";

                    Random rnd = new Random();
                    Thread.Sleep(rnd.Next(300, 3000));

                    nextObj.status = "completed";
                }
            } while (nextObj != null);

            Debug.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");

            //Return the task object
            return taskObjs;
        }
        private List<Class2> threadTaskComplete(List<Class2> taskObjs)
        {
            Debug.WriteLine("a thread has finished, here are the current item's status...");

            foreach (Class2 taskObj in taskObjs)
            {
                Debug.WriteLine(taskObj.text +
                    " thread:" + taskObj.threadid +
                    " status:" + taskObj.status);
            }

            //Return the task object
            return taskObjs;
        }
    }
}

The Results:

/*
starting application...
about to create 4 task(s)...
creating task 1
creating task 2
creating task 3
creating task 4
thread #11 created.
thread #13 created.
thread #12 created.
thread #12 is handling list item 3
thread #11 is handling list item 1
thread #13 is handling list item 2
thread #14 created.
thread #14 is handling list item 4
thread #12 is handling list item 5
thread #11 is handling list item 6
thread #13 is handling list item 7
thread #14 is handling list item 8
thread #12 is handling list item 9
thread #11 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:processing
list item 8(handled) thread:14 status:processing
list item 9(handled) thread:12 status:processing
thread #13 destroyed.
thread #14 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:processing
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:processing
thread #12 destroyed.
a thread has finished, here are the current item's status...
list item 1(handled) thread:11 status:completed
list item 2(handled) thread:13 status:completed
list item 3(handled) thread:12 status:completed
list item 4(handled) thread:14 status:completed
list item 5(handled) thread:12 status:completed
list item 6(handled) thread:11 status:completed
list item 7(handled) thread:13 status:completed
list item 8(handled) thread:14 status:completed
list item 9(handled) thread:12 status:completed
all tasks have completed
*/

0

If you actually want a first in, first out queue of items that you can access concurrently then use ConcurrentQueue. Using the TryDequeue() method to retrieve objects will ensure that each object is only accessed once.

Example:

var cq = new ConcurrentQueue<T>();
//populate queue
...
//process queue until empty -- this can be done in parallel
T item;
while (cq.TryDequeue(out item))
{
    //process item
}
//queue was empty when we tried to retrieve something.
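
To make the "can be done in parallel" part concrete, here is a small self-contained sketch in which several tasks drain one ConcurrentQueue<string>. The names QueueDemo and Run, the item strings, and the short Thread.Sleep standing in for real work are assumptions for illustration only.

```csharp
using System;
using System.Collections.Concurrent;
using System.Threading;
using System.Threading.Tasks;

static class QueueDemo
{
    // Fills a queue with itemCount strings, lets workerCount tasks drain it,
    // and returns everything the workers handled.
    public static ConcurrentBag<string> Run(int workerCount, int itemCount)
    {
        var queue = new ConcurrentQueue<string>();
        for (int i = 1; i <= itemCount; i++)
            queue.Enqueue("list item " + i);

        // Thread-safe collection for the results written by the workers.
        var handled = new ConcurrentBag<string>();

        var tasks = new Task[workerCount];
        for (int w = 0; w < workerCount; w++)
        {
            tasks[w] = Task.Run(() =>
            {
                string item;
                // TryDequeue removes and returns an item atomically,
                // so no two workers can receive the same one.
                while (queue.TryDequeue(out item))
                {
                    Thread.Sleep(10); // simulated work
                    handled.Add(item + "(handled)");
                }
            });
        }

        Task.WaitAll(tasks);
        return handled;
    }

    static void Main()
    {
        foreach (string s in Run(4, 9))
            Console.WriteLine(s);
    }
}
```

The order in which the handled items are printed is not fixed, because the workers run concurrently and ConcurrentBag is unordered.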

2 Comments

I don't think this is a bad answer but I don't think it's what I'm looking for in the end. I have many groups of threads doing different tasks on the list and I think the ConcurrentQueue structure was built with a single task in mind handled by many threads. I on the other hand have many tasks in mind handled by many threads. Sorry I did not make this clear in the question, I was trying to keep it as simple as I could just to learn about locks I guess... Nevertheless, thanks for helping me learn about something new and useful! ;)
@ArvoBowen So you have a collection of items and you need to apply several tasks to each one? Is the nature of the tasks that they must be done sequentially in a specific order? Can one item have multiple tasks being performed on it simultaneously?
0

First of all, you need an additional object to be used as a locker in Class2:

class Class2
{
    public object locker = new object();

    private string m_status;
    ...
}

Edit: Next, in the processing loop in your Class3, you need to first check whether nextObj.status is "created". If it is, change it to "processing" and proceed to process the item. If it isn't, skip to the next object.

Note that we lock on nextObj.locker while checking and updating nextObj.status, to prevent two threads from claiming the same item at the same time. (based on MoreOn's comment)

    private List<Class2> threadTaskWorker(List<Class2> taskObjs)
    {
        Thread.CurrentThread.Name = "thread" + Thread.CurrentThread.ManagedThreadId;
        Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " created.");

        //Process all items in the list that need processing
        foreach (Class2 nextObj in taskObjs)
        {
            //Claim the item under its lock so that only one thread can move it
            //from "created" to "processing"; skip items that are already claimed
            lock (nextObj.locker)
            {
                if (nextObj.status != "created")
                    continue;
                nextObj.status = "processing";
            }

            //Only the thread that claimed the item reaches this point
            Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId +
                " is handling " + nextObj.text);

            nextObj.threadid = Thread.CurrentThread.ManagedThreadId;
            nextObj.text += "(handled)";
            Random rnd = new Random();
            Thread.Sleep(rnd.Next(300, 3000));
            nextObj.status = "completed";
        }

        Console.WriteLine("thread #" + Thread.CurrentThread.ManagedThreadId + " destroyed.");
        return taskObjs;
    }
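
As an aside, the same "check that it is created, then mark it as processing" step can also be done without a lock object, using Interlocked.CompareExchange on an integer status. This is a different technique from the lock shown above, not what the answer uses, and it is only a sketch: Job, InterlockedDemo, Run, and the integer status constants are hypothetical names introduced for illustration.

```csharp
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

// Hypothetical stand-in for Class2, with the status stored as an int
// so that Interlocked can operate on it.
class Job
{
    public const int Created = 0, Processing = 1, Completed = 2;
    public int Status = Created;
    public int HandledCount; // how many times a worker processed this job
    public string Text;
}

static class InterlockedDemo
{
    public static Job[] Run(int workerCount, int itemCount)
    {
        Job[] jobs = Enumerable.Range(1, itemCount)
            .Select(i => new Job { Text = "list item " + i })
            .ToArray();

        Task[] tasks = Enumerable.Range(0, workerCount).Select(_ => Task.Run(() =>
        {
            foreach (Job job in jobs)
            {
                // Atomically switch Created -> Processing. The call returns the
                // previous value, so only the thread that saw Created owns the job.
                int previous = Interlocked.CompareExchange(
                    ref job.Status, Job.Processing, Job.Created);
                if (previous != Job.Created)
                    continue;

                Thread.Sleep(10); // simulated work
                job.HandledCount++;
                Volatile.Write(ref job.Status, Job.Completed);
            }
        })).ToArray();

        Task.WaitAll(tasks);
        return jobs;
    }

    static void Main()
    {
        foreach (Job job in Run(4, 9))
            Console.WriteLine(job.Text + " handled " + job.HandledCount + " time(s)");
    }
}
```

This only covers a single atomic state change; if several fields have to change together, a lock as in the answer is the simpler choice.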

7 Comments

I think that your locking is not at the right place. Two threads could both do the check that status is not completed, then both enter, then (one after the other) both process the same item. Additionally, I think that it should probably (as in the original) check that the item is still in the one specific state that you want to begin processing from.
@moreON I was just about to post something along the same lines! I loved this answer but when trying to implement it I ended up having issues. When the two threads first fire up they both see if (nextObj.status != "completed") = true and both threads continue on (causing the issue). I as well changed the code to use if (nextObj.status == "created") instead because for the final implementation I will need it to do that. I have many steps and THIS thread just needs to worry about a single state it's trying to process (aka "created").
Digging into it a little more I think I solved the issue. I'm going to make an edit on your answer. ;)
To all the users who rejected my edit for the reasons given: I'm floored. The intent of the edit was to make the answer actually work! Currently this solution does not work at all and the results posted are very untrue. Rejecting it with the reason "This edit deviates from the original intent of the post" is ridiculous! The "original intent" was to lock the object and not use it until processing has finished. That was NOT accomplished with this post. With my edit it was! The poster had a great post and I wanted to accept it as an answer, that's why I didn't make my own answer!
@moreOn you are right, I have overlooked this part :)