将大量的可观察对象聚合成一个新的可观察对象

6
我有1000个可观察对象。现在,我想将所有事件聚合到一个新的可观察对象中,只有当其他所有对象都发送了事件时,该对象才会触发OnNext。使用Rx,最好的方法是什么?
更新: 在Rx论坛上得到了一些很好的反馈,特别是Dave Sexton的回答。他展示了如何创建一个Zip扩展方法来处理多个可观察对象:http://social.msdn.microsoft.com/Forums/en-US/rx/thread/daaa84db-b560-4eda-871e-e523098db20c/

1000个可观察量的类型都相同吗?你认为聚合可观察量的类型应该是什么? - Richard Anthony Freeman-Hein
所有的1000个可观察对象都是相同类型,新聚合可以是一个新类型。例如,Event 变为 AggregateEvent。 - lukebuehler
你想要仅合并它们的最新值吗?例如,如果Observable a触发了两个事件并且Observable b只触发了一个事件,您想使用a的第一个事件或最后一个事件与b的事件聚合吗? - Richard Anthony Freeman-Hein
@Richard Hein 让我们从获取 a 中的最后一个事件开始,因为我们可以假设它们是有序到达的,即在某些可观察对象触发新事件之前,所有 1000 个事件都已到达。但是,将它们配对以使第一个 a 事件与第一个 b 事件相匹配当然会更好。 - lukebuehler
我认为你最好在Rx论坛上问这个问题。 - Richard Anthony Freeman-Hein
你需要在源 observable 发出的每个值后都有输出,还是仅在第一个值后? - Richard Szalay
1个回答

2

在F#中有一个MailboxProcessor……在C#中,我会使用SynchronizationContext来实现相同的目的。请给我几分钟时间,我会写出一个示例。

另外:这是我的F#代码,实现了类似的功能……在C#中需要更多的努力,但使用Rx仍然可以完成。

open System.Diagnostics

let numWorkers = 20
let asyncDelay = 100

type MessageForMailbox =
   | DataMessage of AsyncReplyChannel<unit>
   | GetSummary of AsyncReplyChannel<unit>

let main =
   let actor =
      MailboxProcessor.Start( fun inbox ->
         let rec loop acc =
            async {
               let! message = inbox.Receive()
               match message with
               | DataMessage replyChannel -> replyChannel.Reply(); return! loop acc
               | GetSummary replyChannel -> replyChannel.Reply(); return! loop acc
            }

         loop 0 // seed for acc
      )

   let codeBlocks = [for i in 1..numWorkers -> 
                        async {
                           do! Async.Sleep asyncDelay
                           return! actor.PostAndAsyncReply DataMessage
                        } ]

   while true do
      printfn "Concurrent started..."
      let sw = new Stopwatch()
      sw.Start()
      codeBlocks |> Async.Parallel |> Async.RunSynchronously |> ignore
      actor.PostAndReply GetSummary
      sw.Stop()
      printfn "Concurrent in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * 100) / sw.ElapsedMilliseconds)

      printfn "Synchronous started..."
      let sw = new Stopwatch()
      sw.Start()
      for codeBlock in codeBlocks do codeBlock |> Async.RunSynchronously |> ignore
      sw.Stop()
      printfn "Synchronous in %d millisec" sw.ElapsedMilliseconds
      printfn "efficiency: %d%%" (int64 (asyncDelay * numWorkers * 100) / sw.ElapsedMilliseconds)

main

嗯,你的意思是使用SynchronizationContext.Send()来同步所有创建事件的可观察对象?我有点明白你的F#代码是做什么的,但我不够精通,无法完全理解它。 - lukebuehler
我觉得你明白了。RunSynchronously使用异步工作流实现了ForkJoin。 - GregC
+1:我以前从未见过一个好的MailboxProcessor示例。 :) - Tuomas Hietanen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接