如何在.NET中使用RX扩展消费UDP字节流

4

我在网上反复搜索后,想出了这个解决方案(未经测试)。

Private Function ObserveUDP() As IObservable(Of bytes())


    Dim f = Function(observer)
                Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                Dim client = New UdpClient(endpoint)

                Dim obs = observable.*emphasized text*Generate(Of Task(Of UdpReceiveResult), UdpReceiveResult) _
                      ( Nothing _
                      , Function(task As Task(Of UdpReceiveResult)) task Is Nothing Or Not task.IsCompleted() _
                      , Function(task As Task(Of UdpReceiveResult)) client.ReceiveAsync() _
                      , Function(task As Task(Of UdpReceiveResult)) task.Result)

                Dim observable = obs.Select(Function(r) r.Buffer)

                dim handle = observable.Subscribe(observer)

                Dim df = Sub() 
                    client.Close()
                    handle.Dispose()
                End Sub

                Return Disposable.Create(df)

    End Function

    Return observable.Create(f)

End Function

我的要求是确保当订阅被取消时关闭UDP客户端。我相信以上代码很接近,但我认为它还不够正确。欢迎提供任何意见。
*编辑*
实际上,上面的示例完全错误,并且只会同步创建大量任务对象,而不会等待它们。经过一些试验和错误,我想出了以下通用函数来展开一个可等待对象,该函数一遍又一遍地调用。有任何评论吗?
''' initializer - a function that initializes and returns the state object
''' generator   - a function that asynchronously using await generates each value
''' finalizer   - a function for cleaning up the state object when the sequence is unsubscribed

Private Function ObservableAsyncSeq(Of T, I)( _
    initializer As Func(Of I), _
    generator As Func(Of I, Task(Of T)), _
    finalizer As Action(Of I))  As IObservable(Of T)

    Dim q = Function(observer As IObserver(Of T))
                Dim go = True
                Try
                    Dim r = Async Sub()
                                Dim ii As I = initializer()
                                While go
                                    Dim result = Await generator(ii)
                                    observer.OnNext(result)
                                End While
                                finalizer(ii)
                                observer.OnCompleted()
                            End Sub
                    Task.Run(r)
                Catch ex As Exception
                    observer.OnError(ex)
                End Try

                ' Disposable for stopping the sequence as per
                ' the observable contract
                Return Sub() go = False

            End Function

    Return Observable.Create(q)
End Function

使用UDP的示例
Private Function ObserveMeasurementPoints2() As IObservable(Of ProcessedDate)
    Dim initializer = Function()
                          Dim endpoint = New IPEndPoint(IPAddress.Parse(Me.IpAdress), Me.IPPort)
                          Return New UdpClient(endpoint)
                      End Function

    Dim finalizer = Function(client As UdpClient)
                        client.Close()
                    End Function

    Dim generator = Function(client As UdpClient) As Task(Of UdpReceiveResult)
                        Return client.ReceiveAsync()
                    End Function

    Return ObservableAsyncSeq(initializer, generator, finalizer).Select(Function(r) ProcessBytes(r.Buffer))

End Function
3个回答

6
您可以使用Enigmativity提到的Observable.Using,也可以简单地使用接受IDisposable作为返回参数的常规Observable.Create方法-这足以确保安全处理。

使用迭代器或异步方式完全没有问题。我列出了一种更加Rx的方法:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(Of T, UdpClient)(
        Function() New UdpClient(endpoint),
        Function(udpClient) _
            Observable.Defer(Function() udpClient.ReceiveAsync().ToObservable()) _
            .Repeat() _
            .Select(Function(result) processor(result.Buffer))
    )
End Function

传统方式:

Public Shared Function UdpStream(Of T)(endpoint As IPEndPoint, processor As Func(Of Byte(), T)) As IObservable(Of T)
    Return Observable.Using(
        Function() New UdpClient(endpoint),
        Function(udpClient) Observable.Defer( _
        Observable.FromAsyncPattern(
            AddressOf udpClient.BeginReceive,
            Function(iar)
                Dim remoteEp = TryCast(iar.AsyncState, IPEndPoint)
                Return udpClient.EndReceive(iar, remoteEp)
            End Function)
        ).Repeat() _
         .Select(processor)
    )
End Function

测试:

Shared Sub Main()
    Using UdpStream(New IPEndPoint(IPAddress.Loopback, 13200),
                    Function(bytes) String.Join(",", bytes)
                    ).Subscribe(AddressOf Console.WriteLine)
        Console.ReadLine()
    End Using

    Console.WriteLine("Done")
    Console.ReadKey()
End Sub

以上解决方案仅从UDP客户端获取单个结果。此外,Visual Studio 2012告诉我,beingreceive和endreceive已被弃用,应使用基于任务的异步方法。 - bradgonesurfing
我打赌这样的代码可以工作。Observable.Defer( () => udpclient.AsyncReceive().ToObservable() ).Repeat() - bradgonesurfing
我看到你在解决方案中也使用了repeat。很抱歉我在第一次评论中错过了它。然而,我认为可以使用基于任务的可观察生成器使代码更清晰。 - bradgonesurfing
@bradgonesurfing 是的。老方法是向后兼容的。我也添加了新方法。 - Asti

3
请看Observable.Using - 它专门用于创建一个可观察对象,该对象使用一次性资源来生成其值,并在完成时自动处理该资源。
您会发现UdpClient具有相同的CloseDispose方法实现,因此如果调用Dispose,则不需要调用Close
从反射器中可以看到:
void IDisposable.Dispose()
{
    this.Dispose(true);
}

public void Close()
{
    this.Dispose(true);
}

以下是Using的签名:

Public Shared Function Using(Of TSource, TResource As IDisposable)(
    ByVal resourceFactory As Func(Of TResource),
    ByVal observableFactory As Func(Of TResource, IObservable(Of TSource)))
        As IObservable(Of TSource)

1

我以前没有使用过UDPClient,但是看起来你正在使用Tasks(Cardinality =1)尝试接收数据流(Cardinality = many)。为了解决这个问题,你似乎在你的查询中添加了一个重复操作。这意味着你的查询将会执行以下步骤:

  1. 创建一个UDPClient
  2. 调用请求数据的操作
  3. 接收第一个获得的数据
  4. 将数据推送到序列中
  5. 关闭序列
  6. 释放UDPClient
  7. 创建一个UDPClient(回到步骤1)
  8. 调用请求数据的操作
  9. 接收第一个获得的数据
  10. ....直到你释放连接。

我认为你只需要通过获取字节流来读取套接字/网络连接中的数据。我在我的博客文章中向你展示了如何做到这一点:

http://introtorx.com/Content/v1.0.10621.0/15_SchedulingAndThreading.html#CreatingYourOwnIterator

这样做可以只保持一个连接打开,并在接收到字节时推送它们。
通过快速搜索,我还发现了关于UDPClient实现可靠性的担忧。 http://www.codeproject.com/Articles/1938/Issues-with-UdpClient-Receive 希望对你有所帮助。

using System;
using System.IO;
using System.Reactive.Concurrency;
using System.Reactive.Disposables;
using System.Reactive.Linq;

namespace MyLib
{
    public static class ObservableExtensions
    {
        //TODO: Could potentially upgrade to using tasks/Await-LC
        public static IObservable<byte> ToObservable(
            this Stream source,
            int buffersize,
            IScheduler scheduler)
        {
            var bytes = Observable.Create<byte>(o =>
            {
                var initialState = new StreamReaderState(source, buffersize);
                var currentStateSubscription = new SerialDisposable();
                Action<StreamReaderState, Action<StreamReaderState>> iterator =
                (state, self) =>
                    currentStateSubscription.Disposable = state.ReadNext()
                        .Subscribe(
                            bytesRead =>
                            {
                                for (int i = 0; i < bytesRead; i++)
                                {
                                    o.OnNext(state.Buffer[i]);
                                }
                                if (bytesRead > 0)
                                    self(state);
                                else
                                    o.OnCompleted();
                            },
                            o.OnError);
                var scheduledWork = scheduler.Schedule(initialState, iterator);
                return new CompositeDisposable(currentStateSubscription, scheduledWork);
            });
            return Observable.Using(() => source, _ => bytes);
        }

        private sealed class StreamReaderState
        {
            private readonly int _bufferSize;
            private readonly Func<byte[], int, int, IObservable<int>> _factory;
            public StreamReaderState(Stream source, int bufferSize)
            {
                _bufferSize = bufferSize;
                _factory = Observable.FromAsyncPattern<byte[], int, int, int>(
                source.BeginRead,
                source.EndRead);
                Buffer = new byte[bufferSize];
            }
            public IObservable<int> ReadNext()
            {
                return _factory(Buffer, 0, _bufferSize);
            }
            public byte[] Buffer { get; set; }
        }
    }
}

实际上你错了。如果你看一下Asti的回答,它并没有在每次重复时丢弃UDPClient。它只是一遍又一遍地调用ReceiveAsync。这全部都被Using运算符包装起来,它只在注册被删除时才处置UDP客户端。 - bradgonesurfing
1
请原谅,我只向上滚动到“传统方式”。 - Lee Campbell

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接