4

I'm experimenting with TPL dataflow by porting some old socket code over for using TPL dataflow and the new async features. Although the API feels rock solid, my code still ends up feeling messy. I'm wondering if I'm missing something here.

My requirements is as follows: A socket class exposes: Open, Close, Send and Receive methods. All return a Task and therefore are async. Open and Close are atomic. Send and Receive can work next to each other although both can only handle 1 command a time.

Logically this brings me to the next piece of code for internal control:

// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;

// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()  { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()    { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });

So far all good. I can safely send actions to the Send and Receive block without having to worry about a running connection related action in the meantime. Also ActionBlock ensures that multiple calls to send are synchronized (idem for receive, close and open).

The problem is that there is no easy way for an action to communicate a task back to the poster. Right now i'm using a TaskCompletionSource to communicate a result back. Like:

public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();

    sendBlock.Post(async () =>
    {
        if (!tcpClient.Connected)
            throw new InvalidOperationException("Cant send when not open");
        else
        {
            await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
            resultCompletionSource.SetResult(null);
        }
    });

    return resultCompletionSource.Task;
}

which just feels ugly and clumsy. My question is: Is there a way to synchronize the workflow using TPL without having to use a TaskCompletionSource for communication in between?

Thanks!

2
  • 1
    I've not used TPL DataFlow, so this could be way off, but if you need to know when the action completes, it seems like you don't want the 'fire and forget' of Post, but instead should use SendAsync so you get a Task back instead? msdn.microsoft.com/en-us/library/hh194681(v=vs.110) Commented Jun 12, 2012 at 16:53
  • 2
    To my understanding, SendAsync returns a task when the post is accepted or explicitly declined. Since i'm using unbounded action blocks. Posts will always be accepted directly. SendAsync cant help in my case Commented Jun 12, 2012 at 16:56

1 Answer 1

5

First, you don't need TPL Dataflow for this at all, because you don't actually have any dataflow.

Second, using TaskSchedulers like this is not the correct solution either. TaskSchedulers schedule code, but while you await something, there is no code running. So, while WriteAsync() is doing its async work, code for Open() could run.

What you actually need is something like ReaderWriterLock, but that works well with async. There isn't anything like that in the framework, but you could use code from Stephen Toub's article Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock, which does exactly what you need. The article also explains in more detail why using a TaskScheduler is wrong.

Using AsyncReaderWriterLock, your code might look like this:

public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    using (await readerWriterLock.ReaderLockAsync())
    {
        if (!tcpClient.Connected)
            throw new InvalidOperationException("Can't send when not open");

        await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
    }
}
Sign up to request clarification or add additional context in comments.

6 Comments

Upon reading the OP's question, my first thought was to link to Toub's article and give some background. I was hesitant because I couldn't remember which blog post, white paper, etc. in which Toub wrote about this very issue. Then I saw this answer had already explained it masterfully AND linked to the exact article! Every now and then you read an answer you wish you could upvote more than once!
Thanks for the answer although it doesnt satisfy me just yet. Open and close should be running exclusively from send and receive and the other. Although an async rwlock will work when using 2 of those, it will be an ugly solution. Scheduling tasks based on a ConcurrentExclusiveSchedulerPair seems for neat and easy synchronization. I think your wrong assumption is that Open should be able to run while Send is running which is not the case. When open is called, any running Send/Receive should be finished off before actually opening.
@Polity My point is that if you have async methods and use ConcurrentExclusiveSchedulerPair, then Open() will be able to run while Send() is awaiting. But if you use AsyncReaderWriterLock, it won't. I think that's exactly what you want.
@svick actually its not. Sorry for this not being clear. When opening a client, no send and receive operation should be allowed to run. (hence, sending while opening(connecting) is not going to work). On the other hand, The client is able to send data while receiving other data.
@Polity I do understand that's what you want, that's what I said. If that's what you want, then AsyncReaderWriterLock is the right solution and ConcurrentExclusiveSchedulerPair isn't.
|

Your Answer

By clicking “Post Your Answer”, you agree to our terms of service and acknowledge you have read our privacy policy.

Start asking to get answers

Find the answer to your question by asking.

Ask question

Explore related questions

See similar questions with these tags.