多线程异步模式

3
我有一个场景,其中多个线程通过单个套接字发送数据。已将唯一标识符插入消息中,并在响应消息中回显唯一标识符。当套接字隔离到单个客户端时,一切都运行良好(如预期)。现在我正在寻找多个线程的异步/等待模式,其中客户机等待特定响应。
以下是一些代码示例:
using (var ns = new NetworkStream(_socket))
{
    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, request.Length);
    byte[] responseBuffer = await ReceiveMessageAsync(ns);

    // process response message
}

上述示例在多线程情况下无法正常工作,因为消息可能以任何顺序返回,因此下一条从线路上返回的消息可能不属于当前客户端。我的想法是客户端将使用其唯一ID注册委托或任务(将其存储在字典中),并且当带有该唯一ID的消息返回时,委托或任务将使用响应字节“完成”。我猜这可以通过EventHandler相对容易地实现,但我正在寻找一种等待响应的方法。
例如:
using (var ns = new CustomNetworkStream(_socket))
{
    Task waitTask = ns.RegisterTask(this.UniqueId);

    byte[] requestBuffer = GetBuffer(request);
    await ns.WriteAsync(requestBuffer, 0, request.Length);

    byte[] responseBuffer = await waitTask;

    // process response message
}

我不知道"RegisterTask"方法看起来是什么样子,该如何存储任务,如何使任务'等待'并且后来将其作为任务结果'完成'。有什么想法吗?我已经研究了Toub的异步协作原语系列,但我不确定是否是正确的方法。

1
我认为你只需要 TaskCompletionSource<T>。但是,针对套接字通信,多线程异步 API 非常复杂,远比你的示例代码所展示的要复杂得多。我建议使用一个更高级的抽象,如 SignalR。 - Stephen Cleary
我的库Griffin.Framework的网络部分已经内置了支持此功能的内容(包括异步客户端)。http://griffinframework.net - jgauffin
谢谢 @jgauffin,我会看一下的。 - Chris Gessler
“stream” 暗示 TCP,但 “单个套接字上的多个客户端” 暗示 UDP。无论哪种方式,我认为虽然您可能能够使其正常工作,但您在这里有更基本的设计问题。如果没有详细信息解释您如何陷入这种情况,那么很难完全理解这个问题。 - Peter Duniho
@PeterDuniho - 我正在创建一个持久化套接字,并允许多个线程通过连接发送/接收数据。 - Chris Gessler
2个回答

3
您需要的是一个TaskCompletionSource<byte[]>作为同步构造器,一个ConcurrentDictionary来映射id和TCS以及一个监听器。
ConcurrentDictionary<UniqueId, TaskCompletionSource<byte[]>> _dictionary;
async Task Listen(CancellationToken token)
{
    while (!token.IsCancellationRequested)
    {
        using (var ns = new NetworkStream(_socket))
        {
            byte[] responseBuffer = await ReceiveMessageAsync(ns);
            var id = ExtractId(responseBuffer);
            TaskCompletionSource<byte[]> tcs;
            if (dictionary.TryRemove(id, out tcs))
            {
                tcs.SetResult(responseBuffer);
            }
            else
            {
                // error
            }
        }
    }
}

Task RegisterTask(UniqueId id)
{
    var tcs = new TaskCompletionSource<byte[]>();
    if (!_dictionary.TryAdd(id, tcs))
    {
        // error
    }
    return tcs.Task;
}

然而,正如Stephen Cleary建议的那样,您可能想要使用现有的解决方案来实现此目的。

看起来是一个合理的解决方案。通常,我会使用发布/订阅或SignalR,但请求/响应都在服务器上作为外部请求/响应的一部分进行处理,因此我需要尽可能减少服务器跳数。 - Chris Gessler

3

因此,您需要将所有这些内容封装到一个新类中,因为您需要在读取和写入的地方共享状态。

每次写入流时,您需要接受唯一ID,并将条目添加到查找表中,将ID映射到TaskCompletionSource。然后,写入方法可以从该TCS返回Task

然后,您可以有一个单独的读取器,它只需坐在那里从流中读取,找到与该响应ID相关联的字典条目,并设置其结果。

public class MyNetworkStream : IDisposable
{
    private NetworkStream stream;
    private ConcurrentDictionary<int, TaskCompletionSource<byte[]>> lookup =
        new ConcurrentDictionary<int, TaskCompletionSource<byte[]>>();
    private CancellationTokenSource disposalCTS = new CancellationTokenSource();
    public MyNetworkStream(Socket socket)
    {
        stream = new NetworkStream(socket);
        KeepReading();
    }
    public void Dispose()
    {
        disposalCTS.Cancel();
        stream.Dispose();
    }

    public Task<byte[]> WriteAndWaitAsync(byte[] buffer, int offset, 
        int count, int uniqueID)
    {
        var tcs = lookup.GetOrAdd(uniqueID, new TaskCompletionSource<byte[]>());
        stream.WriteAsync(buffer, offset, count);
        return tcs.Task;
    }

    private async void KeepReading()
    {
        try
        {
            //TODO figure out what you want for a buffer size so that you can 
            //read a block of the appropriate size.
            byte[] buffer = null;
            while (!disposalCTS.IsCancellationRequested)
            {
                //TODO edit offset and count appropriately 
                await stream.ReadAsync(buffer, 0, 0, disposalCTS.Token);
                int id = GetUniqueIdFromBlock(buffer);
                TaskCompletionSource<byte[]> tcs;
                if (lookup.TryRemove(id, out tcs))
                    tcs.TrySetResult(buffer);
                else
                {
                    //TODO figure out what to do here
                }
            }
        }
        catch (Exception e)
        {
            foreach (var tcs in lookup.Values)
                tcs.TrySetException(e);
            Dispose();
            //TODO consider any other necessary cleanup
        }
    }

    private int GetUniqueIdFromBlock(byte[] buffer)
    {
        throw new NotImplementedException();
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接