我希望处理通过MQTT接收到的事件流。我使用的库使用回调来提供结果。我所做的处理不仅依赖于最新的事件,还依赖于之前的状态。而且未来的事件可能会从其他来源收集。
首先,我决定将其组成列表,这听起来是个好主意。我遇到了小问题,因为IO阻止了惰性求值,并且等待无限流可能需要很长时间,但我用交错IO解决了这个问题。
我浏览了很多库,例如 STM 和 TQueue,但不是我想要的。
我决定创建自定义类型并使其成为可折叠的,以便能够对其进行折叠。但由于IO的原因,我失败了。
首先,我决定将其组成列表,这听起来是个好主意。我遇到了小问题,因为IO阻止了惰性求值,并且等待无限流可能需要很长时间,但我用交错IO解决了这个问题。
stream :: IO [Event]
让我能够做很多不错的事情,比如 foldl
、foldM
、map
、mapM
等等... 不幸的是,采用这种方法,我不太可能组合两个流,因为那里没有更多的锁定特性。我浏览了很多库,例如 STM 和 TQueue,但不是我想要的。
我决定创建自定义类型并使其成为可折叠的,以便能够对其进行折叠。但由于IO的原因,我失败了。
import Control.Concurrent.STM
newtype Stream a = Stream (STM a)
runStream
:: ((a -> IO ()) -> IO i)
-> IO (Stream a)
runStream block = do
queue <- newTQueueIO
block (atomically . writeTQueue queue)
return $ Stream (readTQueue queue)
foldStream :: (a -> b -> IO b) -> b -> Stream a -> IO b
foldStream f s (Stream read) = do
n <- atomically read
m <- f n s
foldStream f m (Stream read)
mapStream :: (a -> b) -> Stream a -> Stream b
mapStream f (Stream read) = Stream $ f <$> read
zipStream :: [Stream a] -> Stream a
zipStream = undefined
这可以像这样使用:main = foldStream (\x _ -> print x) () =<< events
是否可能实现一些基础类,使其能够像常规列表一样使用此流?
events :: IO [Event]
中使用惰性IO是一个好主意。你有考虑过使用像streaming
这样的库吗? - HTNWrunStream
(net-mqtt通过回调返回数据),以及如何压缩/组合/选择流列表(获取第一个可用元素,然后是下一个,...,类似于orElse :: STM a-> STM a-> STM a)。有什么想法吗? - majkrzak