
I have a case where I am trying to build a pipeline using TPL Dataflow that processes a large number of items. At some point in the pipeline I need to batch items and send them to the next block after they are processed.

I cannot find a proper way to send the processed items to the next block. I need to consider that the current block might still be processing other items and will need to send more items to the next block.

I received some suggestions to consider using TransformBlock or TransformManyBlock, but that would only work if my action returned a single array.

The problem is mostly with an action in the middle of the pipeline that has a for loop to iterate through the items.

    private async Task ProcessAsync(int id) {
        var items = await ReadOperationItemsAsync(id);

        // items could contain a huge number of items,
        // therefore I process in batches
        for (var skip = 0; skip < items.Count; skip += 10) {
            var batch = items.Skip(skip).Take(10);

            var results = BulkProcessItems(batch);

            // send these results to the next block in each iteration
        }
    }

Here is an example of the pipeline I am trying to build, just to give more context.

namespace Test {
    public class PipelineTest {
        public async Task PipelineTestTask() {
            var batchSize = 10;

            var buffer = new BufferBlock<int>(
                new DataflowBlockOptions() {
                    BoundedCapacity = batchSize
                });

            var processIdBlock = new ActionBlock<int>(
                id => ProcessAsync(id),
                new ExecutionDataflowBlockOptions {
                    BoundedCapacity = batchSize,
                });

            var sendEmailsBlock = new ActionBlock<IEnumerable<Item>>(
                items => SendEmailsAsync(items),
                new ExecutionDataflowBlockOptions {
                    BoundedCapacity = batchSize,
                });

            var getIdsTask = GetIdsToProcessAsync(buffer);

            buffer.LinkTo(processIdBlock, new DataflowLinkOptions { PropagateCompletion = true });

            await processIdBlock.Completion;
        }

        private async Task GetIdsToProcessAsync(ITargetBlock<int> target) {
            var ids = new List<int>() {  1, 2, 3 };
            
            foreach (var id in ids) {
                await target.SendAsync(id);
            }

            target.Complete();
        }

        private async Task ProcessAsync(int id) {
            var items = await ReadOperationItemsAsync(id);

            for (var skip = 0; skip < items.Count; skip += 10) {
                var batch = items.Skip(skip).Take(10);

                var results = BulkProcessItems(batch);
            }
        }

        private Task SendEmailsAsync(IEnumerable<Item> items) {
            // Do something to send emails
            return Task.CompletedTask;
        }

        // pretend this does some heavy operation
        private IEnumerable<Item> BulkProcessItems(IEnumerable<int> itemIds) {
            return itemIds.Select(i => new Item { Id = i, Name = "Example" });
        }

        public class Item {
            public int Id { get; set; }
            public string? Name { get; set; }
        }

        private static Task<List<int>> ReadOperationItemsAsync(int operationId) {
            // Placeholder method to simulate reading items for an operation from the database
            var items = operationId switch {
                1 => new List<int> { 1 },
                2 => Enumerable.Range(2, 199).ToList(),   // 2..200
                3 => Enumerable.Range(201, 200).ToList(), // 201..400
                _ => new List<int>()
            };
            return Task.FromResult(items);
        }
    }
}

My biggest concern here is memory allocation. If I wait for processIdBlock to process all items and return a big list to sendEmailsBlock, then it might consume too much memory.

Any suggestion on how to approach this?

  • ActionBlocks are terminal blocks. You probably want a TransformBlock. And as far as I can see, that would be a TransformBlock<int, Item>? Commented Jul 11, 2024 at 8:10
  • ^^ TransformBlock<TInput,TOutput> Class Commented Jul 11, 2024 at 8:18
  • What's the purpose of the buffer and the emailBuffer? All dataflow blocks are equipped with an input buffer. There is no reason to add another one explicitly in the pipeline, unless you are doing something advanced. Is your scenario advanced? Commented Jul 11, 2024 at 8:50
  • Are trying to do something like this: dotnetfiddle.net/3sbJ9H ? (!! "sunshine implementation" - error handling omitted for brevity) Commented Jul 11, 2024 at 9:07
  • 1
    @TheodorZoulias my apologies. My intention wasn't to bring a solution that doesn't work and ask someone to fix it. I just thought the example would help to illustrate the problem. I'll try to rephrase my question. Commented Jul 12, 2024 at 1:33

2 Answers


TPL Dataflow and the entire dataflow or pipeline architecture isn't broken or flawed. Like all tools, it needs to be used properly. Using a wrench to drive nails is possible but not very effective.

Dataflow blocks are steps in a pipeline, meant to process a potentially infinite number of messages. They work in ways similar to a PowerShell or Bash pipeline, where the output of one process is used as the input of another. The similarities are a lot closer than one would imagine: PowerShell cmdlets have separate Output and Error streams, and even processes have stdout and stderr.

Messages produced by one block are automatically sent to linked blocks. All blocks except the NullTarget block have an input or output buffer, or both.

To produce one output message for every input, use TransformBlock. To produce multiple output messages for each input, use TransformManyBlock. Using these, the question's code can be simplified to this:

var backPressureOptions = new ExecutionDataflowBlockOptions { BoundedCapacity = 1 };

var processorBlock = new TransformManyBlock<int, Item>(ReadOperationItemsAsync, backPressureOptions);
var emailBlock = new ActionBlock<Item>(SendEmailAsync, backPressureOptions);

var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };

processorBlock.LinkTo(emailBlock, linkOptions);

foreach (var id in ids) 
{
    await processorBlock.SendAsync(id);
}

processorBlock.Complete();
await emailBlock.Completion;

The worker methods only have to receive a single input and produce either a single output (for TransformBlock) or an IEnumerable<T> for multiple outputs (for TransformManyBlock):

private async Task<List<Item>> ReadOperationItemsAsync(int id) 
{
...
}
private async Task SendEmailAsync(Item item)
{
...
}

Messages can be batched using the BatchBlock for bulk processing:

var processorBlock = new TransformManyBlock<int, Item>(...);
var batchBlock = new BatchBlock<Item>(100);
var emailBlock = new ActionBlock<Item[]>(SendManyEmailsAsync, backPressureOptions);

processorBlock.LinkTo(batchBlock, linkOptions);
batchBlock.LinkTo(emailBlock, linkOptions);

...
private async Task SendManyEmailsAsync(Item[] items)
{
...
}
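
One detail worth noting: BatchBlock emits a final, possibly smaller batch when it completes, so no items are lost. If partially filled batches should also be flushed after a period of inactivity, a timer calling TriggerBatch is a common pattern. A minimal sketch (the 5-second interval is an arbitrary choice, not something from the question):

```csharp
// Sketch: flush partially filled batches periodically so a slow producer
// doesn't leave items stuck in the BatchBlock's buffer.
var batchBlock = new BatchBlock<Item>(100);

using var flushTimer = new Timer(
    _ => batchBlock.TriggerBatch(),   // emits whatever is buffered, even fewer than 100 items
    state: null,
    dueTime: TimeSpan.FromSeconds(5),
    period: TimeSpan.FromSeconds(5));
```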

Error Handling

Error handling in a pipeline is different from ordinary sequential code, whether it's a production line, a PowerShell pipeline, or a Dataflow pipeline.

One way to handle errors in both functional and dataflow pipelines is to pack them with the message and pass them down the pipeline. In functional programming this is called Railway Oriented Programming: good messages travel along one "track", while failures are passed down another. LinkTo(..., Predicate<TOutput>) even allows us to redirect failure messages to e.g. a logger block or a NullTarget block.

For this to work, each worker method has to catch all exceptions and return outputs wrapped in e.g. a record Result<T>(T? Item, string? Error = null) wrapper. Successful messages have a null Error; failures carry an error message. That's a very primitive Result implementation, but it's enough to demonstrate error handling:

private async Task<Result<List<Item>>> ReadOperationItemsAsync(int id) 
{
    try
    {
        var items = await ...;
        return new Result<List<Item>>(items);
    }
    catch(Exception exc)
    {
        // log or trace the error, then
        return new Result<List<Item>>(default, exc.Message);
    }
}
private async Task SendEmailAsync(Result<Item> msg)
{
    if(msg is (Item item, null))
    { 
        try
        {
            ...
        }
        catch(Exception exc)
        {
            // log or trace the error; there is nothing to return,
            // so the failure is simply swallowed here
        } 
    }
}

SendEmailAsync discards failed messages. A method that had to return an output could return a new failed message instead:

private Result<OtherItem> ProcessItem(Result<Item> msg)
{
    if(msg is (Item item, null))
    { 
        ...
    }
    else
    {
        return new Result<OtherItem>(default, msg.Error);
    }
}

Discarding failed messages

LinkTo can be used to discard failed messages too:

var nullBlock = DataflowBlock.NullTarget<Result<Item>>();

processorBlock.LinkTo(batchBlock, linkOptions, msg => msg.Error == null);
processorBlock.LinkTo(nullBlock, linkOptions, msg => msg.Error != null);

Linking to a null or logging block is essential; otherwise the failed messages would remain in processorBlock's output buffer, preventing the block from completing.
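
Putting the pieces above together, a minimal end-to-end sketch of the success/failure routing might look like this (ProcessIdAsync is a hypothetical stand-in for the actual worker method):

```csharp
// Wrap the worker in a try/catch so every input yields exactly one Result,
// success or failure, and the pipeline never faults on a bad item.
var processorBlock = new TransformBlock<int, Result<Item>>(async id =>
{
    try { return new Result<Item>(await ProcessIdAsync(id)); }
    catch (Exception exc) { return new Result<Item>(default, exc.Message); }
});

var emailBlock = new ActionBlock<Result<Item>>(SendEmailAsync);
var nullBlock = DataflowBlock.NullTarget<Result<Item>>();

// Successes flow to the email block; failures are discarded.
processorBlock.LinkTo(emailBlock,
    new DataflowLinkOptions { PropagateCompletion = true },
    msg => msg.Error == null);
processorBlock.LinkTo(nullBlock, msg => msg.Error != null);
```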


2 Comments

Thank you for the suggestion. I'm just not sure how ReadOperationItemsAsync would propagate data to the next block before it finishes its execution. In my case ReadOperationItemsAsync would have a loop to iterate a list of items. Instead of making ReadOperationItemsAsync return all items in a list, I was trying to post this list in batches.
The library propagates the output of the function to the block specified by LinkTo. The function itself doesn't have to do anything, just as the function you pass to Select or SelectMany doesn't have to do anything special. TransformManyBlock will propagate the results one by one just as LINQ's SelectMany does. BatchBlock in the end batches items in fixed-size batches.

I think that what you have to do is to switch the processIdBlock from ActionBlock<int> to TransformManyBlock<int, IEnumerable<Item>>. This block takes a delegate that should return an IEnumerable<TOutput> for each int it receives. The generic argument TOutput in your case is IEnumerable<Item> (which makes things a bit confusing). A memory-efficient implementation of such a delegate is to make it an iterator, meaning it should include the yield return statement:

private IEnumerable<IEnumerable<Item>> ProcessAsync(int id)
{
    List<int> items = ReadOperationItemsAsync(id).Result;
    foreach (IList<int> batch in items.Buffer(10))
    {
        IEnumerable<Item> results = BulkProcessItems(batch);
        yield return results;
    }
}

The Buffer LINQ operator exists in the System.Interactive package. It is functionally equivalent to the .NET 6 Chunk operator.
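
For readers who are on .NET 6 or later (or who prefer not to take the System.Interactive dependency), the built-in Chunk operator can be substituted directly, assuming the same delegate shape as above:

```csharp
private IEnumerable<IEnumerable<Item>> ProcessAsync(int id)
{
    List<int> items = ReadOperationItemsAsync(id).Result;
    // Chunk is built into .NET 6+; it yields fixed-size arrays,
    // with a final, possibly smaller, chunk at the end.
    foreach (int[] batch in items.Chunk(10))
    {
        yield return BulkProcessItems(batch);
    }
}
```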

Unfortunately you have to block for the result of the ReadOperationItemsAsync operation (using the .Result property). The TransformManyBlock<TInput,TOutput> started supporting asynchronous sequences in .NET 6, and you are targeting an earlier .NET platform.

After doing this modification, you should be able to link the processIdBlock to the sendEmailsBlock.
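
For completeness: on .NET 6 and later, the blocking .Result call can be avoided by making the delegate an async iterator, since TransformManyBlock there accepts a Func<TInput, IAsyncEnumerable<TOutput>>. A sketch under that assumption:

```csharp
// Async iterator version: no blocking, items are still yielded batch by batch.
private async IAsyncEnumerable<IEnumerable<Item>> ProcessAsync(int id)
{
    List<int> items = await ReadOperationItemsAsync(id);
    foreach (IList<int> batch in items.Buffer(10))
    {
        yield return BulkProcessItems(batch);
    }
}
```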

