
I have a System.Timers.Timer which elapses every 3 seconds.
Once it elapses I want to take all the items in my collection and process them in one batch.

The motivation for that is to reduce the number of I/Os on the backend system.

The challenge is that multiple threads append to the collection/queue concurrently. Because of this I thought about using a ConcurrentQueue<T>, but that's a bad choice.

This article on MSDN Social describes the problem very well.

What I need is a collection/queue from which I can take all data at once (ToArray()) and clear it in one atomic operation, so that I don't lose any data that other threads write to the collection/queue in the meantime.

private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
    string[] result = _queue.ToArray();
    // Items enqueued by other threads between ToArray() and this line are lost.
    _queue = new ConcurrentQueue<string>();
}
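One pattern that avoids the need for an atomic ToArray-and-clear is to never replace the ConcurrentQueue<T> at all and instead drain it with TryDequeue: an item enqueued while the drain runs is either picked up in the current batch or stays in the queue for the next one. The sketch below assumes C# top-level statements (.NET 5 or later); DrainBatch and the producer/item counts are hypothetical, and the polling loop merely stands in for the timer's Elapsed handler.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading.Tasks;

// Shared queue: producers only call Enqueue, the consumer only calls TryDequeue.
var queue = new ConcurrentQueue<string>();

// Hypothetical helper: takes whatever is in the queue right now.
// Items added while this loop runs are either taken here or left for the next call,
// so nothing is lost (unlike replacing the queue instance).
List<string> DrainBatch()
{
    var batch = new List<string>();
    while (queue.TryDequeue(out var item))
    {
        batch.Add(item);
    }
    return batch;
}

const int producers = 4;
const int itemsPerProducer = 10_000;
var collected = new List<string>();

// Several producers enqueue concurrently.
var allProduced = Task.WhenAll(Enumerable.Range(0, producers).Select(p => Task.Run(() =>
{
    for (int i = 0; i < itemsPerProducer; i++)
        queue.Enqueue($"p{p}-{i}");
})));

// Consumer: drain repeatedly while producers are running (stand-in for T1_Elapsed).
while (!allProduced.IsCompleted)
{
    collected.AddRange(DrainBatch());
    await Task.Delay(1);
}
collected.AddRange(DrainBatch()); // final drain once all producers are done

Console.WriteLine($"Collected {collected.Count} items");
```

In the real code, T1_Elapsed would simply call DrainBatch() and process the returned list as one batch.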

I'm leaning towards a simple lock-based approach around a plain Queue<T>.

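private static readonly object _myLock = new object();
private static readonly Queue<string> _queue = new Queue<string>();

private static void T1_Elapsed(object sender, ElapsedEventArgs e)
{
    string[] result;
    lock (_myLock)
    {
        // Copy and clear under the same lock, so no producer can enqueue in between.
        result = _queue.ToArray();
        _queue.Clear();
    }
    // Process 'result' as one batch here, outside the lock.
}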

Now this piece of code has one obvious flaw which can be seen in the producer code:

private static void ProduceItems()
{
    //while (!_stop)
    for(int i=0; i<int.MaxValue; i++)
    {
        if (_stop) break;

        lock (_myLock) // bad. locks out other producers running on other threads.
        {
            Console.WriteLine("Enqueue " + i);
            _queue.Enqueue("string" + i);
        }

        Thread.Sleep(1000); // FOR DEBUGGING PURPOSES ONLY
    }
}

Of course, this code also makes the producers lock each other out, even when the timer is not draining the queue. Is there a way for the producers to wait on the lock only while T1_Elapsed holds it?
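One way to get "producers only wait while the consumer is draining" is a ReaderWriterLockSlim: producers share the read lock (so they don't block each other) and enqueue into a ConcurrentQueue<T>, while the consumer takes the exclusive write lock just long enough to swap in a fresh queue. This is a minimal sketch, assuming C# top-level statements; Produce, TakeAll and the counts are hypothetical names chosen for illustration.

```csharp
using System;
using System.Collections.Concurrent;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

var gate = new ReaderWriterLockSlim();
var queue = new ConcurrentQueue<string>();

// Producer side: many threads can hold the read lock at once, so producers
// don't block each other; ConcurrentQueue makes the concurrent Enqueue calls safe.
void Produce(string item)
{
    gate.EnterReadLock();
    try { queue.Enqueue(item); }
    finally { gate.ExitReadLock(); }
}

// Consumer side (e.g. the body of T1_Elapsed): the write lock waits until no
// producer is inside Produce, then the queue instance is swapped.
string[] TakeAll()
{
    ConcurrentQueue<string> full;
    gate.EnterWriteLock();
    try
    {
        full = queue;
        queue = new ConcurrentQueue<string>();
    }
    finally { gate.ExitWriteLock(); }
    return full.ToArray(); // no producer can still be writing to 'full'
}

const int producers = 4;
const int itemsPerProducer = 10_000;
var batches = new List<string[]>();

var work = Task.WhenAll(Enumerable.Range(0, producers).Select(p => Task.Run(() =>
{
    for (int i = 0; i < itemsPerProducer; i++)
        Produce($"p{p}-{i}");
})));

while (!work.IsCompleted)
{
    batches.Add(TakeAll());
    await Task.Delay(1);
}
batches.Add(TakeAll()); // pick up anything produced after the last swap

Console.WriteLine($"{batches.Count} batches, {batches.Sum(b => b.Length)} items");
```

The exclusive section only swaps a reference, so producers are held up for a very short time per timer tick.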

Is there anything better suited to my problem? Maybe something based on observables? Or are there any good "batcher/aggregator" examples?

UPDATE 1: RX
It's awesome what you can do with RX :)
I'm still looking into how I can handle errors, retries or re-enqueues in this scenario.

using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Reactive.Subjects;
using System.Threading;

internal class Rx
{
    internal static void Start()
    {
        ISubject<int> subject = new Subject<int>();
        ISubject<int> syncedSubject = Subject.Synchronize(subject); // that should do it? - UNTESTED!

        // Emit a batch every 5 seconds or as soon as 10 items have arrived.
        var subscription = syncedSubject.Buffer(TimeSpan.FromSeconds(5), 10)
            .Subscribe(batch => ProcessBatch(batch));

        for (int i = 1; i < int.MaxValue; i++)
        {
            syncedSubject.OnNext(i);
            Thread.Sleep(200);
            Console.WriteLine($"Produced {i}.");
        }

        Console.ReadKey();
        subscription.Dispose();
    }

    private static void ProcessBatch(IList<int> list)
    {
        // Aggregate many into one
        string joined = string.Join(" ", list);

        // Process one
        Console.WriteLine($"Wrote {joined} to remote storage.");

        // How do you account for errors here?
        // myProducer.ReEnqueueMyFailedItems(list); // ? (placeholder, not defined anywhere)
    }
}

1 Answer

TPL Dataflow

I'd say give the TPL Dataflow library a go. It is built on the Task Parallel Library and designed for exactly this kind of requirement, where concurrency plays a big role. See http://blog.stephencleary.com/2012/09/introduction-to-dataflow-part-1.html for a series of blog posts about this library.

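BatchBlock<T> seems like a good fit for your scenario: it collects incoming items into arrays of a fixed size, and its TriggerBatch() method lets you flush a partial batch early. See https://msdn.microsoft.com/en-us/library/hh228602(v=vs.110).aspx for a tutorial.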

Another example of using the BatchBlock: https://taskmatics.com/blog/simplifying-producer-consumer-processing-with-tpl-dataflow-structures/

Instead of adding data to a queue, you post it to one of the TPL Dataflow blocks.
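As a rough sketch of how the question's timer-driven batching might map onto Dataflow (assuming the System.Threading.Tasks.Dataflow NuGet package and C# top-level statements; the batch size of 100, the producer counts and the Console output are placeholders): a BatchBlock<string> groups posted items, a System.Threading.Timer calls TriggerBatch() every 3 seconds to flush partial batches, and an ActionBlock<string[]> performs one write per batch.

```csharp
using System;
using System.Collections.Concurrent;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow; // NuGet: System.Threading.Tasks.Dataflow

// Emits a batch as soon as 100 items have arrived...
var batchBlock = new BatchBlock<string>(100);

var processed = new ConcurrentBag<int>(); // batch sizes, recorded for illustration only

// One backend write per batch; ActionBlock handles one batch at a time by default.
var writer = new ActionBlock<string[]>(batch =>
{
    Console.WriteLine($"Writing batch of {batch.Length} items"); // placeholder for the real I/O
    processed.Add(batch.Length);
});

batchBlock.LinkTo(writer, new DataflowLinkOptions { PropagateCompletion = true });

// ...or when the timer fires: TriggerBatch flushes whatever has accumulated so far.
using var timer = new Timer(_ => batchBlock.TriggerBatch(), null,
    TimeSpan.FromSeconds(3), TimeSpan.FromSeconds(3));

// Several producers post concurrently; no lock is needed around Post.
const int producers = 4;
const int itemsPerProducer = 250;
await Task.WhenAll(Enumerable.Range(0, producers).Select(p => Task.Run(() =>
{
    for (int i = 0; i < itemsPerProducer; i++)
        batchBlock.Post($"p{p}-{i}");
})));

// Completing the BatchBlock outputs any remaining items as a final batch,
// and completion then propagates to the writer.
batchBlock.Complete();
await writer.Completion;
```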

Another option could be using

Reactive Extensions

See http://www.introtorx.com/uat/content/v1.0.10621.0/01_WhyRx.html for a good introduction.

It provides batching support as well:

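using System;
using System.Reactive.Linq;
using System.Reactive.Subjects;

void Sample()
{
    var dataprovider = new Subject<int>();

    var subscription = dataprovider
        .Buffer(TimeSpan.FromSeconds(3)) // one batch every 3 seconds, like the timer
        .Subscribe(listOfNumbers =>
        {
            // do something with batch of items
            var batchSize = listOfNumbers.Count;
            Console.WriteLine($"Got a batch of {batchSize} items");
        });

    for (int i = 0; i <= 5; ++i)
    {
        dataprovider.OnNext(i);
    }

    dataprovider.OnCompleted(); // flushes the pending buffer; disposing alone would drop it
    subscription.Dispose();
}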

The example above needs some modifications before multiple producers on different threads can add data, because Subject<T> does not serialize concurrent OnNext calls (see "reactive extension OnNext"). It is simplified code(!), but it gives you the general idea of using RX.
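A minimal sketch of the multi-producer variant, assuming the System.Reactive NuGet package and C# top-level statements: Subject.Synchronize (the same call used in the question's UPDATE) wraps the subject so that OnNext calls from different threads are serialized. The four producers and the item counts are illustrative.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;      // NuGet: System.Reactive
using System.Reactive.Subjects;
using System.Threading.Tasks;

ISubject<int> subject = new Subject<int>();
// Serializes OnNext/OnCompleted calls coming from different threads.
ISubject<int> synced = Subject.Synchronize(subject);

var batches = new List<IList<int>>();
var subscription = synced
    .Buffer(TimeSpan.FromSeconds(3))
    .Subscribe(batch => batches.Add(batch)); // Buffer hands out batches one at a time

// Four producers push from thread-pool threads at the same time.
await Task.WhenAll(Enumerable.Range(0, 4).Select(p => Task.Run(() =>
{
    for (int i = 0; i < 1000; i++)
        synced.OnNext(p * 1000 + i);
})));

// OnCompleted flushes the last (partial) buffer before unsubscribing.
synced.OnCompleted();
subscription.Dispose();

Console.WriteLine($"Received {batches.Sum(b => b.Count)} items in {batches.Count} batch(es)");
```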

Buffering can be bounded by a maximum buffer size, a time period, or a combination of both (a batch is emitted as soon as either limit is reached). So it can replace your timer as well.
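To illustrate the combined limit, here is a small sketch assuming the System.Reactive NuGet package and C# top-level statements: Buffer(TimeSpan, int) closes a batch when it holds 10 items or after 3 seconds, whichever comes first. The numbers are arbitrary, and a single producer is used, so no synchronization is needed here.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;      // NuGet: System.Reactive
using System.Reactive.Subjects;

var source = new Subject<int>();
var batchSizes = new List<int>();

// A batch is emitted at 10 items or after 3 seconds, whichever happens first.
using (source.Buffer(TimeSpan.FromSeconds(3), 10)
             .Subscribe(batch => batchSizes.Add(batch.Count)))
{
    // 25 items pushed well within 3 seconds, so the size limit triggers first.
    for (int i = 0; i < 25; i++)
        source.OnNext(i);

    source.OnCompleted(); // flushes the remaining partial batch
}

Console.WriteLine(string.Join(", ", batchSizes));
```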

Instead of adding items to a queue, you call OnNext on the Subject.

Both TPL Dataflow and RX remove the need for a queue (or anything similar) that you have to drain and clear yourself, which frees you from that pain.


2 Comments

I knew there was something out there ;-) I'm really impressed by the RX extensions. Your example really helped. Now I only need to figure out how retry/failure mechanisms fit into this.
You could send the failed batches to another RX stream and handle them in that stream's Subscribe action. There are also built-in error-handling strategies, but it depends a bit on what kind of action you need in case of failure.
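A minimal sketch of the "send them to another RX stream" idea, assuming the System.Reactive NuGet package and C# top-level statements. WriteToBackend is a hypothetical stand-in for the real remote write and is rigged to fail on its first call; the retry policy (one immediate retry) is only an example.

```csharp
using System;
using System.Collections.Generic;
using System.Linq;
using System.Reactive.Linq;      // NuGet: System.Reactive
using System.Reactive.Subjects;

var items = new Subject<int>();
var failedBatches = new Subject<IList<int>>(); // second stream for batches that failed
var written = new List<int>();
var attempts = 0;

// Hypothetical backend write: fails on the very first attempt to simulate a transient error.
void WriteToBackend(IList<int> batch)
{
    attempts++;
    if (attempts == 1) throw new InvalidOperationException("backend unavailable");
    written.AddRange(batch);
}

// Main pipeline: a failed write is handed to the failure stream
// instead of escaping the subscriber.
var main = items.Buffer(5).Subscribe(batch =>
{
    try { WriteToBackend(batch); }
    catch (Exception) { failedBatches.OnNext(batch); }
});

// Failure stream: retries once here; a real version might delay, back off, or log and drop.
var retries = failedBatches.Subscribe(batch =>
{
    try { WriteToBackend(batch); }
    catch (Exception ex) { Console.WriteLine($"Giving up on batch: {ex.Message}"); }
});

for (int i = 0; i < 12; i++) items.OnNext(i);
items.OnCompleted(); // flushes the last partial batch (10, 11)

main.Dispose();
retries.Dispose();
Console.WriteLine($"Written {written.Count} items after {attempts} attempts");
```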
