I have this case where I am trying to build a pipeline using TPL Dataflow that could process a big number of items. At some point in the pipeline I need to batch items and send to the next block after they are processed.
I cannot find a proper way to send the processed items to the next block. I need to consider that the current block might still be processing other items and will need to send more items to the next block.
I received some suggestions to consider using TransformBlock or TransformManyBlock, but it would only work if my action returned a single array.
The problem is mostly with an action that is in the middle of the pipeline, and has a for loop to iterate through the items.
private async Task ProcessAsync(int id) {
var items = await ReadOperationItemsAsync(id);
// items could contain a huge number of items
// therefore, I process in batches
for (var skip = 0; skip < items.Count; skip += 10) {
var batch = items.Skip(skip).Take(items.Count);
var results = BulkProcessitems(batch);
// send these results to the next block in each iteration
}
}
Here is an example of the pipeline I am trying to build just to give more context.
namespace Test {
public class PipelineTest {
public async Task PipelineTestTask() {
var batchSize = 10;
var buffer = new BufferBlock<int>(
new DataflowBlockOptions() {
BoundedCapacity = batchSize
});
var processIdBlock = new ActionBlock<int>(
id => ProcessAsync(id),
new ExecutionDataflowBlockOptions {
BoundedCapacity = batchSize,
});
var sendEmailsBlock = new ActionBlock<IEnumerable<Item>>(
items => SendEmailsAsync(items),
new ExecutionDataflowBlockOptions {
BoundedCapacity = batchSize,
});
var getIdsTask = GetIdsToProcessAsync(buffer);
buffer.LinkTo(processIdBlock, new DataflowLinkOptions { PropagateCompletion = true });
await processIdBlock.Completion;
}
private async Task GetIdsToProcessAsync(ITargetBlock<int> target) {
var ids = new List<int>() { 1, 2, 3 };
foreach (var id in ids) {
await target.SendAsync(id);
}
target.Complete();
}
private async Task ProcessAsync(int id) {
var items = await ReadOperationItemsAsync(id);
for (var skip = 0; skip < items.Count; skip += 10) {
var batch = items.Skip(skip).Take(items.Count);
var results = BulkProcessitems(batch);
}
}
private void SendEmailsAsync(IEnumerable<Item> items) {
// Do something to send emails
}
// pretend this does some heavy operation
private IEnumerable<Item> BulkProcessitems(IEnumerable<int> itemIds) {
return itemIds.Select(i => new Item { Id = i, Name = "Exampe" });;
}
public class Item {
public int Id { get; set; }
public string? Name { get; set; }
}
private static Task<List<int>> ReadOperationItemsAsync(int operationId) {
// Placeholder method to simulate reading items for an operation from the database
var items = operationId switch {
1 => [1],
2 => [2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21, 22, 23, 24, 25, 26, 27, 28, 29, 30, 31, 32, 33, 34, 35, 36, 37, 38, 39, 40, 41, 42, 43, 44, 45, 46, 47, 48, 49, 50, 51, 52, 53, 54, 55, 56, 57, 58, 59, 60, 61, 62, 63, 64, 65, 66, 67, 68, 69, 70, 71, 72, 73, 74, 75, 76, 77, 78, 79, 80, 81, 82, 83, 84, 85, 86, 87, 88, 89, 90, 91, 92, 93, 94, 95, 96, 97, 98, 99, 100, 101, 102, 103, 104, 105, 106, 107, 108, 109, 110, 111, 112, 113, 114, 115, 116, 117, 118, 119, 120, 121, 122, 123, 124, 125, 126, 127, 128, 129, 130, 131, 132, 133, 134, 135, 136, 137, 138, 139, 140, 141, 142, 143, 144, 145, 146, 147, 148, 149, 150, 151, 152, 153, 154, 155, 156, 157, 158, 159, 160, 161, 162, 163, 164, 165, 166, 167, 168, 169, 170, 171, 172, 173, 174, 175, 176, 177, 178, 179, 180, 181, 182, 183, 184, 185, 186, 187, 188, 189, 190, 191, 192, 193, 194, 195, 196, 197, 198, 199, 200],
3 => [201, 202, 203, 204, 205, 206, 207, 208, 209, 210, 211, 212, 213, 214, 215, 216, 217, 218, 219, 220, 221, 222, 223, 224, 225, 226, 227, 228, 229, 230, 231, 232, 233, 234, 235, 236, 237, 238, 239, 240, 241, 242, 243, 244, 245, 246, 247, 248, 249, 250, 251, 252, 253, 254, 255, 256, 257, 258, 259, 260, 261, 262, 263, 264, 265, 266, 267, 268, 269, 270, 271, 272, 273, 274, 275, 276, 277, 278, 279, 280, 281, 282, 283, 284, 285, 286, 287, 288, 289, 290, 291, 292, 293, 294, 295, 296, 297, 298, 299, 300, 301, 302, 303, 304, 305, 306, 307, 308, 309, 310, 311, 312, 313, 314, 315, 316, 317, 318, 319, 320, 321, 322, 323, 324, 325, 326, 327, 328, 329, 330, 331, 332, 333, 334, 335, 336, 337, 338, 339, 340, 341, 342, 343, 344, 345, 346, 347, 348, 349, 350, 351, 352, 353, 354, 355, 356, 357, 358, 359, 360, 361, 362, 363, 364, 365, 366, 367, 368, 369, 370, 371, 372, 373, 374, 375, 376, 377, 378, 379, 380, 381, 382, 383, 384, 385, 386, 387, 388, 389, 390, 391, 392, 393, 394, 395, 396, 397, 398, 399, 400],
_ => new List<int>()
};
return Task.FromResult(items);
}
}
}
The issue is mostly with this action which is in the middle of the pipeline, and
My biggest concern here is memory allocation. If I wait for processIdBlockto process all items and return a big list to sendEmailsBlock, then it might consume too much memory.
Any suggestion on how to approach this?
ActionBlocks are terminal blocks. You probably want aTransformBlock. And as far as I can see, that would be aTransformBlock<int, Item>?bufferand theemailBuffer? All dataflow blocks are equipped with an input buffer. There is no reason to add another one explicitly in the pipeline, unless you are doing something advanced. Is your scenario advanced?