在我的 GHC Haskell 应用程序中,我使用 stm、network-conduit 和 conduit,为每个套接字都创建了一个分支线程,这是通过 runTCPServer 自动完成的。不同的分支线程可以通过广播 TChan 进行通信。
下面是我想要设置 conduit “链”的示例图: 所以,在这里我们有两个源(每个源都绑定到辅助 conduit),它们会产生一个 Packet 对象,encoder 会将其转换为 ByteString,并发送到套接字上。我在实现这两个输入的高效融合方面遇到了很大的困难(性能是一项考虑因素)。
如果有人能够指点我正确的方向,我将不胜感激。
最后,由于没有进行尝试就发布问题会显得无礼,所以我会在这里放出我之前尝试过的内容:
我编写/挑选了一个函数,从 TMChan(可关闭的通道)生成 Source(阻塞)。
下面是我想要设置 conduit “链”的示例图: 所以,在这里我们有两个源(每个源都绑定到辅助 conduit),它们会产生一个 Packet 对象,encoder 会将其转换为 ByteString,并发送到套接字上。我在实现这两个输入的高效融合方面遇到了很大的困难(性能是一项考虑因素)。
如果有人能够指点我正确的方向,我将不胜感激。
最后,由于没有进行尝试就发布问题会显得无礼,所以我会在这里放出我之前尝试过的内容:
我编写/挑选了一个函数,从 TMChan(可关闭的通道)生成 Source(阻塞)。
-- | Takes a generic type of STM chan and, given read and close functionality,
-- returns a conduit 'Source' which consumes the elements of the channel.
chanSource
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> STM (Maybe b)) -- ^ The read function
-> (a -> STM ()) -- ^ The close/finalizer function
-> Source m b
chanSource ch readCh closeCh = ConduitM pull
where close = liftSTM $ closeCh ch
pull = PipeM $ liftSTM $ readCh ch >>= translate
translate = return . maybe (Done ()) (HaveOutput pull close)
同样,将一个Chan转换为一个sink的函数;
-- | Takes a stream and, given write and close functionality, returns a sink
-- which wil consume elements and broadcast them into the channel
chanSink
:: (MonadIO m, MonadSTM m)
=> a -- ^ The channel
-> (a -> b -> STM()) -- ^ The write function
-> (a -> STM()) -- ^ The close/finalizer function
-> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
where close = const . liftSTM $ closeCh ch
sink = NeedInput push close
write = liftSTM . writeCh ch
push x = PipeM $ write x >> return sink
然后mergeSources很简单;开启2个线程(我真的不想这样做,但无妨),它们可以将自己的新项放入一个列表中,然后我会生成一个源;
-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
-- a source which consumes the elements of the channel.
mergeSources
:: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
=> [Source (ResourceT m) a] -- ^ The list of sources
-> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
where push c s = s $$ chanSink c writeTMChan closeTMChan
fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
retn c = return $ chanSource c readTMChan closeTMChan
尽管我成功地使这些函数通过了类型检查,但我无法让这些函数的任何使用通过类型检查。
-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
-- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
mergsrc $$ protocol $= encoder =$ appSink appdata
where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
mergsrc = mergeSources [appSource appdata $= decoder, chansrc]
-- | Structure which holds mutable information for clients
data SessionState = SessionState
{ _ssBroadcast :: TMChan Packet -- ^ Outbound packet broadcast channel
}
makeLenses ''SessionState
-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)
-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO
无论如何,我认为这种方法存在缺陷 - 存在许多中间列表和转换。 这对性能不利。 寻求指导。
附注:据我所了解,这不是重复的情况;将具有多个输入的conduits融合,因为在我的情况下,两个来源都会产生相同类型的内容,并且我不关心从哪个来源生成Packet
对象,只要我不等待一个同时另一个已经准备好被消耗的对象即可。
PPS. 对于在示例代码中使用Lens(因此需要了解)我表示歉意。
stm-conduit
包中的Data.Conduit.TMChan
呢?它拥有你定义的所有函数,包括mergeSources
。 - Gabriella GonzalezdecRefCount refcount
这一部分的代码以关闭所有来源,会发生什么? - Iain