一个处理通道,2个相同类型的IO源

61
在我的 GHC Haskell 应用程序中,我使用 stm、network-conduit 和 conduit,为每个套接字都创建了一个分支线程,这是通过 runTCPServer 自动完成的。不同的分支线程可以通过广播 TChan 进行通信。
下面是我想要设置 conduit “链”的示例图: enter image description here 所以,在这里我们有两个源(每个源都绑定到辅助 conduit),它们会产生一个 Packet 对象,encoder 会将其转换为 ByteString,并发送到套接字上。我在实现这两个输入的高效融合方面遇到了很大的困难(性能是一项考虑因素)。
如果有人能够指点我正确的方向,我将不胜感激。
最后,由于没有进行尝试就发布问题会显得无礼,所以我会在这里放出我之前尝试过的内容:
我编写/挑选了一个函数,从 TMChan(可关闭的通道)生成 Source(阻塞)。
-- | Takes a generic type of STM chan and, given read and close functionality,
--   returns a conduit 'Source' which consumes the elements of the channel.
chanSource 
    :: (MonadIO m, MonadSTM m)
    => a                    -- ^ The channel
    -> (a -> STM (Maybe b)) -- ^ The read function
    -> (a -> STM ())        -- ^ The close/finalizer function
    -> Source m b
chanSource ch readCh closeCh = ConduitM pull
    where close     = liftSTM $ closeCh ch
          pull      = PipeM $ liftSTM $ readCh ch >>= translate
          translate = return . maybe (Done ()) (HaveOutput pull close)

同样,将一个Chan转换为一个sink的函数;

-- | Takes a stream and, given write and close functionality, returns a sink
--   which wil consume elements and broadcast them into the channel 
chanSink
    :: (MonadIO m, MonadSTM m)
    => a                 -- ^ The channel
    -> (a -> b -> STM()) -- ^ The write function
    -> (a -> STM())      -- ^ The close/finalizer function
    -> Sink b m ()
chanSink ch writeCh closeCh = ConduitM sink
    where close  = const . liftSTM $ closeCh ch
          sink   = NeedInput push close
          write  = liftSTM . writeCh ch
          push x = PipeM $ write x >> return sink

然后mergeSources很简单;开启2个线程(我真的不想这样做,但无妨),它们可以将自己的新项放入一个列表中,然后我会生成一个源;

-- | Merges a list of 'Source' objects, sinking them into a 'TMChan' and returns
--   a source which consumes the elements of the channel.
mergeSources
    :: (MonadIO m, MonadBaseControl IO m, MonadSTM m)
    => [Source (ResourceT m) a]             -- ^ The list of sources
    -> ResourceT m (Source (ResourceT m) a)
mergeSources sx = liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    where push c s = s $$ chanSink c writeTMChan closeTMChan
          fsrc x c = mapM_ (\s -> resourceForkIO $ push c s) x
          retn c   = return $ chanSource c readTMChan closeTMChan

尽管我成功地使这些函数通过了类型检查,但我无法让这些函数的任何使用通过类型检查。

-- | Helper which represents a conduit chain for each client connection
serverApp :: Application SessionIO
serverApp appdata = do
    use ssBroadcast >>= liftIO . atomically . dupTMChan >>= assign ssBroadcast
    -- appSource appdata $$ decoder $= protocol =$= encoder =$ appSink appdata
    mergsrc $$ protocol $= encoder =$ appSink appdata
    where chansrc = chanSource (use ssBroadcast) readTMChan closeTMChan
          mergsrc = mergeSources [appSource appdata $= decoder, chansrc]

-- | Structure which holds mutable information for clients
data SessionState = SessionState
    { _ssBroadcast     :: TMChan Packet -- ^ Outbound packet broadcast channel
    }

makeLenses ''SessionState

-- | A transformer encompassing both SessionReader and SessionState
type Session m = ReaderT SessionReader (StateT SessionState m)

-- | Macro providing Session applied to an IO monad
type SessionIO = Session IO

无论如何,我认为这种方法存在缺陷 - 存在许多中间列表和转换。 这对性能不利。 寻求指导。


附注:据我所了解,这不是重复的情况;将具有多个输入的conduits融合,因为在我的情况下,两个来源都会产生相同类型的内容,并且我不关心从哪个来源生成Packet对象,只要我不等待一个同时另一个已经准备好被消耗的对象即可。

PPS. 对于在示例代码中使用Lens(因此需要了解)我表示歉意。


2
你为什么不使用stm-conduit包中的Data.Conduit.TMChan呢?它拥有你定义的所有函数,包括mergeSources - Gabriella Gonzalez
实际上有一个问题 - 我希望合并两个源的源代码尽快关闭。stm-conduit包使用引用计数(并等待最后一个源关闭以关闭结果源),这不是期望的行为。在任一源失效后立即关闭,使我能够在关闭全局TMChan时及时关闭每个套接字。 - kvanbere
3
一个空想:如果你从TMChan获取mergeSources,抛弃引用计数的内容,并替换decRefCount refcount这一部分的代码以关闭所有来源,会发生什么? - Iain
我(有点)尝试过上面的方法,但是在编译github上相应stm-conduit函数的版本时遇到了类型检查问题,所以不得不对它们进行大量修改(请参见OP)。今晚我会再次尝试使用hackage上的版本。最近我一直在思考这个问题,一个中间的Chan可能真的是必要的。 - kvanbere
1个回答

1

我不知道这是否有帮助,但我尝试了实现Iain的建议并创建了一个mergeSources'的变体,一旦任何一个通道停止,它就会立即停止:

mergeSources' :: (MonadIO m, MonadBaseControl IO m)
              => [Source (ResourceT m) a] -- ^ The sources to merge.
              -> Int -- ^ The bound of the intermediate channel.
              -> ResourceT m (Source (ResourceT m) a)
mergeSources' sx bound = do
    c <- liftSTM $ newTBMChan bound
    mapM_ (\s -> resourceForkIO $
                    s $$ chanSink c writeTBMChan closeTBMChan) sx
    return $ sourceTBMChan c

这个简单的加法可以在这里找到。

以下是对您的mergeSources版本的一些评论(请谨慎对待,因为我可能没有完全理解某些内容):

  • 使用...TMChan而不是...TBMChan似乎很危险。如果写入者比读取者更快,那么您的堆将炸。从您的图表上看,如果您的TCP对等端不足够快地读取数据,这种情况可能很容易发生。因此,我肯定会使用...TBMChan,并且可能具有大但有限的界限。
  • 您不需要MonadSTM m约束。所有STM内容都包装在IO中,其中

    liftSTM = liftIO . atomically
    

    也许在使用serverApp中的mergeSources'时,这会稍微帮助您。

  • 只是一个美观问题,我发现

    liftSTM newTMChan >>= liftA2 (>>) (fsrc sx) retn
    

    由于其在(->) r单子上使用了liftA2,所以很难阅读。我的意思是

    
        c <- liftSTM newTMChan
        fsrc sx c
        retn c
    

    会更长,但更容易阅读。

你能否创建一个自包含的项目,以便可以与serverApp进行交互?


谢谢你的建议。我会记在心里的(我很快就要重新审视这个问题)。 - kvanbere

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接