Haskell,通道,STM,-threaded,消息传递

6
我正在尝试使用channels/STM在Haskell中实现消息传递。也许这是一个糟糕的想法,有更好的方法来实现/使用Haskell中的消息传递。如果是这样,请告诉我;然而,我的探索引发了一些关于并发Haskell的基本问题。
我听说STM非常棒,特别是在Haskell中的实现。由于它支持读取和写入,并且具有一些安全优势,所以我认为可以从这里开始。这带来了我最大的问题:是否可以
msg <- atomically $ readTChan chan

当 chan 是 TChan Int 类型时,cause a wait that waits for the channel to have a value on it?

考虑以下程序:

p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q chan = do
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1 
    putStrLn $ show msg2

main = do
    chan <- atomically $ newTChan
    p chan
    q chan

使用ghc --make -threaded编译此代码,然后运行程序,你会在控制台上看到1和2的输出。现在,假设我们这样做:
main = do 
    chan <- atomically $ newTChan
    forkIO $ p chan 
    forkIO $ q chan

现在,如果我们使用-threaded,它将在终端上打印无内容、1或1后跟2; 然而,如果您不使用-threaded编译,它总是会打印1后跟2。问题2:-threaded和非-threaded之间有什么区别?我想它们实际上并没有作为并发事物运行,它们只是一个接一个地运行。这与接下来的内容一致。

现在,在我的想法中,如果我同时运行p和q;即我forkIO'd它们,它们应该能够以相反的顺序运行。假设

main = do
    chan <- atomically newTChan
    forkIO $ q chan
    forkIO $ p chan

现在,如果我没有使用-threaded编译,就无法在控制台上打印任何内容。如果我使用了-threaded,有时候会有输出。虽然很少会出现1后面跟着2的情况,通常只是1或什么都没有。我也尝试过使用Control.Concurrent.Chan,结果得到了一致的结果。
第二个重要问题:通道和分叉如何相互配合,在上面的程序中发生了什么?
无论如何,似乎我不能如此天真地使用STM来模拟消息传递。也许Cloud Haskell是解决这些问题的选择——我真的不知道。如果有任何关于如何进行消息传递的信息,除了串行化~~>写入套接字~~>从套接字读取~~>反序列化之外,将非常感激。

关于“-threaded和非-threaded的区别”,您可能会喜欢我对Haskell线程模型的阐述(http://dmwit.com/gtk2hs)。请忽略与gtk相关的部分。 - Daniel Wagner
1个回答

10

不,你的想法是正确的——这就是 TChan 的作用——只是你忽略了 forkIO 的一个小细节:

问题在于,主线程不会等待使用 forkIO 创建的线程终止(请参见此处作为参考)。

因此,如果我使用参考文献中给出的提示

import Control.Concurrent
import Control.Concurrent.STM

p :: Num a => TChan a -> IO ()
p chan = do
    atomically $ writeTChan chan 1
    atomically $ writeTChan chan 2

q chan = do
    msg1 <- atomically $ readTChan chan 
    msg2 <- atomically $ readTChan chan
    -- for testing purposes
    putStrLn $ show msg1 
    putStrLn $ show msg2

main :: IO ()
main = do
    children <- newMVar []
    chan <- atomically $ newTChan
    _ <- forkChild children $ p chan
    _ <- forkChild children $ q chan
    waitForChildren children
    return ()

waitForChildren :: MVar [MVar ()] -> IO ()
waitForChildren children = do
  cs <- takeMVar children
  case cs of
    []   -> return ()
    m:ms -> do
      putMVar children ms
      takeMVar m
      waitForChildren children

forkChild :: MVar [MVar ()] -> IO () -> IO ThreadId
forkChild children io = do
  mvar <- newEmptyMVar
  childs <- takeMVar children
  putMVar children (mvar:childs)
  forkFinally io (\_ -> putMVar mvar ())

它按预期工作:

d:/Temp $ ghc --make -threaded tchan.hs
[1 of 1] Compiling Main             ( tchan.hs, tchan.o )
Linking tchan.exe ...
d:/Temp $ ./tchan.exe 
1
2
d:/Temp $

当然,如果您将调用更改为 pq,它也将继续工作。


3
有没有模块/库可以简化forkChild/waitForChildren这个东西? - Bergi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接