我正在尝试使用TPL数据流将一些旧的套接字代码移植到使用TPL数据流和新的异步功能。虽然API感觉非常稳定,但我的代码仍然感觉杂乱无章。我想知道我是否漏掉了什么。
我的要求如下:一个套接字类公开了Open、Close、Send和Receive方法。所有方法都返回一个任务,并且都是异步的。Open和Close是原子性的。Send和Receive可以同时工作,但两者都只能处理一个命令。
从逻辑上讲,这将带我进入下一个用于内部控制的代码片段:
到目前为止,一切都很好。我可以安全地向发送和接收块发送操作,而不必担心同时进行的连接相关操作。此外,ActionBlock确保多个调用发送(接收、关闭和打开)是同步的。
问题在于,没有一个简单的方法让操作将任务传递回发布者。现在我正在使用TaskCompletionSource来传递结果。例如:
我的要求如下:一个套接字类公开了Open、Close、Send和Receive方法。所有方法都返回一个任务,并且都是异步的。Open和Close是原子性的。Send和Receive可以同时工作,但两者都只能处理一个命令。
从逻辑上讲,这将带我进入下一个用于内部控制的代码片段:
// exposing an exclusive scheduler for connectivity related tasks and a parallel scheduler where send and receive can work with
private readonly ConcurrentExclusiveSchedulerPair exclusiveConnectionSchedulerPair;
private readonly ActionBlock<Action> connectionBlock;
private readonly ActionBlock<Action> sendBlock;
private readonly ActionBlock<Action> receiveBlock;
// within the constructor:
this.exclusiveConnectionSchedulerPair = new ConcurrentExclusiveSchedulerPair();
this.connectionBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ExclusiveScheduler });
this.sendBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
this.receiveBlock = new ActionBlock<Action>(action => action(), new ExecutionDataflowBlockOptions() { TaskScheduler = exclusiveConnectionSchedulerPair.ConcurrentScheduler });
到目前为止,一切都很好。我可以安全地向发送和接收块发送操作,而不必担心同时进行的连接相关操作。此外,ActionBlock确保多个调用发送(接收、关闭和打开)是同步的。
问题在于,没有一个简单的方法让操作将任务传递回发布者。现在我正在使用TaskCompletionSource来传递结果。例如:
public Task Send(ArraySegment<byte> buffer, CancellationToken cancellationToken)
{
TaskCompletionSource<object> resultCompletionSource = new TaskCompletionSource<object>();
sendBlock.Post(async () =>
{
if (!tcpClient.Connected)
throw new InvalidOperationException("Cant send when not open");
else
{
await sendStream.WriteAsync(buffer.Array, buffer.Offset, buffer.Count, cancellationToken);
resultCompletionSource.SetResult(null);
}
});
return resultCompletionSource.Task;
}
这种方式感觉很丑陋和笨拙。我的问题是:有没有一种方法可以使用TPL同步工作流程,而不必使用TaskCompletionSource来进行通信?
谢谢!