使用Rx编写GetMessages
函数的最简洁方式是什么:
static void Main()
{
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
var messages = GetMessages(socket, IPAddress.Loopback, 4000);
messages.Subscribe(x => Console.WriteLine(x));
Console.ReadKey();
}
static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// now will receive a stream of messages
// each message is prefixed with an 4 bytes/Int32 indicating it's length.
// the rest of the message is a string
// ????????????? Now What ?????????????
}
以下是上述示例代码的简单服务器驱动程序:http://gist.github.com/452893#file_program.cs
在Socket编程中使用Rx
我一直在探索使用Reactive Extensions进行一些Socket编程工作。我这么做的动机是它会使代码“更简单”。无论是意味着更少的代码,还是更少的嵌套等等。
然而目前为止似乎并不是这样的:
- 我没有找到很多使用Rx与sockets的示例。
- 我找到的那些示例看起来并不比我现有的BeginXXXX,EndXXXX代码简单。
- 虽然
Observable
具有FromAsyncPattern的扩展方法,但它并不包括SocketEventArgs
异步API。
当前无法工作的解决方案
这是我目前的解决方案。它无法工作,它会失败并出现堆栈溢出(呵呵),我还没有弄清楚语义,以便我可以创建一个IObservable
将读取指定字节数。
static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
{
var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);
// keep reading until we get the first 4 bytes
byte[] buffer = new byte[1024];
var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);
IObservable<int> readBytes = null;
var temp = from totalRead in Observable.Defer(() => readBytes)
where totalRead < 4
select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
readBytes = temp.SelectMany(x => x).Sum();
var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
}
IObservable
,因此我正在探索TPL和Rx的混合使用。例如,Connect操作更像是一个任务,而重复的读取可以被视为IO<T>
。如果最终结果足够简单,它将成为Nito.Async的一部分。 - Stephen Cleary