如何在Haskell中将IO操作的输出导入到进程中

4
我想要创建一个进程,并定期将来自Haskell程序的一些文本写入该进程的stdin(从IO操作中)。以下在GHCi中正确工作,但在构建和运行时不起作用。在GHCi中,一切都完美地工作,并且IO操作的值会定期输入。但是,在构建和运行时,似乎会在写入进程的stdin时暂停任意长的时间。我已经使用了CreateProcess(来自System.Process)来创建该句柄,并尝试过hPutStrLn(缓冲区设置为NoBuffering--LineBuffering也没用)。所以我尝试使用process-streaming包和pipes,但似乎根本无法让它们工作。真正的问题是:如何从Haskell创建进程并定期向其写入?下面是一个最小化的示例,展示了这种行为:
import System.Process
import Data.IORef
import qualified Data.Text as T  -- from the text package
import qualified Data.Text.IO as TIO
import Control.Concurrent.Timer  -- from the timers package
import Control.Concurrent.Suspend -- from the suspend package

main = do
    (Just hin, _,_,_) <- createProcess_ "bgProcess" $
        (System.Process.proc "grep"  ["10"]) { std_in = CreatePipe }

    ref <- newIORef 0 :: IO (IORef Int)
    flip repeatedTimer (msDelay 1000) $ do
        x <- atomicModifyIORef' ref $ \x -> (x + 1, x)
        hSetBuffering hin NoBuffering
        TIO.hPutStrLn hin $ T.pack $ show x

非常感谢您的帮助。

2个回答

3

这是一个管道生产者,它以每秒一次的频率发出一系列数字:

{-# language NumDecimals #-}
import Control.Concurrent
import Pipes
import qualified Data.ByteString.Char8 as Bytes

periodic :: Producer Bytes.ByteString IO ()
periodic = go 0
    where
        go n = do
            d <- liftIO (pure (Bytes.pack (show n ++ "\n"))) -- put your IO action here
            Pipes.yield d
            liftIO (threadDelay 1e6)
            go (succ n)

而且,使用process-streaming,我们可以将生产者提供给外部进程进行处理,如下所示:

import System.Process.Streaming

main :: IO ()
main = do
    executeInteractive (shell "grep 10"){ std_in = CreatePipe } (feedProducer periodic)

我使用了executeInteractive。它会自动将std_in设置为NoBuffering
另外,如果你想要处理每个匹配项,需要立即传递--line-buffered选项给grep(或使用stdbuf命令),以确保匹配项立即在输出中可用。

我希望得到这样的答案。我会尽快尝试并告诉你(现在是凌晨4点:))。 - Ajit Singh
如果您想确保每个匹配项立即在输出中可用,请确保将“--line-buffered”选项传递给“grep”。 - danidiaz
谢谢 :) 这个方法对于 grep 示例(以及 cat)对我很有用;但是我的实际应用程序仍然显示旧的行为,这似乎是代码中的其他问题。让我先对我的应用程序进行分析并找出原因,但这似乎是我需要的答案。 - Ajit Singh

0

使用threadDelay怎么样,例如:

import Control.Monad (forever)
import Control.Concurrent (threadDelay)
...

forever $ do
    x <- atomicModifyIORef' ref $ \x -> (x + 1, x)
    hSetBuffering hin NoBuffering
    TIO.hPutStrLn hin $ T.pack $ show x
    threadDelay 1000000  -- 1 sec

如果你需要同时进行其他工作,可以将其分配到另一个线程中。

您可以使用以下方法消除对IORef的需求:

loop h x = do 
    hSetBuffering h NoBuffering
    TIO.hPutStrLn h $ T.pack $ show x
    threadDelay 1000000
    loop h (x+1)

当然,你只需要执行一次 - 例如,在进入循环之前执行它。

threadDelay作为一个时间机制非常不准确,这就是为什么我使用suspend(它只指定了最小等待时间)。我尝试过hSetBuffering解决方案,但没有任何改变。 - Ajit Singh
嗯,“suspend”在Haskell中是通过使用“threadDelay”来实现的,几乎所有的计时延迟也都是这样实现的。 - ErikR
这个回答是否真正回答了问题(关于编译时发现任意长的延迟)?它似乎只包含风格建议。 - Daniel Wagner
我认为实际问题是:“真正的问题是:如何从Haskell创建一个进程并定期向其写入?” - ErikR
@ErikR 我认为只有在你确定了给定代码出了什么问题并提供证据表明这段代码不会以同样的方式出错时,才能回答那个问题。 - Daniel Wagner
我实际上使用 grep 1 而不是 grep 10 编译并运行了代码,我没有看到任何递增的延迟 - 行以每秒一次的速度输出在 10 和 100 之间。我发布它是因为这是一种更简单(和更惯用)的循环执行方式,而且在我这里似乎可行。 - ErikR

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接