事件驱动的TCP客户端

7

我需要一个事件驱动的、基于Windows平台的、使用C#开发的TCP客户端。

当读取缓存中至少有35个字节时,我需要调用一个处理程序来读取这35字节,并从该“数据包”中获取长度值,然后以阻塞方式读取该长度的第二部分数据。


流就是流。任何大于一个字节的结构的解释都在你的职责范围内。你的代码需要组装消息头,解析它,累积剩余的消息数据,缓冲任何额外的字节以供后续解释...。可悲的是,19世纪赢得了网络协议战争。 - HABO
哇...Kernigan和Ritchie没有提到输掉协议战争的事... - Chuck Bland
感谢 ceykooo 和 ikh。我会深入研究你们的建议。 - Chuck Bland
其实现在是21世纪的啦 ;-) - oo_dev
4个回答

4

有一个相对较新的项目可以提供这个功能:https://github.com/clariuslabs/reactivesockets

从他们的页面上可以看到:

基于IObservable实现了一套非常易于使用的套接字API。它允许非常简单的协议实现,例如:

var client = new ReactiveClient("127.0.0.1", 1055);

// The parsing of messages is done in a simple Rx query over the receiver observable
// Note this protocol has a fixed header part containing the payload message length
// And the message payload itself. Bytes are consumed from the client.Receiver 
// automatically so its behavior is intuitive.
IObservable<string> messages = from header in client.Receiver.Buffer(4)
                               let length = BitConverter.ToInt32(header.ToArray(), 0)
                               let body = client.Receiver.Take(length)
                               select Encoding.UTF8.GetString(body.ToEnumerable().ToArray());

// Finally, we subscribe on a background thread to process messages when they are available
messages.SubscribeOn(TaskPoolScheduler.Default).Subscribe(message => Console.WriteLine(message));
client.ConnectAsync().Wait();

2

我不认为BCL中有任何基于事件的Socket类可用,但如果你只是想要比裸的Socket更高级一些的东西,也许你应该研究一下TcpClient。它将为您处理底层流的缓冲,让您可以通过StreamReader等方式访问它:

TcpClient client;
// ... construct, connect, etc ...
new StreamReader(client.GetStream());

如果您正在使用基于行的协议,您只需要使用StreamReader.ReadLine(),但StreamReader.Read()也可以很容易地满足您的需求。

1
为了朝着正确的方向前进,请查看Socket.BeginReceive()Socket.BeginSend()
此外,这里有一系列微软提供的使用上述函数的示例。这帮助我开始使用它们。
不幸的是,除非读取缓冲区中至少有35个字节,否则我无法看到调用回调的选项;它将在接收到任何内容时被调用——即使是零字节。然而,对方很可能不会逐字节地发送消息给您。

1
答案可能太晚了。我最近遇到了类似的问题,并为此开发了两个项目。第一个项目提供了一个基于事件的TcpClient,已经在所有主要操作系统上进行了测试。它将所有传入的数据作为可以轻松订阅的事件传递。第二个项目负责处理传入的数据。有一个分析器等待StartToken,然后分析包的长度。StartTokenWithLengthInfoDataPackageAnalyzer。如果现有的分析器不足,您也可以实现自己的分析器。
var dataPackageAnalyzer = new StartTokenWithLengthInfoDataPackageAnalyzer(0x01);
var dataPackageHandler = new DataPackageHandler(dataPackageAnalyzer);
dataPackageHandler.NewDataPackage += NewDataPackage;

void NewDataPackage(DataPackage dataPackage)
{
    //final package
}

void OnDataReceived(byte[] receivedData)
{
    //fragmented data
    dataPackageHandler.AddData(receivedData);
}

using var cancellationTokenSource = new CancellationTokenSource(1000);
using var tcpClient = new TcpClient();
tcpClient.DataReceived += OnDataReceived;
await tcpClient.ConnectAsync("127.0.0.1", 9000, cancellationTokenSource.Token);

//Wait for data
Console.ReadKey();

tcpClient.Disconnect();
tcpClient.DataReceived -= OnDataReceived;
dataPackageHandler.NewDataPackage -= NewDataPackage;

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接