使用流(lazy lists)可以从若干个源(比如说两个,为了简单起见)按需获取数据。迭代器(Iteratee)可以用于处理来自单个源的数据。
是否存在一种类似于迭代器的函数概念,用于处理多个输入源?我可以想象一个迭代器,其状态信号指示它要从哪个源拉取数据。
使用流(lazy lists)可以从若干个源(比如说两个,为了简单起见)按需获取数据。迭代器(Iteratee)可以用于处理来自单个源的数据。
是否存在一种类似于迭代器的函数概念,用于处理多个输入源?我可以想象一个迭代器,其状态信号指示它要从哪个源拉取数据。
使用管道实现此操作时,您将 Pipe Monad Transformer 嵌套在自身中,每个要交互的生产者都需要嵌套一次。例如:
import Control.Monad
import Control.Monad.Trans
import Control.Pipe
producerA, producerB :: (Monad m) => Producer Int m ()
producerA = mapM_ yield [1,2,3]
producerB = mapM_ yield [4,5,6]
consumes2 :: (Show a, Show b) =>
Consumer a (Consumer b IO) r
consumes2 = forever $ do
a <- await -- await from outer producer
b <- lift await -- await from inner producer
lift $ lift $ print (a, b)
就像 Haskell 中的多变量柯里化函数一样,你可以使用组合和 runPipe 部分地将其应用于每个源:
consumes1 :: (Show b) => Consumer b IO ()
consumes1 = runPipe $ consumes2 <+< producerA
fullyApplied :: IO ()
fullyApplied = runPipe $ consumes1 <+< producerB
运行上述函数时,输出如下:
>>> fullyApplied
(1, 4)
(2, 5)
(3, 6)
这个技巧适用于任意数量的上游或下游管道的产出或等待。它也适用于代理,即管道的双向模拟。
注意:这也适用于任何iteratee库,而不仅仅是pipes
。事实上,John Milikin和Oleg是最初倡导这种方法的人,我只是从他们那里偷了这个想法。
Tee
模块上:mergeOuterJoin
和hashJoin
。以下是hashJoin
的代码(假设两个流已排序):/**
* A natural hash join according to keys of type `K`.
*/
def hashJoin[A, B, K](f: A => K, g: B => K): Tee[A, B, (A, B)] = {
def build(m: Map[K, A]): Plan[T[A, B], Nothing, Map[K, A]] = (for {
a <- awaits(left[A])
mp <- build(m + (f(a) -> a))
} yield mp) orElse Return(m)
for {
m <- build(Map())
r <- (awaits(right[B]) flatMap (b => {
val k = g(b)
if (m contains k) emit(m(k) -> b) else Return(())
})) repeatedly
} yield r
}
repeatedly
方法将其编译为一个Machine
。此处构建的类型是Tee[A, B, (A, B)]
,它是一台带有两个输入的机器。您可以使用awaits(left)
和awaits(right)
在左侧和右侧请求输入,并使用emit
输出。同时,还有Haskell版本的Machines。导管(也可以为管道构建,但该代码尚未发布)具有一个zip
原语,它将两个上游合并为元组流。
import Control.Pipe
import Control.Monad
import Control.Monad.State
import Data.Void
source0, source1 :: Producer Char IO ()
source0 = mapM_ yield "say"
source1 = mapM_ yield "what"
sink :: Show b => Consumer b IO ()
sink = forever $ await >>= \x -> lift $ print x
pipeline :: Pipe () Void IO ()
pipeline = sink <+< (source0 >> source1)
序列操作符(>>)
垂直连接源,产生输出(在runPipe
上)
's'
'a'
'y'
'w'
'h'
'a'
't'