为什么我的并发 Haskell 程序会过早终止?

5
我有一个UDP服务器,它会反射接收到的每个ping消息(我认为这很有效)。在客户端,我想要做两件事:
  1. 确保我发送了N个(例如10000)消息,以及
  2. 计算正确接收到的响应数量。
似乎由于UDP的性质或forkIO的原因,我的客户端代码在结束时过早或根本没有进行任何计数。
此外,我非常惊讶看到函数tryOnePing返回250次Int 4。这可能是什么原因?
main = withSocketsDo $ do
        s <- socket AF_INET Datagram defaultProtocol
        hostAddr <- inet_addr host
        thread <- forkIO $ receiveMessages s
        -- is there any better way to eg to run that in parallel and make sure
        -- that sending/receiving are asynchronous? 


        -- forM_ [0 .. 10000] $ \i -> do
              -- sendTo s "ping" (SockAddrInet port hostAddr)
        -- actually this would be preferred since I can discard the Int 4 that
        -- it returns but forM or forM_ are out of scope here?

        let tryOnePing i = sendTo s "ping" (SockAddrInet port hostAddr)
        pings <- mapM tryOnePing [0 .. 1000]
        let c = length $ filter (\x -> x==4) pings

        -- killThread thread
        -- took that out to make sure the function receiveMessages does not
        -- end prematurely. still seems that it does

        sClose s
        print c
        -- return()

receiveMessages :: Socket -> IO ()
receiveMessages socket = forever $ do
        -- also tried here forM etc. instead of forever but no joy
        let recOnePing i = recv socket 1024
        msg <- mapM recOnePing [0 .. 1000]
        let r = length $ filter (\x -> x=="PING") msg
        print r
        print "END"

forM和相关函数可以在Control.Monad中找到。 - hammar
1个回答

6
这里的主要问题是,当您的主线程完成时,所有其他线程都会自动终止。您必须让主线程等待“接收消息线程”,否则很可能在收到任何响应之前就会简单地结束。一种简单的方法是使用“MVar”。
“MVar”是一个同步单元,可以为空或精确地持有一个值。如果当前线程尝试从空的“MVar”中获取或插入满的“MVar”,则该线程将阻塞。 在这种情况下,我们不关心值本身,所以我们只需在其中存储一个“()”。
我们将从“MVar”开始。然后,主线程将分离接收器线程,发送所有数据包,并尝试从“MVar”中获取值。
import Control.Concurrent.MVar

main = withSocketsDo $ do
    -- prepare socket, same as before

    done <- newEmptyMVar

    -- we need to pass the MVar to the receiver thread so that
    -- it can use it to signal us when it's done
    forkIO $ receiveMessages sock done

    -- send pings, same as before

    takeMVar done    -- blocks until receiver thread is done

在接收线程中,我们将接收所有消息,然后在 MVar 中放置一个 (),以表示我们已完成接收。
receiveMessages socket done = do
    -- receive messages, same as before

    putMVar done ()  -- allows the main thread to be unblocked

这解决了主要问题,程序在我的Ubuntu笔记本上运行良好,但还有几件事情需要注意。
  • sendTo不能保证整个字符串都被发送。您需要检查返回值以查看发送了多少,并在未全部发送时重试。即使是像"ping"这样的短消息,如果发送缓冲区已满,也可能发生这种情况。

  • recv需要连接的套接字。您应该使用recvFrom代替(尽管出于某种未知的原因,在我的PC上仍然有效)。

  • 向标准输出打印不是同步的,因此您可能希望更改此设置,以便使用MVar来传递接收到的数据包数量,而不仅仅是()。这样,您可以从主线程执行所有输出。或者,使用另一个MVar作为互斥锁来控制对标准输出的访问。

最后,我建议仔细阅读Network.SocketControl.ConcurrentControl.Concurrent.MVar的文档。我的大部分答案都是从那里找到的信息拼凑而成的。


谢谢,有没有办法让它们保持异步/非阻塞状态,同时让它们两个同时运行,但不需要将它们作为单独的应用程序启动? - J Fritsch
@JFritsch:不确定我是否理解了“分离的应用程序”部分。您是在谈论客户端和服务器吗? - hammar
有没有办法我可以例如使用par组合器或类似的东西来让主线程通过(同时仍然继续接收者线程),以防它在实际中成为更大程序的一部分。 - J Fritsch
@JFritsch: par 是用于并行计算而非并发。如果你想让两个线程都一直运行,那没问题。只有当整个应用程序的主线程完成时,其他线程才会被终止。由于我不知道你的大型程序的具体情况,所以很难在这里提供更详细的答案,但你可能需要在某个时候使用一些同步技术来在线程之间进行通信。MVar 只是众多同步选项中的一个示例。 - hammar
它确实在你的电脑上工作吗?当MVar参与时,我遇到了两个奇怪的错误:recv: invalid argument (Bad file descriptor)thread blocked indefinitely in an MVar operation,后者根据书中所述,应该是在线程存在循环依赖关系时发生的。但我找不到任何问题。 - J Fritsch
正如我之前提到的,recv 的文档说明它仅适用于已连接的套接字。它在我的电脑上恰好可以工作并不是你可以依赖的事情。你应该使用 recvFrom - hammar

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接