“Monad-friendly”事件驱动的IO

13

我想要使用"epoll"样式的事件管理来实现高效的单线程套接字通信。

如果我从头开始写一个非常命令式的程序,我会基本上像这样做(只是一些我刚刚打出的伪代码 - 可能不会编译):

import Control.Concurrent

import Data.ByteString (ByteString)
import qualified Data.ByteString as ByteString

import qualified GHC.Event as Event

import Network
import Network.Socket
import Network.Socket.ByteString

main = withSocketFromSomewhere $ \ socket -> do
  let fd = fromIntegral . fdSocket $ socket

  -- Some app logic
  state <- newMVar "Bla"

  -- Event manager
  manager <- Event.new

  -- Do an initial write
  initialWrite socket state manager

  -- Manager does its thing
  Event.loop manager

write manager socket bs =
  -- Should be pretty straight-forward
  Event.registerFd manager theWrite fd Event.evtWrite
  where
    fd = fromIntegral . fdSocket $ socket
    theWrite key _ = do
      Event.unregisterFd manager key
      sendAll socket bs

read manager socket cont =
  -- Ditto
  Event.registerFd manager theRead fd Event.evtRead
  where
    fd = fromIntegral . fdSocket $ socket
    theRead key _ = do
      Event.unregisterFd manager key
      bs <- recv socket 4096
      cont bs

initialWrite socket state manager = do
  msg <- readMVar state
  write manager socket msg
  read manager socket $ \ bs -> do
    ByteString.putStrLn bs
    putMVar state msg

假设有一些函数将超时事件添加到管理器中。

但是,这段代码存在几个问题:

  1. 需要手动携带事件管理器。
  2. 我必须为应用程序逻辑使用 MVar,因为我无法告诉不透明的事件管理器它应该为我传递一些状态,尽管我知道它仅使用一个线程,因此可能被用作单子变换器栈的基础。
  3. 对于读取操作,我必须显式创建分界限延续(甚至可能要为写入操作这样做;在这种情况下,我不知道哪种方法更明智)。

现在,这就呼吁使用大量的单子变换器等。我希望能够像这样轻松地完成:

main =
  withSocketFromSomewhere $ \ socket ->
  runEvents . flip runStateT "Bla" $ initialWrite socket

initialWrite socket = do
  msg <- lift get
  write socket msg
  resp <- read socket
  liftIO $ ByteString.putStrLn resp
  lift $ put msg

这段代码应该与上面的代码具有相同的行为;例如,通过挂起计算直到在resp <- read socket行上接收到读取,让我在同一线程/管理器上管理多个套接字。

问题:

  1. 是否有更高级的接口可以访问GHC事件API/libevent或相当的接口,以便用户能够拥有更多的控制权?考虑到近期GHC中发生的同步IO调度变化(我使用的是7.4.1),这样做值得吗?
  2. 如果我想实现协作并发,例如有一个函数始终处理来自套接字的读取,但是将此函数与写“线程”共享相同的状态单子,可以使用来自问题1的任何解决方案吗?
1个回答

23
我强烈建议您阅读基于语言的方法来统一事件和线程。它讲述了如何在所选的IO子系统之上构建任何并发系统,并且在他们的论文中,他们实际上是在epoll之上实现的。
不幸的是,论文中的数据类型和代码示例非常糟糕,需要一些时间(至少对我来说)才能将其反向工程化,并且论文中甚至还有一些错误。然而,他们的方法实际上是一个非常强大和通用的方法的子集,称为“自由单子”。
例如,他们的Trace数据类型只是伪装成自由单子。为了理解这一点,让我们看一下自由单子的Haskell定义:
data Free f r = Pure r | Free (f (Free f r))

一个自由单子类似于“函子列表”,其中 Pure 类似于列表的 Nil 构造函数,Free 类似于列表的 Cons 构造函数,因为它在“列表”上添加了另一个函子。 严格来说,如果我非常追求精确,没有什么可以说明自由单子必须实现为上述类似列表的数据类型,但是无论你实现什么都必须与上述数据类型同构。
自由单子的好处在于,给定一个函子 fFree f 自动成为一个单子:
instance (Functor f) => Monad (Free f) where
    return = Pure
    Pure r >>= f = f r
    Free x >>= f = Free (fmap (>>= f) x)

这意味着我们可以将它们的 Trace 数据类型分解为两部分,即基本函子 f 和由 f 生成的自由单子:

-- The base functor
data TraceF x =
    SYS_NBIO (IO x)
  | SYS_FORK x x
  | SYS_YIELD x
  | SYS_RET
  | SYS_EPOLL_WAIT FD EPOLL_EVENT x

-- You can even skip this definition if you use the GHC
-- "DerivingFunctor" extension
instance Functor TraceF where
    fmap f (SYS_NBIO x) = SYS_NBIO (liftM f x)
    fmap f (SYS_FORK x) = SYS_FORK (f x) (f x)
    fmap f (SYS_YIELD x) = SYS_YIELD (f x)
    fmap f SYS_RET = SYS_RET
    fmap f (SYS_EPOLL_WAIT FD EPOLL_EVENT x) = SYS_EPOLL_WAIT FD EPOLL_EVEN (f x)

鉴于该函子,您可以“免费”获得Trace单子:

type Trace a = Free TraceF a
-- or: type Trace = Free TraceF

虽然这不是“自由”monad被称为“自由”的原因。

这样,定义它们所有的函数就更容易了:

liftF = Free . fmap Pure
-- if "Free f" is like a list of "f", then
-- this is sort of like: "liftF x = [x]"
-- it's just a convenience function

-- their definitions are written in continuation-passing style,
-- presumably for efficiency, but they are equivalent to these
sys_nbio io = liftF (SYS_NBIO io)
sys_fork t = SYS_FORK t (return ()) -- intentionally didn't use liftF
sys_yield = liftF (SYS_YIELD ())
sys_ret = liftF SYS_RET
sys_epoll_wait fd event = liftF (SYS_EPOLL_WAIT fd event ())

那么你可以像使用单子一样使用这些命令:

myTrace fd event = do
    sys_nbio (putStrLn "Hello, world")
    fork $ do
        sys_nbio (putStrLn "Hey")
    sys_expoll_wait fd event

现在,这里是关键概念。我刚写的那个monad只是创建了一个数据类型。仅此而已。它根本不解释它。这就像你为表达式编写抽象语法树一样。完全由您决定如何评估它。在论文中,他们给出了一个表达式解释器的具体示例,但编写自己的解释器非常简单。
重要的概念是,这个解释器可以在任何您想要的monad中运行。因此,如果您想通过并发线程传递一些状态,您可以这样做。例如,这里有一个玩具解释器,它使用StateT IO monad来跟踪调用IO操作的次数:
interpret t = case t of
    SYS_NBIO io -> do
        modify (+1)
        t' <- lift io
        interpret t'
    ...

您甚至可以在forkIO的操作中跨线程处理单子!这是我很久以前写的一些非常陈旧、有缺陷且不太好的代码,因为当时我经验不足,不知道什么是自由单子,但它展示了这个过程:

module Thread (Thread(..), done, lift, branch, fork, run) where

import Control.Concurrent
import Control.Concurrent.STM
import Control.Monad.Cont
import Data.Sequence
import qualified Data.Foldable as F

data Thread f m =
    Done
  | Lift (m (Thread f m))
  | LiftIO (IO (Thread f m))
  | Branch (f (Thread f m))
  | Exit

done = cont $ \c -> Done
lift' x = cont $ \c -> Lift $ liftM c x
liftIO' x = cont $ \c -> LiftIO $ liftM c x
branch x = cont $ \c -> Branch $ fmap c x
exit = cont $ \c -> Exit

fork x = join $ branch [return (), x >> done]

run x = do
    q <- liftIO $ newTChanIO
    enqueue q $ runCont x $ \_ -> Done
    loop q
  where
    loop q = do
        t <- liftIO $ atomically $ readTChan q
        case t of
            Exit -> return ()
            Done -> loop q
            Branch ft -> mapM_ (enqueue q) ft >> loop q
            Lift mt -> (mt >>= enqueue q) >> loop q
            LiftIO it -> (liftIO $ forkIO $ it >>= enqueue q) >> loop q
    enqueue q = liftIO . atomically . writeTChan q

自由单子的重点在于它们提供单子实例,而没有其他。换句话说,它们退居幕后,让你完全自由地决定如何解释它们,这就是为什么它们非常有用的原因。


1
这非常有趣。我猜已经存在的 Haskell 概念之一是 Coroutine monad(形成一个自由的 monad)。协程基本上让你将任何 functor 视为带有“挂起”的自由 monad,这样你就可以跳过解释与读写无关的“无聊东西”。我会尝试这种方法并暂时接受这个解决方案。 - dflemstr
是的,我实际上将其实现为“自由单子变换器”,Ross计划在下一个transformers版本中包含它。您可以在此处查看我的实现(https://github.com/Gabriel439/Haskell-Pipes-Library/blob/master/Control/Monad/Trans/Free.hs)。我在我的`pipes`库中使用它。 - Gabriella Gonzalez
如果一个自由单子是一个转换器,那么你如何引入像forkIO这样的功能呢?"单子堆栈"如何在IO单子中被"序列化"?那部分对我来说毫无意义。 - dflemstr
1
我的示例代码中,神奇的是最后一个case语句。 it的类型是IO Trace,并且它是通过forkIO线程传递的。简而言之,它允许您仅将代码的IO部分通过forkIO线程化。 - Gabriella Gonzalez

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接