在Haskell中处理事件流

6
我希望处理通过MQTT接收到的事件流。我使用的库使用回调来提供结果。我所做的处理不仅依赖于最新的事件,还依赖于之前的状态。而且未来的事件可能会从其他来源收集。
首先,我决定将其组成列表,这听起来是个好主意。我遇到了小问题,因为IO阻止了惰性求值,并且等待无限流可能需要很长时间,但我用交错IO解决了这个问题。 stream :: IO [Event] 让我能够做很多不错的事情,比如 foldlfoldMmapmapM 等等... 不幸的是,采用这种方法,我不太可能组合两个流,因为那里没有更多的锁定特性。
我浏览了很多库,例如 STM 和 TQueue,但不是我想要的。
我决定创建自定义类型并使其成为可折叠的,以便能够对其进行折叠。但由于IO的原因,我失败了。
import Control.Concurrent.STM

newtype Stream a = Stream (STM a)

runStream
  :: ((a -> IO ()) -> IO i)
  -> IO (Stream a)
runStream block = do
  queue <- newTQueueIO
  block (atomically . writeTQueue queue)
  return $ Stream (readTQueue queue)

foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
  n <- atomically read
  m <- f n s
  foldStream f m (Stream read)

mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read

zipStream :: [Stream a] -> Stream a
zipStream = undefined

这可以像这样使用:main = foldStream (\x _ -> print x) () =<< events

是否可能实现一些基础类,使其能够像常规列表一样使用此流?


我真的不认为在events :: IO [Event]中使用惰性IO是一个好主意。你有考虑过使用像streaming这样的库吗? - HTNW
@HTNW 是的,但我不知道如何使用它来实现runStream(net-mqtt通过回调返回数据),以及如何压缩/组合/选择流列表(获取第一个可用元素,然后是下一个,...,类似于orElse :: STM a-> STM a-> STM a)。有什么想法吗? - majkrzak
1个回答

4
在这种情况下通常的技巧是让回调函数写入队列,然后从队列的另一端读取。 使用stm-chans包中的bounded, closeable队列,我们可以定义这个函数:
import Control.Concurrent.STM
import Control.Concurrent.STM.TBMQueue

foldQueue :: TBMQueue a -> (x -> a -> IO x) -> IO x -> (x -> IO b) -> IO b
foldQueue queue step start done =
    let go state = 
            do m <- atomically (readTBMQueue queue)
               case m of 
                   Nothing -> done state
                   Just a  -> step state a >>= go
     in start >>= go

它需要信道、一个阶跃函数(类似于foldM所需的函数)、一个获取初始状态的操作和一个返回最终结果的“完成”操作,然后从信道中提供数据,直到它关闭。请注意,调用者选择foldQueue的折叠状态x
如果以后我们想要升级到foldl包中的单子折叠——它们具有非常有用的Applicative实例——我们可以这样做:
import qualified Control.Foldl as L

foldQueue' :: TBMQueue a -> L.FoldM IO a b -> IO b 
foldQueue' queue = L.impurely (foldQueue queue)

使用“foldl”包中的impurely

有时(例如解析、分组或解码时)使用基于拉取的消费者更容易。我们可以使用streaming包来实现:

import Streaming
import qualified Streaming.Prelude as S

foldQueue' :: TBMQueue a -> (Stream (Of a) IO () -> IO r) -> IO r
foldQueue' queue consume = consume (S.untilRight (do
    m <- atomically (readTBMQueue queue)
    return (case m of
        Nothing -> Right ()
        Just a -> Left a)))

给定一个消耗流的函数,我们将从队列中读取的值的流提供给它。
通常,从通道读取和写入必须在不同的线程中进行。我们可以使用 concurrently 等函数来清晰地处理它,这些函数来自 async

使用concurrently协调处理阶段的示例:https://stackoverflow.com/a/50307727/1364288 - danidiaz

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接