F#中的全局状态和异步工作流

6

一个常用的示例,用于说明在F#中使用异步工作流程的是并行检索多个网页。此类示例可以在以下链接中找到:http://en.wikibooks.org/wiki/F_Sharp_Programming/Async_Workflows 如果链接在未来更改,这里也提供了代码:

open System.Text.RegularExpressions
open System.Net

let download url =
    let webclient = new System.Net.WebClient()
    webclient.DownloadString(url : string)

let extractLinks html = Regex.Matches(html, @"http://\S+")

let downloadAndExtractLinks url =
    let links = (url |> download |> extractLinks)
    url, links.Count

let urls =
     [@"http://www.craigslist.com/";
     @"http://www.msn.com/";
     @"http://en.wikibooks.org/wiki/Main_Page";
     @"http://www.wordpress.com/";
     @"http://news.google.com/";]

let pmap f l =
    seq { for a in l -> async { return f a } }
    |> Async.Parallel
    |> Async.Run

let testSynchronous() = List.map downloadAndExtractLinks urls
let testAsynchronous() = pmap downloadAndExtractLinks urls

let time msg f =
    let stopwatch = System.Diagnostics.Stopwatch.StartNew()
    let temp = f()
    stopwatch.Stop()
    printfn "(%f ms) %s: %A" stopwatch.Elapsed.TotalMilliseconds msg temp

let main() =
    printfn "Start..."
    time "Synchronous" testSynchronous
    time "Asynchronous" testAsynchronous
    printfn "Done."

main()

我想知道如何处理全局状态的变化,例如网络连接丢失?有没有一种优雅的方法来解决这个问题?
可以在进行Async.Parallel调用之前检查网络状态,但是状态可能会在执行期间发生变化。假设我们想要暂停执行直到网络再次可用,那么有没有一种函数式的方法来实现呢?
1个回答

5

首先,这个示例代码存在一个问题 - 它使用了Async.Parallel来运行多个操作以实现并行处理,但是这些操作本身并没有被实现成异步的,因此这并不能避免线程池中阻塞过多的线程。

异步化。为使代码完全异步化,downloaddownloadAndExtractLinks函数也应该被实现成异步的,以便你可以使用WebClientAsyncDownloadString方法:

let asyncDownload url = async {
    let webclient = new System.Net.WebClient()
    return! webclient.AsyncDownloadString(System.Uri(url : string)) }

let asyncDownloadAndExtractLinks url = async {
    let! html = asyncDownload url
    let links = extractLinks html
    return url, links.Count }

let pmap f l =
    seq { for a in l -> async { return! f a } }
    |> Async.Parallel
    |> Async.RunSynchronously

重试。 现在,回答这个问题 - 没有内置机制来处理网络故障等错误,所以您需要自己实现这个逻辑。什么是正确的方法取决于您的情况。一种常见的方法是尝试操作特定次数,并仅在不成功(例如10次)时抛出异常。您可以将其编写为接受其他异步工作流程的原始函数:

let rec asyncRetry times op = async {
  try
    return! op
  with e ->
    if times <= 1 then return (reraise e)
    else return! asyncRetry (times - 1) op }

然后,您可以更改主函数以构建一个工作流,重新尝试下载10次:

let testAsynchronous() = 
  pmap (asyncRetry 10 downloadAndExtractLinks) urls

共享状态。 另一个问题是Async.Parallel只有在所有下载完成后才会返回(如果有一个网站出现故障,你将不得不等待)。如果你想要显示结果,需要更复杂的东西。

一种不错的方法是使用 F#代理 - 创建一个代理,它可以存储到目前为止获得的结果,并可以处理两个消息 - 一个添加新结果,另一个返回当前状态。然后,您可以启动多个异步任务,将结果发送到代理中,在单独的异步工作流程中,您可以使用轮询来检查当前状态(例如更新用户界面)。

我写了一篇关于代理的 MSDN 系列文章,还写了两篇developerFusion 的文章,其中包含大量关于 F#代理的代码示例。


汤姆,虽然我非常喜欢F#代理,但我不认为这像Haskell一样是函数式编程。它似乎所做的是将状态(在Haskell中的IO Monad)视为要传递给函数的东西,而是将状态视为可以被多个代理“同时”改变并通过代理之间的消息传递进行仲裁。 - JonnyBoats
2
使用代理并不像 Haskell 那样是函数式编程。我真的不认为纯函数式解决方案那么优雅和有用。消息传递并发只是 F# 中可用的另一种有用范例,我认为它非常适合需要协调的并发进程。 - Tomas Petricek
这是我目前正在努力理解的事情。通过像Haskell这样的语言发现FP(函数式编程),诱惑是在F#中采用完全纯粹的方法。找到正确的范式组合将是一个漫长的学习过程,我认为。 - anton.burger

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接