我正在尝试理解约翰·休斯著名的《将箭头泛化为单子》 ("Generalising Arrows to Monads") 中的“Streams as arrows”部分。更确切地说,我对编写斐波那契流感兴趣。
我稍微调整了休斯的定义:
data StreamProcessor a b = Get (a -> StreamProcessor a b) |
Put b (StreamProcessor a b) |
Halt
put = Put
get = Get
首先,我将流处理器视为可能会阻塞(等待输入)的列表。也就是说:
Put :: b -> StreamProcessor a b -> StreamProcessor a b
匹配(:) :: a -> [a] -> [a]
;Halt :: StreamProcessor a b
匹配[] :: [a]
;Get :: (a -> StreamProcessor a b) -> StreamProcessor a b
帮助我们阻塞流并等待输入。
Get
,我们得到一个类似于列表的结构。如果我们还去掉Halt
,我们得到一个类似于无限列表的结构。以下是我理解“斐波那契数列的流”两种方式:
a non-blocked infinite stream (infinite-list-like):
zipNonBlockedStreamsWith :: (a -> b -> c) -> StreamProcessor () a -> StreamProcessor () b -> StreamProcessor () c zipNonBlockedStreamsWith f (Put x sp) (Put y sp') = Put (f x y) (zipNonBlockedStreamsWith f sp sp') zipNonBlockedStreamsWith f Halt sp = Halt zipNonBlockedStreamsWith f sp Halt = Halt fibs :: StreamProcessor () Int fibs = put 0 (put 1 $ zipNonBlockedStreamsWith (+) fibs (tailNonBlockedStream fibs)) -- matches a well-known definition of an infinite Fibonacci list. fibsList :: [Int] fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList)) -- with the 'fibsList', we can use folds to do the same thing. putStream :: [b] -> StreamProcessor a b -> StreamProcessor a b putStream bs sp = foldr Put sp bs fibs' :: StreamProcessor () Int fibs' = putStream fibsList Halt
A blocked stream waits for
n
, outputs then
th Fibonacci number and blocks again. Hughes'Arrow
interface helps us express it in a quite concise way:instance Category StreamProcessor where ... instance Arrow StreamProcessor where arr f = Get $ \ a -> Put (f a) (arr f) ... fibsList :: [Int] fibsList = 0 : 1 : (zipWith (+) fibsList (tail fibsList)) blockedFibs :: StreamProcessor Int Int blockedFibs = arr (fibsList !!)
然而,在我提供的论文中,约翰·休斯展示了另一种解决方案,通过箭头
的方式进行:
instance Category StreamProcessor where
id = Get (\ x -> Put x id)
Put c bc . ab = Put c (bc . ab)
Get bbc . Put b ab = (bbc b) . ab
Get bbc . Get aab = Get $ \ a -> (Get bbc) . (aab a)
Get bbc . Halt = Halt
Halt . ab = Halt
bypass :: [b] -> [d] -> StreamProcessor b c -> StreamProcessor (b, d) (c, d)
bypass [] ds (Get f) = Get $ \ ~(b, d) -> bypass [] (ds ++ [d]) (f b)
bypass (b : bs) [] (Get f) = bypass bs [] (f b)
bypass [] (d : ds) (Put c sp) = Put (c, d) $ bypass [] ds sp
bypass bs [] (Put c sp) =
Get $ \ ~(b, d) -> Put (c, d) (bypass (bs ++ [b]) [] sp)
bypass bs ds Halt = Halt
instance Arrow StreamProcessor where
arr f = Get $ \ a -> Put (f a) (arr f)
first = bypass [] []
liftArr2 :: Arrow k => (a -> b -> c) -> k r a -> k r b -> k r c
liftArr2 f a b = a &&& b >>^ uncurry f
fibsHughes = let
fibsHughes' = put 1 (liftArr2 (+) fibsHughes fibsHughes')
in put 0 fibsHughes'
但它并不按我所期望的方式工作。下面的函数将帮助我们从流中获取值,直到它被阻塞或停止(使用
Data.List.unfoldr
):popToTheBlockOrHalt :: StreamProcessor a b -> [b]
popToTheBlockOrHalt = let
getOutput (Put x sp) = Just (x, sp)
getOutput getOrHalt = Nothing
in unfoldr getOutput
所以,这就是我们得到的:
GHCi> popToTheBlockOrHalt fibsHughes
[0, 1]
GHCi> :t fibsHughes
fibsHughes :: StreamProcessor a Integer
如果我们检查这些模式,我们会发现它会阻止(也就是说我们会遇到
Get
)。我无法确定这是否应该是这样。如果这是我们想要的,那么为什么?如果不是,问题出在哪里?我已经检查和反复检查了我编写的代码,它基本上与休斯(Hughes)文章中的定义相匹配(好吧,我不得不添加
id
和Halt
的模式 - 我看不出他们如何能搞砸这个过程)。
编辑: 正如评论中所说,在这篇论文的后期版本中,bypass
稍作修改(我们使用这个版本)。它能够累积被扣留的b和(即有两个队列),而原始论文中的bypass
只能累积(即一个队列)。
编辑 #2: 我只想写一个函数,从
fibsHughes
中弹出斐波那契数列的数字。popToTheHaltThroughImproperBlocks :: StreamProcessor () b -> [b]
popToTheHaltThroughImproperBlocks = let
getOutput (Put x sp) = Just (x, sp)
getOutput (Get c) = getOutput $ c ()
getOutput Halt = Nothing
in unfoldr getOutput
接下来开始:
GHCi> (take 10 . popToTheHaltThroughImproperBlocks) fibsHughes
[0,1,1,2,3,5,8,13,21,34]
(&&&)
的定义,但我仍在努力确定核心问题。 - Li-yao Xia