Haskell - Actor based mutability

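I'm working on a Haskell network application and I use the actor pattern to manage multithreading. One problem I ran into is how to store a set of client sockets/handles, which must be accessible to all threads and can change when clients log on or off.
Since I come from the imperative world, I first thought of some kind of locking mechanism, but when I noticed how ugly that is, I thought about "pure" mutability. Well, it is actually kind of pure: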
import Control.Concurrent
import Control.Monad
import Network
import System.IO
import Data.List
import Data.Maybe
import System.Environment
import Control.Exception


newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a))
newStorage = do
  q <- newChan
  forkIO $ storage [] q
  return q


newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle))
newHandleStorage = newStorage


storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO ()
storage s q = do
  let loop = (`storage` q)
  (req, reply, d) <- readChan q
  print ("processing " ++ show(d))
  case req of
    "add" -> loop ((fromJust d) : s)
    "remove" -> loop (delete (fromJust d) s)
    "get" -> do
      writeChan (fromJust reply) s
      loop s


store s d = writeChan s ("add", Nothing, Just d)
unstore s d = writeChan s ("remove", Nothing, Just d)
request s = do
  chan <- newChan
  writeChan s ("get", Just chan, Nothing)
  readChan chan

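The point is that one thread (actor) manages a list of items and modifies that list according to incoming requests. Since threads are really cheap, I thought this could be a really nice functional alternative.

Of course this is just a prototype (a quick and dirty proof of concept). So my questions are:

  1. Is this a "good" way of managing shared mutable variables (in the actor world)?
  2. Is there already a library for this pattern? (I searched but found nothing.)

Regards, Chris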


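If you're willing to explore alternatives to the actor model, I suggest trying Haskell's Software Transactional Memory (STM). It is a beautiful mechanism, similar to database transactions. See chapter 28 of Real World Haskell. - Petr
Technically that is a fine option, but I've heard that STM performance can degrade considerably when a large number of threads (one thread per client is standard in Haskell) is combined with relatively long operations (removing an item from a list is O(n); hash sets/maps would of course help here). The MVar-based channels could also be replaced with STM channels, which would combine the best parts of both techniques. Edit: the actor pattern is generally quite good in this situation, because adding/removing an item is O(1) for the caller (it just sends a message) and the actual work is done in the actor's thread... - Kr0e
You're right. With STM, a transaction can be restarted several times, which hurts performance. But if your synchronized operations take a long time, you can run into similar problems with actors: if there are too many messages, the actor's state will lag behind reality. So using a balanced tree (Map/Set) or an ST/IO-based hash set will definitely help. - Petr
Perhaps an interesting and fun solution would be to create a balanced tree whose nodes are STM variables (or a similar hash set). That way different threads could update different parts of the tree at the same time. - Petr
That is definitely a nice solution too. I just like the actor concept, but this solution might be better, and using an actor for every concurrent part may be overkill. Still, actors are very easy to use, which is appealing. - Kr0e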
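2 Answers

Here is a quick and dirty example using stm and pipes-network. It sets up a simple server that lets clients connect and increment a counter. It displays a very simple status bar showing the current tallies of all connected clients, and removes a client's tally from the bar when that client disconnects.
I will begin with the server, and I've generously commented the code to explain how it works: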
import Control.Concurrent.STM (STM, atomically)
import Control.Concurrent.STM.TVar
import qualified Data.HashMap.Strict as H
import Data.Foldable (forM_)

import Control.Concurrent (forkIO, threadDelay)
import Control.Monad (forever, unless)
import Control.Monad.Trans.State.Strict
import qualified Data.ByteString.Char8 as B
import Control.Proxy
import Control.Proxy.TCP
import System.IO

main = do
    hSetBuffering stdout NoBuffering

    {- These are the internal data structures.  They should be an implementation
       detail and you should never expose these references to the
       "business logic" part of the application. -}
    -- I use nRef to keep track of creating fresh Ints (which identify users)
    nRef <- newTVarIO 0       :: IO (TVar Int)
    {- hMap associates every user (i.e. Int) with a counter

       Notice how I've "striped" the hash map by storing STM references to the
       values instead of storing the values directly.  This means that I only
       actually write the hashmap when adding or removing users, which reduces
       contention for the hash map.

       Since each user gets their own unique STM reference for their counter,
       modifying counters does not cause contention with other counters or
       contention with the hash map. -}
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int)))

    {- The following code makes heavy use of Haskell's pure closures.  Each
       'let' binding closes over its current environment, which is safe since
        Haskell is pure. -}

    let {- 'getCounters' is the only server-facing command in our STM API.  The
           only permitted operation is retrieving the current set of user
           counters.

           'getCounters' closes over the 'hMap' reference currently in scope so
           that the server never needs to be aware about our internal
           implementation. -}
        getCounters :: STM [Int]
        getCounters = do
            refs <- fmap H.elems (readTVar hMap)
            mapM readTVar refs

        {- 'init' is the only client-facing command in our STM API.  It
            initializes the client's entry in the hash map and returns two
            commands: the first command is what the client calls to 'increment'
            their counter and the second command is what the client calls to log
            off and 'delete' their counter.

            Notice that those two returned commands each close over the client's
            unique STM reference so the client never needs to be aware of how
            exactly 'init' is implemented under the hood. -}
        init :: STM (STM (), STM ())
        init = do
            n   <- readTVar nRef
            writeTVar nRef $! n + 1

            ref <- newTVar 0
            modifyTVar' hMap (H.insert n ref)

            let incrementRef :: STM ()
                incrementRef = do
                    mRef <- fmap (H.lookup n) (readTVar hMap)
                    forM_ mRef $ \ref -> modifyTVar' ref (+ 1)

                deleteRef :: STM ()
                deleteRef = modifyTVar' hMap (H.delete n)

            return (incrementRef, deleteRef)

    {- Now for the actual program logic.  Everything past this point only uses
       the approved STM API (i.e. 'getCounters' and 'init').  If I wanted I
       could factor the above approved STM API into a separate module to enforce
       the encapsulation boundary, but I am lazy. -}

    {- Fork a thread which polls the current state of the counters and displays
       it to the console.  There is a way to implement this without polling but
       this gets the job done for now.

       Most of what it is doing is just some simple tricks to reuse the same
       console line instead of outputting a stream of lines.  Otherwise it
       would be just:

       forkIO $ forever $ do
           ns <- atomically getCounters
           print ns
    -}
    forkIO $ (`evalStateT` 0) $ forever $ do
        del <- get
        lift $ do
            putStr (replicate del '\b')
            putStr (replicate del ' ' )
            putStr (replicate del '\b')
        ns <- lift $ atomically getCounters
        let str = show ns
        lift $ putStr str
        put $! length str
        lift $ threadDelay 10000

    {- Fork a thread for each incoming connection, which listens to the client's
       commands and translates them into 'STM' actions -}
    serve HostAny "8080" $ \(socket, _) -> do
        (increment, delete) <- atomically init

        {- Right now, just do the dumb thing and convert all keypresses into
           increment commands, with the exception of the 'q' key, which will
           quit -}
        let handler :: (Proxy p) => () -> Consumer p Char IO ()
            handler () = runIdentityP loop
              where
                loop = do
                    c <- request ()
                    unless (c == 'q') $ do
                        lift $ atomically increment
                        loop

        {- This uses my 'pipes' library.  It basically is a high-level way to
           say:

           * Read binary packets from the socket no bigger than 4096 bytes

           * Get the first character from each packet and discard the rest

           * Handle the character using the above 'handler' function -}
        runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler

        {- The above pipeline finishes either when the socket closes or
           'handler' stops looping because it received a 'q'.  Either case means
           that the client is done so we log them out using 'delete'. -}
        atomically delete
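
The server's comments mention that the display thread could be written without polling. A minimal sketch of one way to do that, assuming STM's blocking behaviour via check: the names waitForChange and watch are hypothetical helpers, not part of the answer's code, and in the server above getCounters would play the role of the current argument.

```haskell
import Control.Concurrent (forkIO, threadDelay)
import Control.Concurrent.STM

-- Hypothetical helper: block (the transaction retries) until the value
-- produced by 'current' differs from the last value we saw.
waitForChange :: Eq a => a -> STM a -> STM a
waitForChange old current = do
    new <- current
    check (new /= old)   -- retries until some TVar read by 'current' changes
    return new

-- Hypothetical helper: print the observed value every time it changes.
watch :: (Eq a, Show a) => STM a -> a -> IO ()
watch current old = do
    new <- atomically (waitForChange old current)
    print new
    watch current new

main :: IO ()
main = do
    counter <- newTVarIO (0 :: Int)
    _ <- forkIO (watch (readTVar counter) 0)
    atomically (modifyTVar' counter (+ 1))
    threadDelay 100000   -- give the watcher time to print before exiting
```

The watcher thread sleeps inside the transaction and is only woken when one of the variables it read is written, so no threadDelay loop is needed in the display thread itself.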

Next is the client, which simply opens a connection and forwards each key press as a single packet:
import Control.Monad
import Control.Proxy
import Control.Proxy.Safe
import Control.Proxy.TCP.Safe
import Data.ByteString.Char8 (pack)
import System.IO

main = do
    hSetBuffering stdin NoBuffering
    hSetEcho      stdin False

    {- Again, this uses my 'pipes' library.  It basically says:

        * Read characters from the console using 'commands'

        * Pack them into a binary format

        * send them to a server running at 127.0.0.1:8080

        This finishes looping when the user types a 'q' or the connection is
        closed for whatever reason.
    -}
    runSafeIO $ runProxy $ runEitherK $
         try . commands
     >-> mapD (\c -> pack [c])
     >-> connectWriteD Nothing "127.0.0.1" "8080"

commands :: (Proxy p) => () -> Producer p Char IO ()
commands () = runIdentityP loop
  where
    loop = do
        c <- lift getChar
        respond c
        unless (c == 'q') loop

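It's very simple: commands generates a stream of Chars, which are converted into ByteStrings and sent as packets to the server.
If you run the server and a few clients and have each of them type some keys, the server display will output a list showing how many keys each client typed: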
[1,6,4]

If some of the clients disconnect, they are removed from the list:

[1,4]

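Note that the pipes components of these examples will simplify greatly in the upcoming pipes-4.0.0 release, but the current pipes ecosystem still gets the job done as is.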

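Great solution, I will definitely consider it ;) - Kr0e
Just for my understanding: is STM considered pure? I guess not, since it is all about mutability without a locking mechanism, right? - Kr0e
@Kr0e That's right. Think of STM as composable, thread-safe mutable memory references. - Gabriella Gonzalez
If you don't use explicit threads (only "par" and similar pure constructs), you don't need to deal with mutability and state. But once you start using concurrency, i.e. explicit threads that have to communicate somehow, there must be some mutable state (values that change over time) that is shared between the threads and visible to them. So if you need to use multiple CPU cores, you have two options: stay pure and use only parallelism, or use concurrency and deal with inter-thread communication in a mutable world. - Petr
@Kr0e It's the same with STM: just MVars or TVars, nothing else. But with STM it is much easier to create atomic components and combine them into complex expressions. - Petr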

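
First, I'd definitely recommend using your own specific data type to represent commands. When using (String, Maybe (Chan [a]), Maybe a), a buggy client can crash your actor simply by sending an unknown command or by sending ("add", Nothing, Nothing), etc. I'd suggest something like: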

-- Each constructor carries exactly the data its command needs;
-- 'Get' carries the channel on which the reply is sent.
data Command a = Add a | Remove a | Get (Chan [a])

Then you can pattern match on the commands in storage in a safe way.
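
As a minimal sketch of what that could look like, here is the question's storage actor rewritten against such a command type. The exact shape of newStorage and request is an assumption carried over from the question's prototype, not something this answer specifies.

```haskell
import Control.Concurrent (forkIO)
import Control.Concurrent.Chan (Chan, newChan, readChan, writeChan)
import Data.List (delete)

-- Command type as suggested above.
data Command a = Add a | Remove a | Get (Chan [a])

-- The actor loop. Every constructor carries exactly what it needs, so there
-- is no 'fromJust' and no "unknown command" case left to crash on.
storage :: Eq a => [a] -> Chan (Command a) -> IO ()
storage s q = do
    cmd <- readChan q
    case cmd of
        Add x     -> storage (x : s) q
        Remove x  -> storage (delete x s) q
        Get reply -> writeChan reply s >> storage s q

-- Start the actor with an empty list and return its mailbox.
newStorage :: Eq a => IO (Chan (Command a))
newStorage = do
    q <- newChan
    _ <- forkIO (storage [] q)
    return q

-- Ask the actor for its current contents and wait for the answer.
request :: Chan (Command a) -> IO [a]
request q = do
    reply <- newChan
    writeChan q (Get reply)
    readChan reply

main :: IO ()
main = do
    q <- newStorage
    writeChan q (Add (1 :: Int))
    writeChan q (Add 2)
    writeChan q (Remove 1)
    request q >>= print   -- prints [2]
```

Because the mailbox is a FIFO channel with a single reader, the Get is processed only after the earlier commands from the same sender.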

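Actors have their advantages, but I also feel they have some drawbacks. For example, getting an answer from an actor requires sending it a command and then waiting for a reply. The client can't be completely sure that it will get a reply, or that the reply will be of some specific type: you cannot say that you want only answers of this type (and this many of them) for this particular command.

So as an example I'll give a simple STM solution. It would be better to use a hash table or a (balanced tree) set, but since Handle implements neither Ord nor Hashable, we can't use these data structures, so I'll keep using lists.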

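module ThreadSet (
    TSet, new, add, remove, get
) where

import Control.Monad.STM (STM)
import Control.Concurrent.STM.TVar
import Data.List (delete)

-- | A list-backed set stored in a single transactional variable.
newtype TSet a = TSet (TVar [a])

-- | Create an empty set. Added here because the constructor is not
-- exported, so a 'TSet' could otherwise not be created outside the module.
new :: STM (TSet a)
new = fmap TSet (newTVar [])

-- | Prepend an element (duplicates are not checked).
add :: a -> TSet a -> STM ()
add x (TSet v) = modifyTVar' v (x :)

-- | Remove the first occurrence of an element, if present.
remove :: (Eq a) => a -> TSet a -> STM ()
remove x (TSet v) = modifyTVar' v (delete x)

-- | Read the current contents.
get :: TSet a -> STM [a]
get (TSet v) = readTVar v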

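The module implements an STM-based set of arbitrary elements. You can have multiple such sets and use them together in a single STM transaction, which succeeds or fails as a whole. For example: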
-- | Ensures that there is exactly one element `x` in the set.
add1 :: (Eq a) => a -> TSet a -> STM ()
add1 x v = remove x v >> add x v

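This would be difficult with actors: you'd have to add it as another command for the actor, because you can't compose it from existing actions and still keep it atomic.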
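
The same composability extends across several sets. A minimal, self-contained sketch, assuming a list-backed set like the one in this answer: the constructor newTSet and the operation move are hypothetical names introduced for illustration.

```haskell
import Control.Concurrent.STM
import Data.List (delete)

-- Same representation as the answer's TSet, repeated so the sketch stands alone.
newtype TSet a = TSet (TVar [a])

-- Hypothetical constructor for an empty set.
newTSet :: STM (TSet a)
newTSet = TSet <$> newTVar []

add :: a -> TSet a -> STM ()
add x (TSet v) = modifyTVar' v (x :)

remove :: Eq a => a -> TSet a -> STM ()
remove x (TSet v) = modifyTVar' v (delete x)

get :: TSet a -> STM [a]
get (TSet v) = readTVar v

-- Move an element between two sets. Run under one 'atomically', no other
-- thread can observe a state where the element is in both sets or in neither.
move :: Eq a => a -> TSet a -> TSet a -> STM ()
move x from to = remove x from >> add x to

main :: IO ()
main = do
    (lobby, room) <- atomically $ (,) <$> newTSet <*> newTSet
    atomically $ add "chris" lobby
    atomically $ move "chris" lobby room
    result <- atomically $ (,) <$> get lobby <*> get room
    print result   -- prints ([],["chris"])
```

With two storage actors, the equivalent operation would need either a coordinating third actor or a new command that knows about both mailboxes.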
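Update: There is an interesting article explaining why Clojure's designers chose not to use actors. For example, with actors, even if a mutable structure has many reads and only a few writes, all of them are serialized, which can hurt performance considerably.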

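Serialization/deserialization does consume a lot of resources, that's true. CloudHaskell has the same "serialization overhead", and they call it a feature. But they recently added an unsafe send function that passes messages without serializing/deserializing them, which is an order of magnitude faster. In theory, message passing should be as cheap as a simple function call for the actor pattern to be a real alternative, which is of course the case in Erlang. I think STM is a really great feature, and maybe using both techniques is the right choice, since STM is rather low-level compared to the actor pattern. - Kr0e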
