是否存在一种类似于迭代器的概念,可以从多个数据源中拉取数据?

11

使用流(lazy lists)可以从若干个源(比如说两个,为了简单起见)按需获取数据。迭代器(Iteratee)可以用于处理来自单个源的数据。

是否存在一种类似于迭代器的函数概念,用于处理多个输入源?我可以想象一个迭代器,其状态信号指示它要从哪个源拉取数据。


1
所以你有两个来源,想要从其中任何一个首先可用的地方获取事件? - Kim Stebel
从多个来源获取数据的任何方式都可能等同于将两个来源合并成一个的某种方式。有许多方法可以实现这一点;您能否澄清您要寻找的行为? - C. A. McCann
我考虑了一些通用的东西,其中汇聚可以根据状态控制拉取哪个源。具体情况是合并两个已排序的流,以便获得元素键相等的元组,并且无法配对的元素将被删除。 - ron
1
任何迭代库都可以通过简单地堆叠单子来实现这一点。Oleg首先展示了如何做到这一点(http://okmij.org/ftp/Streams.html#2enum1iter)。我已经在我的迭代包中使用了它。它适用于其中的任何一个。 - John L
@John:谢谢!顺便说一下,我记得已经听过“Oleg先展示”的短语 :) - ron
4个回答

15

使用管道实现此操作时,您将 Pipe Monad Transformer 嵌套在自身中,每个要交互的生产者都需要嵌套一次。例如:

import Control.Monad
import Control.Monad.Trans
import Control.Pipe

producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]

consumes2 :: (Show a, Show b) =>
    Consumer a (Consumer b IO) r
consumes2 = forever $ do
    a <- await       -- await from outer producer
    b <- lift await  -- await from inner producer
    lift $ lift $ print (a, b)

就像 Haskell 中的多变量柯里化函数一样,你可以使用组合和 runPipe 部分地将其应用于每个源:

consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA

fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB

运行上述函数时,输出如下:

>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)

这个技巧适用于任意数量的上游或下游管道的产出或等待。它也适用于代理,即管道的双向模拟。

注意:这也适用于任何iteratee库,而不仅仅是pipes。事实上,John Milikin和Oleg是最初倡导这种方法的人,我只是从他们那里偷了这个想法。


1
我想强调的是,这适用于任何其他iteratee软件包,不仅仅是pipes。 - John L

6
我们在Scala中使用Machines来拉取任意数量的来源,而不仅仅是两个。库本身提供了二进制连接的两个示例,在Tee模块上:mergeOuterJoinhashJoin。以下是hashJoin的代码(假设两个流已排序):
/**
 * A natural hash join according to keys of type `K`.
 */
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
  def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
    a  <- awaits(left[A])
    mp <- build(m + (f(a) -> a))
  } yield mp) orElse Return(m)
  for {
    m <- build(Map())
    r <- (awaits(right[B]) flatMap (b => {
      val k = g(b)
      if (m contains k) emit(m(k) -> b) else Return(())
    })) repeatedly
  } yield r
}

这段代码构建了一个“计划”,使用repeatedly方法将其编译为一个Machine。此处构建的类型是Tee[A, B, (A, B)],它是一台带有两个输入的机器。您可以使用awaits(left)awaits(right)在左侧和右侧请求输入,并使用emit输出。同时,还有Haskell版本的Machines

3

导管(也可以为管道构建,但该代码尚未发布)具有一个zip原语,它将两个上游合并为元组流。


1

请查看pipes库,其中垂直连接可能会满足您的需求。例如,

import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void

source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"

sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x

pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)

序列操作符(>>)垂直连接源,产生输出(在runPipe上)

's'
'a'
'y'
'w'
'h'
'a'
't'

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接