Skip to main content

I've written a queue supporting one producer and multiple consumer threads. IdeaThe idea is that the queue instances a definable number of long running comsumerconsumer threads. Internally I'm using a BlockingCollection to solve the producer consumer problem. I've done some little testing via a console application, and it seems to work. Can somebody review the code and let me know if there is any flaw?

/// <summary>
/// A worker receives a collection to take elements from. After an element was succefullysuccessfully retrivedretrieved it will call <see cref="actionToBeCalled"/>. 
/// Stopping the worker can be done via <see cref="RequestStop"/>.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Worker<T>
{
    public Worker(BlockingCollection<T> collection, string workerName, Action<T> actionToBeCalled, ILog logger)
    {
        if (collection == null) { throw new ArgumentNullException(nameof(collection));}
        if (workerName == null) { throw new ArgumentNullException(nameof(workerName));}
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}
        if (logger == null) { throw new ArgumentNullException(nameof(logger));}

        this.collection = collection;
        this.workerName = workerName;
        this.actionToBeCalled = actionToBeCalled;
        this.cancelationTokenSource = new CancellationTokenSource();
        this.cancelationToken = this.cancelationTokenSource.Token;
        this.logger = logger;
    }

    public void DoWork()
    {
        while (!this.shouldStop)
        {
            try
            {
                var item = this.collection.Take(this.cancelationToken); // Take should block, until an element was added.
                this.actionToBeCalled?.Invoke(item);
            }
            catch (Exception exception)
            {
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
            }
        }

        this.logger.Debug($"[{this.workerName}]: Shutdown gracefully");
    }

    public void RequestStop()
    {
        this.logger.Debug($"[{this.workerName}]: {nameof(this.RequestStop)}");
        this.cancelationTokenSource.Cancel();
        this.shouldStop = true;
    }
    // Volatile is used as hint to the compiler that this data member will be accessed by multiple threads.
    private volatile bool shouldStop;

    private readonly BlockingCollection<T> collection;

    private readonly string workerName;

    private readonly Action<T> actionToBeCalled;

    private readonly CancellationToken cancelationToken;

    private readonly CancellationTokenSource cancelationTokenSource;

    private readonly ILog logger;
}

I've written a queue supporting one producer and multiple consumer threads. Idea is that the queue instances a definable number of long running comsumer threads. Internally I'm using a BlockingCollection to solve the producer consumer problem. I've done some little testing via a console application, and it seems to work. Can somebody review the code and let me know if there is any flaw?

/// <summary>
/// A worker receives a collection to take elements from. After an element was succefully retrived it will call <see cref="actionToBeCalled"/>. 
/// Stopping the worker can be done via <see cref="RequestStop"/>.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Worker<T>
{
    public Worker(BlockingCollection<T> collection, string workerName, Action<T> actionToBeCalled, ILog logger)
    {
        if (collection == null) { throw new ArgumentNullException(nameof(collection));}
        if (workerName == null) { throw new ArgumentNullException(nameof(workerName));}
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}
        if (logger == null) { throw new ArgumentNullException(nameof(logger));}

        this.collection = collection;
        this.workerName = workerName;
        this.actionToBeCalled = actionToBeCalled;
        this.cancelationTokenSource = new CancellationTokenSource();
        this.cancelationToken = this.cancelationTokenSource.Token;
        this.logger = logger;
    }

    public void DoWork()
    {
        while (!this.shouldStop)
        {
            try
            {
                var item = this.collection.Take(this.cancelationToken); // Take should block, until an element was added.
                this.actionToBeCalled?.Invoke(item);
            }
            catch (Exception exception)
            {
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
            }
        }

        this.logger.Debug($"[{this.workerName}]: Shutdown gracefully");
    }

    public void RequestStop()
    {
        this.logger.Debug($"[{this.workerName}]: {nameof(this.RequestStop)}");
        this.cancelationTokenSource.Cancel();
        this.shouldStop = true;
    }
    // Volatile is used as hint to the compiler that this data member will be accessed by multiple threads.
    private volatile bool shouldStop;

    private readonly BlockingCollection<T> collection;

    private readonly string workerName;

    private readonly Action<T> actionToBeCalled;

    private readonly CancellationToken cancelationToken;

    private readonly CancellationTokenSource cancelationTokenSource;

    private readonly ILog logger;
}

I've written a queue supporting one producer and multiple consumer threads. The idea is that the queue instances a definable number of long running consumer threads. Internally I'm using a BlockingCollection to solve the producer consumer problem. I've done some little testing via a console application, and it seems to work. Can somebody review the code and let me know if there is any flaw?

/// <summary>
/// A worker receives a collection to take elements from. After an element was successfully retrieved it will call <see cref="actionToBeCalled"/>. 
/// Stopping the worker can be done via <see cref="RequestStop"/>.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Worker<T>
{
    public Worker(BlockingCollection<T> collection, string workerName, Action<T> actionToBeCalled, ILog logger)
    {
        if (collection == null) { throw new ArgumentNullException(nameof(collection));}
        if (workerName == null) { throw new ArgumentNullException(nameof(workerName));}
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}
        if (logger == null) { throw new ArgumentNullException(nameof(logger));}

        this.collection = collection;
        this.workerName = workerName;
        this.actionToBeCalled = actionToBeCalled;
        this.cancelationTokenSource = new CancellationTokenSource();
        this.cancelationToken = this.cancelationTokenSource.Token;
        this.logger = logger;
    }

    public void DoWork()
    {
        while (!this.shouldStop)
        {
            try
            {
                var item = this.collection.Take(this.cancelationToken); // Take should block, until an element was added.
                this.actionToBeCalled?.Invoke(item);
            }
            catch (Exception exception)
            {
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
            }
        }

        this.logger.Debug($"[{this.workerName}]: Shutdown gracefully");
    }

    public void RequestStop()
    {
        this.logger.Debug($"[{this.workerName}]: {nameof(this.RequestStop)}");
        this.cancelationTokenSource.Cancel();
        this.shouldStop = true;
    }
    // Volatile is used as hint to the compiler that this data member will be accessed by multiple threads.
    private volatile bool shouldStop;

    private readonly BlockingCollection<T> collection;

    private readonly string workerName;

    private readonly Action<T> actionToBeCalled;

    private readonly CancellationToken cancelationToken;

    private readonly CancellationTokenSource cancelationTokenSource;

    private readonly ILog logger;
}
reformatted code for readability
Source Link
Abbas
  • 5.6k
  • 24
  • 40
static void Main(string[] args)
{
    var q = new QueueWithMultipleConsumerThreads<int>(
        numberOfWorkerThreads: 10,
        actionToBeCalled: i =>
                {
                    Console.WriteLine($"Consumed {i} from thread {Thread.CurrentThread.Name}, id: {Thread.CurrentThread.ManagedThreadId}");
                });

    // Add some entries to the q
    for (int i = 0; i < 10000; i++) { q.Enque(i); }
    
    Thread.Sleep(5000); // Give the q time to work
    q.Shutdown();
}
static void Main(string[] args)
{
    var q = new QueueWithMultipleConsumerThreads<int>(
            numberOfWorkerThreads: 10,
            actionToBeCalled: i =>
            {
                Console.WriteLine($"Consumed {i} from thread {Thread.CurrentThread.Name}, id: {Thread.CurrentThread.ManagedThreadId}");
            });

    // Add some entries to the q
    for (int i = 0; i < 10000; i++)
    {
        q.Enque(i);
    }
     
    Thread.Sleep(5000); // Give the q time to work
    q.Shutdown();
}
 public class  QueueWithMultipleConsumerThreads<T>
{
 
    private readonly ConcurrentBag<Thread> threads = new ConcurrentBag<Thread>();
    private readonly ConcurrentBag<Worker<T>> workers = new ConcurrentBag<Worker<T>>();
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    public QueueWithMultipleConsumerThreads(uint numberOfWorkerThreads, Action<T> actionToBeCalled  )
    {
        if (numberOfWorkerThreads == 0) { throw new ArgumentException($"{nameof(numberOfWorkerThreads)} must be > 0"); }
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}

        for (var i = 0; i < numberOfWorkerThreads; i++)
        {
            // Create a worker and assign it to a thread
            var threadName = $"Worker thread {i}";
            var logger = LogManager.GetLogger(threadName);

            var w = new Worker<T>(this.queue, threadName, actionToBeCalled, logger);
            var t = new Thread(w.DoWork) { IsBackground = true, Name = threadName};

            this.workers.Add(w);
            this.threads.Add(t);
            t.Start();
        }
    }

    public void Enque(T item)
    {
        this.queue.Add(item);
    }

    public int Count()
    {
        return this.queue.Count;
    }
 

    public void Shutdown()
    {
        while (!this.workers.IsEmpty)
        {
            Worker<T> w;
            this.workers.TryTake(out w);
            w?.RequestStop();
        }

        while (!this.threads.IsEmpty)
        {
            Thread t;
            this.threads.TryTake(out t);
            t?.Join(1000);
        }
    }
}
 /// <summary>
/// A worker receives a collection to take elements from. After an element was succefully retrived it will call <see cref="actionToBeCalled"/>. 
/// Stopping the worker can be done via <see cref="RequestStop"/>.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Worker<T>
{
    public Worker(BlockingCollection<T> collection, string workerName, Action<T> actionToBeCalled, ILog logger)
    {
        if (collection == null) { throw new ArgumentNullException(nameof(collection));}
        if (workerName == null) { throw new ArgumentNullException(nameof(workerName));}
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}
        if (logger == null) { throw new ArgumentNullException(nameof(logger));}

        this.collection = collection;
        this.workerName = workerName;
        this.actionToBeCalled = actionToBeCalled;
        this.cancelationTokenSource = new CancellationTokenSource();
        this.cancelationToken = this.cancelationTokenSource.Token;
        this.logger = logger;
    }

    public void DoWork()
    {
        while (!this.shouldStop)
        {
            try
            {
                var item = this.collection.Take(this.cancelationToken); // Take should block, until an element was added.
                this.actionToBeCalled?.Invoke(item);
            }
            catch (Exception exception)
            {
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
            }
        }

        this.logger.Debug($"[{this.workerName}]: Shutdown gracefully");
    } 

    public void RequestStop()
    {
        this.logger.Debug($"[{this.workerName}]: {nameof(this.RequestStop)}");
        this.cancelationTokenSource.Cancel();
        this.shouldStop = true;
    }
    // Volatile is used as hint to the compiler that this data member will be accessed by multiple threads.
    private volatile bool shouldStop;

    private readonly BlockingCollection<T> collection;

    private readonly string workerName;

    private readonly Action<T> actionToBeCalled;

    private readonly CancellationToken cancelationToken;

    private readonly CancellationTokenSource cancelationTokenSource;

    private readonly ILog logger;
}
static void Main(string[] args)
{
    var q = new QueueWithMultipleConsumerThreads<int>(
        numberOfWorkerThreads: 10,
        actionToBeCalled: i =>
                {
                    Console.WriteLine($"Consumed {i} from thread {Thread.CurrentThread.Name}, id: {Thread.CurrentThread.ManagedThreadId}");
                });

    // Add some entries to the q
    for (int i = 0; i < 10000; i++) { q.Enque(i); }
    
    Thread.Sleep(5000); // Give the q time to work
    q.Shutdown();
}
 public class  QueueWithMultipleConsumerThreads<T>
{
 
    private readonly ConcurrentBag<Thread> threads = new ConcurrentBag<Thread>();
    private readonly ConcurrentBag<Worker<T>> workers = new ConcurrentBag<Worker<T>>();
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    public QueueWithMultipleConsumerThreads(uint numberOfWorkerThreads, Action<T> actionToBeCalled  )
    {
        if (numberOfWorkerThreads == 0) { throw new ArgumentException($"{nameof(numberOfWorkerThreads)} must be > 0"); }
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}

        for (var i = 0; i < numberOfWorkerThreads; i++)
        {
            // Create a worker and assign it to a thread
            var threadName = $"Worker thread {i}";
            var logger = LogManager.GetLogger(threadName);

            var w = new Worker<T>(this.queue, threadName, actionToBeCalled, logger);
            var t = new Thread(w.DoWork) { IsBackground = true, Name = threadName};

            this.workers.Add(w);
            this.threads.Add(t);
            t.Start();
        }
    }

    public void Enque(T item)
    {
        this.queue.Add(item);
    }

    public int Count()
    {
        return this.queue.Count;
    }
 

    public void Shutdown()
    {
        while (!this.workers.IsEmpty)
        {
            Worker<T> w;
            this.workers.TryTake(out w);
            w?.RequestStop();
        }

        while (!this.threads.IsEmpty)
        {
            Thread t;
            this.threads.TryTake(out t);
            t?.Join(1000);
        }
    }
 /// <summary>
/// A worker receives a collection to take elements from. After an element was succefully retrived it will call <see cref="actionToBeCalled"/>. 
/// Stopping the worker can be done via <see cref="RequestStop"/>.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Worker<T>
{
    public Worker(BlockingCollection<T> collection, string workerName, Action<T> actionToBeCalled, ILog logger)
    {
        if (collection == null) { throw new ArgumentNullException(nameof(collection));}
        if (workerName == null) { throw new ArgumentNullException(nameof(workerName));}
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}
        if (logger == null) { throw new ArgumentNullException(nameof(logger));}

        this.collection = collection;
        this.workerName = workerName;
        this.actionToBeCalled = actionToBeCalled;
        this.cancelationTokenSource = new CancellationTokenSource();
        this.cancelationToken = this.cancelationTokenSource.Token;
        this.logger = logger;
    }

    public void DoWork()
    {
        while (!this.shouldStop)
        {
            try
            {
                var item = this.collection.Take(this.cancelationToken); // Take should block, until an element was added.
                this.actionToBeCalled?.Invoke(item);
            }
            catch (Exception exception)
            {
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
            }
        }

        this.logger.Debug($"[{this.workerName}]: Shutdown gracefully");
    }
    public void RequestStop()
    {
        this.logger.Debug($"[{this.workerName}]: {nameof(this.RequestStop)}");
        this.cancelationTokenSource.Cancel();
        this.shouldStop = true;
    }
    // Volatile is used as hint to the compiler that this data member will be accessed by multiple threads.
    private volatile bool shouldStop;

    private readonly BlockingCollection<T> collection;

    private readonly string workerName;

    private readonly Action<T> actionToBeCalled;

    private readonly CancellationToken cancelationToken;

    private readonly CancellationTokenSource cancelationTokenSource;

    private readonly ILog logger;
}
static void Main(string[] args)
{
    var q = new QueueWithMultipleConsumerThreads<int>(
            numberOfWorkerThreads: 10,
            actionToBeCalled: i =>
            {
                Console.WriteLine($"Consumed {i} from thread {Thread.CurrentThread.Name}, id: {Thread.CurrentThread.ManagedThreadId}");
            });

    // Add some entries to the q
    for (int i = 0; i < 10000; i++)
    {
        q.Enque(i);
    }
     
    Thread.Sleep(5000); // Give the q time to work
    q.Shutdown();
}
public class  QueueWithMultipleConsumerThreads<T>
{
    private readonly ConcurrentBag<Thread> threads = new ConcurrentBag<Thread>();
    private readonly ConcurrentBag<Worker<T>> workers = new ConcurrentBag<Worker<T>>();
    private readonly BlockingCollection<T> queue = new BlockingCollection<T>();

    public QueueWithMultipleConsumerThreads(uint numberOfWorkerThreads, Action<T> actionToBeCalled  )
    {
        if (numberOfWorkerThreads == 0) { throw new ArgumentException($"{nameof(numberOfWorkerThreads)} must be > 0"); }
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}

        for (var i = 0; i < numberOfWorkerThreads; i++)
        {
            // Create a worker and assign it to a thread
            var threadName = $"Worker thread {i}";
            var logger = LogManager.GetLogger(threadName);

            var w = new Worker<T>(this.queue, threadName, actionToBeCalled, logger);
            var t = new Thread(w.DoWork) { IsBackground = true, Name = threadName};

            this.workers.Add(w);
            this.threads.Add(t);
            t.Start();
        }
    }

    public void Enque(T item)
    {
        this.queue.Add(item);
    }

    public int Count()
    {
        return this.queue.Count;
    }

    public void Shutdown()
    {
        while (!this.workers.IsEmpty)
        {
            Worker<T> w;
            this.workers.TryTake(out w);
            w?.RequestStop();
        }

        while (!this.threads.IsEmpty)
        {
            Thread t;
            this.threads.TryTake(out t);
            t?.Join(1000);
        }
    }
}
/// <summary>
/// A worker receives a collection to take elements from. After an element was succefully retrived it will call <see cref="actionToBeCalled"/>. 
/// Stopping the worker can be done via <see cref="RequestStop"/>.
/// </summary>
/// <typeparam name="T"></typeparam>
public class Worker<T>
{
    public Worker(BlockingCollection<T> collection, string workerName, Action<T> actionToBeCalled, ILog logger)
    {
        if (collection == null) { throw new ArgumentNullException(nameof(collection));}
        if (workerName == null) { throw new ArgumentNullException(nameof(workerName));}
        if (actionToBeCalled == null) { throw new ArgumentNullException(nameof(actionToBeCalled));}
        if (logger == null) { throw new ArgumentNullException(nameof(logger));}

        this.collection = collection;
        this.workerName = workerName;
        this.actionToBeCalled = actionToBeCalled;
        this.cancelationTokenSource = new CancellationTokenSource();
        this.cancelationToken = this.cancelationTokenSource.Token;
        this.logger = logger;
    }

    public void DoWork()
    {
        while (!this.shouldStop)
        {
            try
            {
                var item = this.collection.Take(this.cancelationToken); // Take should block, until an element was added.
                this.actionToBeCalled?.Invoke(item);
            }
            catch (Exception exception)
            {
                this.logger.Warn($"[{this.workerName}]: Exception occurred: {exception}");
            }
        }

        this.logger.Debug($"[{this.workerName}]: Shutdown gracefully");
    } 

    public void RequestStop()
    {
        this.logger.Debug($"[{this.workerName}]: {nameof(this.RequestStop)}");
        this.cancelationTokenSource.Cancel();
        this.shouldStop = true;
    }
    // Volatile is used as hint to the compiler that this data member will be accessed by multiple threads.
    private volatile bool shouldStop;

    private readonly BlockingCollection<T> collection;

    private readonly string workerName;

    private readonly Action<T> actionToBeCalled;

    private readonly CancellationToken cancelationToken;

    private readonly CancellationTokenSource cancelationTokenSource;

    private readonly ILog logger;
}
edited title
Link
200_success
  • 145.7k
  • 22
  • 191
  • 481

Queue with multiple cosumersconsumers and one producer

made example code more clearly an example, fixed formatting, added relevant tags
Source Link
Vogel612
  • 25.5k
  • 7
  • 59
  • 141
Loading
Source Link
Moerwald
  • 163
  • 1
  • 5
Loading