TPL数据流如何与异步操作配合使用

4
我正在尝试使用TPL数据流将一些旧的套接字代码移植到使用TPL数据流和新的异步功能。虽然API感觉非常稳定,但我的代码仍然感觉杂乱无章。我想知道我是否漏掉了什么。
我的要求如下:一个套接字类公开了Open、Close、Send和Receive方法。所有方法都返回一个任务,并且都是异步的。Open和Close是原子性的。Send和Receive可以同时工作,但两者都只能处理一个命令。
从逻辑上讲,这将带我进入下一个用于内部控制的代码片段:
// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;

// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()  { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions()    { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });

到目前为止,一切都很好。我可以安全地向发送和接收块发送操作,而不必担心同时进行的连接相关操作。此外,ActionBlock确保多个调用发送(接收、关闭和打开)是同步的。
问题在于,没有一个简单的方法让操作将任务传递回发布者。现在我正在使用TaskCompletionSource来传递结果。例如:
public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();

    sendBlock.Post(async () =>
    {
        if (!tcpClient.Connected)
            throw new InvalidOperationException("Cant send when not open");
        else
        {
            await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
            resultCompletionSource.SetResult(null);
        }
    });

    return resultCompletionSource.Task;
}

这种方式感觉很丑陋和笨拙。我的问题是:有没有一种方法可以使用TPL同步工作流程,而不必使用TaskCompletionSource来进行通信?

谢谢!


1
我没有使用过TPL DataFlow,所以可能会有些偏差,但如果您需要知道操作何时完成,那么似乎您不希望使用Post的“fire and forget”,而是应该使用SendAsync,这样您就可以获得一个Task了。参见:http://msdn.microsoft.com/en-us/library/hh194681(v=vs.110) - James Manning
2
据我理解,当请求被接受或明确拒绝时,SendAsync返回一个任务。由于我正在使用无限制的操作块,因此请求将始终直接被接受。在我的情况下,SendAsync无法提供帮助。 - Polity
1个回答

5

首先,你根本不需要TPL Dataflow,因为你实际上没有任何数据流。

其次,像这样使用TaskScheduler也不是正确的解决方案。 TaskScheduler调度代码,但在你await某些东西时,没有代码正在运行。 因此,在WriteAsync()执行其异步工作时,Open()的代码可能会运行。

实际上,你需要的是类似于ReaderWriterLock的东西,但它可以很好地与async配合使用。 框架中没有类似的东西,但是你可以使用Stephen Toub的文章Building Async Coordination Primitives, Part 7: AsyncReaderWriterLock中的代码,它正好做你需要的事情。 文章还更详细地解释了为什么使用TaskScheduler是错误的。

使用 AsyncReaderWriterLock,你的代码可能是这样的:
public async Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
    using (await readerWriterLock.ReaderLockAsync())
    {
        if (!tcpClient.Connected)
            throw new InvalidOperationException("Can't send when not open");

        await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
    }
}

1
读完了OP的问题后,我首先想到的是链接到Toub的文章并给出一些背景。但我犹豫了,因为我记不清Toub在哪篇博客文章、白皮书等中写过这个问题。然后我看到这个答案已经很好地解释了,并链接到了确切的文章!偶尔你会读到一个你希望可以多次点赞的答案! - payo
谢谢你的回答,虽然它还没有完全满足我的需求。打开和关闭应该只从发送和接收中运行。虽然使用两个异步rwlock可以工作,但这将是一个丑陋的解决方案。基于ConcurrentExclusiveSchedulerPair调度任务似乎是一种整洁而易于同步的方法。我认为你错误的假设是Open应该能够在Send正在运行时运行,但事实并非如此。当调用open时,任何正在运行的Send/Receive都应该在实际打开之前完成。 - Polity
我的观点是,如果您有async方法并使用ConcurrentExclusiveSchedulerPair,那么Open()将能够在Send()等待时运行。但如果您使用AsyncReaderWriterLock,则不会。我认为这正是您想要的。 - svick
@svick 实际上不是这样的。对于这个问题讲得不够清楚很抱歉。在打开客户端时,不应允许运行发送和接收操作。(也就是说,在打开(连接)时发送消息是行不通的)。另一方面,客户端可以在接收其他数据的同时发送数据。 - Polity
@Polity 我理解你想要的是什么,那也是我所说的。如果这就是你想要的,那么 AsyncReaderWriterLock 是正确的解决方案,而 ConcurrentExclusiveSchedulerPair 则不是。 - svick
@svick 已经明白了,但是发送或接收的并行运行数量为1。因此,我仍然需要一种批处理多个调用发送和接收的方法。为此,我可以引入另一种异步锁定机制,但是与现在相比,代码不会更加清晰。TPL数据流提供了一种干净的控制并行运行的方式,尽管它目前不能很好地处理异步操作。 - Polity

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接