I have created a pipeline of Transformblocks and finally ending with Actionblock. This pipeline will remain static and once created when the service starts and will be used to push data whenever there is api request from the client.
When one of the blocks internal function throws an unhandled error say Bblock in below code, the propagation completes and aggregated exception is thrown at the end. but the issue is I cannot send any more request in the blocks This is happening in UT. Will it work in real case scenario? I do not want any block in bad state as this can block the message pipeline endlessly which I can see in UT.
How to handle error in a block gracefully so next message is executed without an issue (don't want block in faulted state)?
I am sending cancellationtoken while sending message as this is will be new for every request. Will taskcancelledexception make the block in faulted state?
Code
public static ClassA
{
private BufferBlock<Input> requestQueue;
private ActionBlock<Input> CBlock;
void init()
{
var options = new DataflowLinkOptions { PropagateCompletion = true };
this.requestQueue.LinkTo(Ablock, options);
Ablock.LinkTo(Bblock, options);
Bblock.LinkTo(Cblock, options);
}
public async Task<bool> Execute(Input input, CancellationToken cancellationToken)
{
try
{
await this.requestQueue.SendAsync(input, cancellationToken);
await this.CBlock.Completion;
}
catch (Exception ex)
{
if (ex is AggregateException)
{}
}
}
}
Here in the above sample code. Bblock throws error.