将 IObservable<Task<T>> 转换为保持顺序的 IObservable<T>

7

有没有一种方法可以将IObservable<Task<T>>解包成IObservable<T>,同时保持相同的事件顺序,像这样?

Tasks:  ----a-------b--c----------d------e---f---->
Values: -------A-----------B--C------D-----E---F-->

假设我有一个桌面应用程序,它消费消息流,其中一些消息需要进行重度后处理:

IObservable<Message> streamOfMessages = ...;

IObservable<Task<Result>> streamOfTasks = streamOfMessages
    .Select(async msg => await PostprocessAsync(msg));

IObservable<Result> streamOfResults = ???; // unwrap streamOfTasks

我可以想象两种处理方式。
首先,我可以使用异步事件处理程序订阅streamOfTasks:
streamOfTasks.Subscribe(async task =>
{
    var result = await task;
    Display(result);
});

其次,我可以使用 Observable.CreatestreamOfTasks 转换成如下形式:

var streamOfResults =
    from task in streamOfTasks
    from value in Observable.Create<T>(async (obs, cancel) =>
    {
        var v = await task;
        obs.OnNext(v);

        // TODO: don't know when to call obs.OnComplete()
    })
    select value;

streamOfResults.Subscribe(result => Display(result));

无论哪种方式,消息的顺序都不会被保留:一些不需要任何后处理的较晚的消息比需要后处理的早期消息更快地出现。我的两个解决方案都可以并行处理传入的消息,但我希望它们按顺序逐个处理。
我可以编写一个简单的任务队列,一次只处理一个任务,但这可能有点过度杀伤力。对我来说,似乎我错过了一些显而易见的东西。
更新。我编写了一个示例控制台程序来演示我的方法。到目前为止,所有解决方案都没有保留原始事件的顺序。以下是程序的输出:
Timer: 0
Timer: 1
Async handler: 1
Observable.Create: 1
Observable.FromAsync: 1
Timer: 2
Async handler: 2
Observable.Create: 2
Observable.FromAsync: 2
Observable.Create: 0
Async handler: 0
Observable.FromAsync: 0

以下是完整的源代码:

// "C:\Program Files (x86)\MSBuild\14.0\Bin\csc.exe" test.cs /r:System.Reactive.Core.dll /r:System.Reactive.Linq.dll /r:System.Reactive.Interfaces.dll

using System;
using System.Reactive;
using System.Reactive.Concurrency;
using System.Reactive.Linq;
using System.Threading.Tasks;

class Program
{
    static void Main()
    {
        Console.WriteLine("Press ENTER to exit.");

        // the source stream
        var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromSeconds(1));
        timerEvents.Subscribe(x => Console.WriteLine($"Timer: {x}"));

        // solution #1: using async event handler
        timerEvents.Subscribe(async x =>
        {
            var result = await PostprocessAsync(x);
            Console.WriteLine($"Async handler: {x}");
        });

        // solution #2: using Observable.Create
        var processedEventsV2 =
            from task in timerEvents.Select(async x => await PostprocessAsync(x))
            from value in Observable.Create<long>(async (obs, cancel) =>
            {
                var v = await task;
                obs.OnNext(v);
            })
            select value;
        processedEventsV2.Subscribe(x => Console.WriteLine($"Observable.Create: {x}"));

        // solution #3: using FromAsync, as answered by @Enigmativity
        var processedEventsV3 =
            from msg in timerEvents
            from result in Observable.FromAsync(() => PostprocessAsync(msg))
            select result;

        processedEventsV3.Subscribe(x => Console.WriteLine($"Observable.FromAsync: {x}"));

        Console.ReadLine();
    }

    static async Task<long> PostprocessAsync(long x)
    {
        // some messages require long post-processing
        if (x % 3 == 0)
        {
            await Task.Delay(TimeSpan.FromSeconds(2.5));
        }

        // and some don't
        return x;
    }
}

3
为什么人们投票关闭这个问题而不留下评论? - satnhak
@yallie - 你能解释一下你所说的“事件顺序”是什么意思吗?是指消息创建的顺序而不是任务完成的顺序吗? - Enigmativity
@Enigmativity 是的,确切地说。我需要保持消息到达的顺序,而不管处理它们所需的时间。 - yallie
我写了一个扩展来做这件事情 - 我想将源代码和处理后的结果一起压缩 - 并且顺序很重要。我会看看能否找到它。 - Enigmativity
@Enigmativity 那太好了! - yallie
5个回答

3
RX库包含三个操作符,可以解开一个可观察的任务序列,它们分别是ConcatMergeSwitch。它们都接受一个类型为IObservable<Task<T>>的单一source参数,并返回一个IObservable<T>。以下是它们在文档中的描述: Concat

连接所有任务结果,只要前一个任务成功终止。

Merge

将来自所有源任务的结果合并成单个可观察序列。

Switch 将一个任务的可观察序列转换为仅产生最近的可观察序列的值的可观察序列。每次接收到新任务时,都会忽略上一个任务的结果。
换句话说,`Concat` 按其原始顺序返回结果,`Merge` 按完成顺序返回结果,而 `Switch` 过滤掉在发出下一个任务之前未完成的任务的任何结果。因此,您可以通过使用内置的 `Concat` 运算符来解决问题。不需要自定义运算符。
var streamOfResults = streamOfTasks
    .Select(async task =>
    {
        var result1 = await task;
        var result2 = await PostprocessAsync(result1);
        return result2;
    })
    .Concat();

任务在它们被 streamOfTasks 发出之前就已经开始了。换句话说,它们处于 "热" 状态。因此,Concat 操作符等待它们一个接一个地出现对操作的并发性没有影响,只影响它们结果的顺序。如果你有冷的可观察对象,比如由 Observable.FromAsyncObservable.Create 方法创建的对象,那么这将是一个考虑因素,此时 Concat 会按顺序执行操作。

2

结合@Enigmativity的简单方法,以及@VMAtm的附加计数器的想法和这个SO问题中的一些代码片段,我提出了这个解决方案:

// usage
var processedStream = timerEvents.SelectAsync(async t => await PostprocessAsync(t));

processedStream.Subscribe(x => Console.WriteLine($"Processed: {x}"));

// my sample console program prints the events ordered properly:
Timer: 0
Timer: 1
Timer: 2
Processed: 0
Processed: 1
Processed: 2
Timer: 3
Timer: 4
Timer: 5
Processed: 3
Processed: 4
Processed: 5
....

这是我的 SelectAsync 扩展方法,可以将 IObservable<Task<TSource>> 转换为 IObservable<TResult>,并保持事件的原始顺序:
public static IObservable<TResult> SelectAsync<TSource, TResult>(
    this IObservable<TSource> src,
    Func<TSource, Task<TResult>> selectorAsync)
{
    // using local variable for counter is easier than src.Scan(...)
    var counter = 0;
    var streamOfTasks =
        from source in src
        from result in Observable.FromAsync(async () => new
        {
            Index = Interlocked.Increment(ref counter) - 1,
            Result = await selectorAsync(source)
        })
        select result;

    // buffer the results coming out of order
    return Observable.Create<TResult>(observer =>
    {
        var index = 0;
        var buffer = new Dictionary<int, TResult>();

        return streamOfTasks.Subscribe(item =>
        {
            buffer.Add(item.Index, item.Result);

            TResult result;
            while (buffer.TryGetValue(index, out result))
            {
                buffer.Remove(index);
                observer.OnNext(result);
                index++;
            }
        });
    });
}

我并不是很满意我的解决方案,因为它看起来太复杂了,但至少它不需要任何外部依赖。我在这里使用了一个简单的字典来缓存和重新排序任务结果,因为订阅者不需要线程安全(订阅永远不会同时调用)。
欢迎任何评论或建议。我仍然希望找到本地RX的方法来完成这个任务,而无需使用自定义缓冲扩展方法。

1
以下简单的方法是否对您有帮助?
IObservable<Result> streamOfResults =
    from msg in streamOfMessages
    from result in Observable.FromAsync(() => PostprocessAsync(msg))
    select result;

不,我已经尝试过这种方法,但对我来说并不起作用。它会在第一个需要后处理的消息上停止(即在第一个异步完成的任务上)。 - yallie
我将代码复制到控制台应用程序中,现在它不再停止,所以我猜这是一个UI线程问题。但是代码的行为与我的两个解决方案相同,即事件的正确顺序没有得到保留。 - yallie

1
为了保持事件的顺序,您可以将流导入TPL Dataflow中的TransformBlockTransformBlock 将执行您的后处理逻辑,并默认按顺序维护其输出。
using System;
using System.Collections.Generic;
using System.Reactive.Linq;
using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;
using NUnit.Framework;

namespace HandlingStreamInOrder {

    [TestFixture]
    public class ItemHandlerTests {

        [Test]
        public async Task Items_Are_Output_In_The_Same_Order_As_They_Are_Input() {
            var itemHandler = new ItemHandler();
            var timerEvents = Observable.Timer(TimeSpan.Zero, TimeSpan.FromMilliseconds(250));
            timerEvents.Subscribe(async x => {
                var data = (int)x;
                Console.WriteLine($"Value Produced: {x}");                
                var dataAccepted = await itemHandler.SendAsync((int)data);
                if (dataAccepted) {
                    InputItems.Add(data);
                }                
            });

            await Task.Delay(5000);
            itemHandler.Complete();
            await itemHandler.Completion;

            CollectionAssert.AreEqual(InputItems, itemHandler.OutputValues);
        }

        private IList<int> InputItems {
            get;
        } = new List<int>();
    }

    public class ItemHandler {


        public ItemHandler() {            
            var options = new ExecutionDataflowBlockOptions() {
                BoundedCapacity = DataflowBlockOptions.Unbounded,
                MaxDegreeOfParallelism = Environment.ProcessorCount,
                EnsureOrdered = true
            };
            PostProcessBlock = new TransformBlock<int, int>((Func<int, Task<int>>)PostProcess, options);

            var output = PostProcessBlock.AsObservable().Subscribe(x => {
                Console.WriteLine($"Value Output: {x}");
                OutputValues.Add(x);
            });
        }

        public async Task<bool> SendAsync(int data) {
            return await PostProcessBlock.SendAsync(data);
        }

        public void Complete() {
            PostProcessBlock.Complete();
        }

        public Task Completion {
            get { return PostProcessBlock.Completion; }
        }

        public IList<int> OutputValues {
            get;
        } = new List<int>();

        private IPropagatorBlock<int, int> PostProcessBlock {
            get;
        }

        private async Task<int> PostProcess(int data) {
            if (data % 3 == 0) {
                await Task.Delay(TimeSpan.FromSeconds(2));
            }            
            return data;
        }
    }
}

谢谢您的建议!这种方法确实很好用。我以前从未使用过TPL DataFlow,所以需要一些时间来理解这里发生了什么。我很惊讶一个看似基本的任务除了RX还需要另一个依赖项。 - yallie
BoundedCapacity is equal to DataflowBlockOptions.Unbounded by default, as do the EnsureOrdered equal to true by default. Also there is no EnsureOrdered in the last version of the Microsoft.Tpl.Dataflow - VMAtm

1

RxTPL可以轻松结合使用TPL默认情况下会保存事件的顺序,因此您的代码可能如下所示:

using System.Threading.Tasks;
using System.Threading.Tasks.Dataflow;

static async Task<long> PostprocessAsync(long x) { ... }

IObservable<Message> streamOfMessages = ...;
var streamOfTasks = new TransformBlock<long, long>(async msg => 
    await PostprocessAsync(msg)
    // set the concurrency level for messages to handle
    , new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = Environment.ProcessorCount });
// easily convert block into observable
IObservable<long> streamOfResults = streamOfTasks.AsObservable();
编辑: Rx扩展旨在为UI创建反应式事件管道。由于此类应用程序通常是单线程的,因此消息处理时需要保存顺序。但是,一般来说C#中的事件不是线程安全的,因此您需要提供一些额外的逻辑来确保顺序相同。

如果您不喜欢引入其他依赖项的想法,则需要使用Interlocked类存储操作编号,类似于以下内容:

// counter for operations get started
int operationNumber = 0;
// counter for operations get done
int doneNumber = 0;
...
var currentOperationNumber = Interlocked.Increment(ref operationNumber);
...
while (Interlocked.CompareExchange(ref doneNumber, currentOperationNumber + 1, currentOperationNumber) != currentOperationNumber)
{
    // spin once here
}
// handle event
Interlocked.Increment(ref doneNumber);

非常感谢!我其实很惊讶,使用RX自己提供的工具无法完成这个任务。我在想,保留事件顺序是不是对RX来说不是惯用方法? - yallie
@yallie更新了答案。在C#中,事件默认情况下不是线程安全的,因为它们由委托列表表示,所以您需要一些额外的同步逻辑。 - VMAtm
我不太明白事件线程安全和可观察序列产生的事件顺序之间的联系。John Skeet的文章是关于订阅和调用周围可能发生的竞态条件。在我的情况下,没有竞争条件,因为所有订阅都在程序开始时完成。我喜欢计算源事件以匹配源事件(任务)与其后处理结果的想法,但我不确定是否有RX原语可以执行这样的匹配。 - yallie
1
不太对。多播委托按顺序一个接一个地在调用委托的线程上被调用。如果事件处理程序引发异常,则整个调用链将被中止。因此,在调用期间不存在竞争条件。这是演示:https://gist.github.com/yallie/fd1105ba98919006d3e93d956f705ef8 - yallie
@yallie 我觉得我的英语不足以解释我想表达的意思。我只是试图通过比较Rx设计和C#事件来解释它。就我所知,与C#本机事件相比,Rx确实以不同的方式处理事件。 - VMAtm
显示剩余3条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接