使用响应式扩展(Rx)进行套接字编程实用吗?

23

使用Rx编写GetMessages函数的最简洁方式是什么:

static void Main()
{
    Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);

    var messages = GetMessages(socket, IPAddress.Loopback, 4000);
    messages.Subscribe(x => Console.WriteLine(x));

    Console.ReadKey();
}

static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
{
    var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

    // now will receive a stream of messages
    // each message is prefixed with an 4 bytes/Int32 indicating it's length. 
    // the rest of the message is a string

    // ????????????? Now What ????????????? 
}

以下是上述示例代码的简单服务器驱动程序:http://gist.github.com/452893#file_program.cs

在Socket编程中使用Rx

我一直在探索使用Reactive Extensions进行一些Socket编程工作。我这么做的动机是它会使代码“更简单”。无论是意味着更少的代码,还是更少的嵌套等等。

然而目前为止似乎并不是这样的:

  1. 我没有找到很多使用Rx与sockets的示例。
  2. 我找到的那些示例看起来并不比我现有的BeginXXXX,EndXXXX代码简单。
  3. 虽然Observable具有FromAsyncPattern的扩展方法,但它并不包括SocketEventArgs异步API。

当前无法工作的解决方案

这是我目前的解决方案。它无法工作,它会失败并出现堆栈溢出(呵呵),我还没有弄清楚语义,以便我可以创建一个IObservable将读取指定字节数。

    static IObservable<int> GetMessages(Socket socket, IPAddress addr, int port)
    {
        var whenConnect = Observable.FromAsyncPattern<IPAddress, int>(socket.BeginConnect, socket.EndConnect)(addr, port);

        // keep reading until we get the first 4 bytes
        byte[] buffer = new byte[1024];
        var readAsync = Observable.FromAsyncPattern<byte[], int, int, SocketFlags, int>(socket.BeginReceive, socket.EndReceive);

        IObservable<int> readBytes = null;
        var temp = from totalRead in Observable.Defer(() => readBytes)
                   where totalRead < 4
                   select readAsync(buffer, totalRead, totalRead - 4, SocketFlags.None);
        readBytes = temp.SelectMany(x => x).Sum();

        var nowDoSomethingElse = readBytes.SkipUntil(whenConnect);
    }

我正在处理类似的事情。单个“socket”不能直接转换为IObservable,因此我正在探索TPL和Rx的混合使用。例如,Connect操作更像是一个任务,而重复的读取可以被视为IO<T>。如果最终结果足够简单,它将成为Nito.Async的一部分。 - Stephen Cleary
我的想法是你可以在流的开头使用Connect,然后在结尾使用Close,这与在一些现有的Rx示例中使用的MouseDown和MouseUp IObservable事件有些相似。 - Joseph Kingry
当这个问题被提出时,它可能还不存在,但现在它已经存在了,可能会引起您的兴趣:“Rxx提供了几个API,使异步调用Web请求变得容易,包括(...)System.Net.Sockets.Socket,System.Net.Sockets.TcpClient”。http://rxx.codeplex.com/ - Stéphane Gourichon
2个回答

8

以下内容可能有所帮助。这未经测试,没有考虑到异常情况和返回部分消息的情况。但除此之外,我认为这是正确的方向。

    public static IObservable<T> GetSocketData<T>(this Socket socket,
        int sizeToRead, Func<byte[], T> valueExtractor)
    {
        return Observable.CreateWithDisposable<T>(observer =>
        {
            var readSize = Observable
                .FromAsyncPattern<byte[], int, int, SocketFlags, int>(
                socket.BeginReceive,
                socket.EndReceive);
            var buffer = new byte[sizeToRead];
            return readSize(buffer, 0, sizeToRead, SocketFlags.None)
                .Subscribe(
                x => observer.OnNext(valueExtractor(buffer)),
                    observer.OnError,
                    observer.OnCompleted);
        });
    }

    public static IObservable<int> GetMessageSize(this Socket socket)
    {
        return socket.GetSocketData(4, buf => BitConverter.ToInt32(buf, 0));
    }

    public static IObservable<string> GetMessageBody(this Socket socket,
        int messageSize)
    {
        return socket.GetSocketData(messageSize, buf =>
            Encoding.UTF8.GetString(buf, 0, messageSize));
    }

    public static IObservable<string> GetMessage(this Socket socket)
    {

        return
            from size in socket.GetMessageSize()
            from message in Observable.If(() => size != 0,
                socket.GetMessageBody(size),
                Observable.Return<string>(null))
            select message;
    }

    public static IObservable<string> GetMessagesFromConnected(
        this Socket socket)
    {
        return socket
            .GetMessage()
            .Repeat()
            .TakeWhile(msg => !string.IsNullOrEmpty(msg));
    }

    public static IObservable<string> GetMessages(this Socket socket,
        IPAddress addr, int port)
    {
        return Observable.Defer(() => 
        {
            var whenConnect = Observable
                .FromAsyncPattern<IPAddress, int>(
                    socket.BeginConnect, socket.EndConnect);
            return from _ in whenConnect(addr, port)
                   from msg in socket.GetMessagesFromConnected()
                       .Finally(socket.Close)
                   select msg;
        });
    }

编辑:为了处理不完整的读取,可以像Dave Sexton在RX论坛上相同主题中提出的那样,在GetSockedData中使用Observable.While。

编辑:另外,请查看Jeffrey Van Gogh的文章:异步System.IO.Stream读取


是的,这并不真正起作用,因为EndRecieve可能会返回比完整大小少的内容,需要另一个请求。 - Joseph Kingry

2

好的,这可能有点“作弊”,但我想你可以重新运用我的非Rx答案,并将其包装在Observable.Create中。

我相当确定将套接字作为IDisposable返回是错误的语义,但不确定应该怎么做。

    static IObservable<string> GetMessages(Socket socket, IPAddress addr, int port)
    {
        return Observable.CreateWithDisposable<string>(
            o =>
            {
                byte[] buffer = new byte[1024];

                Action<int, Action<int>> readIntoBuffer = (length, callback) =>
                {
                    var totalRead = 0;

                    AsyncCallback receiveCallback = null;
                    AsyncCallback temp = r =>
                    {
                        var read = socket.EndReceive(r);

                        if (read == 0)
                        {
                            socket.Close();
                            o.OnCompleted();
                            return;
                        }

                        totalRead += read;

                        if (totalRead < length)
                        {
                            socket.BeginReceive(buffer, totalRead, length - totalRead, SocketFlags.None, receiveCallback, null);
                        }
                        else
                        {
                            callback(length);
                        }
                    };
                    receiveCallback = temp;

                    socket.BeginReceive(buffer, totalRead, length, SocketFlags.None, receiveCallback, null);
                };

                Action<int> sizeRead = null;

                Action<int> messageRead = x =>
                {
                    var message = Encoding.UTF8.GetString(buffer, 0, x);
                    o.OnNext(message);
                    readIntoBuffer(4, sizeRead);
                };

                Action<int> temp2 = x =>
                {
                    var size = BitConverter.ToInt32(buffer, 0);
                    readIntoBuffer(size, messageRead);
                };
                sizeRead = temp2;

                AsyncCallback connectCallback = r =>
                {
                    socket.EndConnect(r);
                    readIntoBuffer(4, sizeRead);
                };

                socket.BeginConnect(addr, port, connectCallback, null);

                return socket;
            });
    }

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接