C# .NET 4.5的异步/多线程?

28

我正在编写一个C#控制台应用程序,从网页上爬取数据。

这个应用程序将会访问大约8000个网页并且爬取数据(每个页面的数据格式相同)。

目前我已经使程序可以正常工作,但是没有使用异步方法和多线程。

然而,我需要让程序运行更快。我认为这是因为它在等待下载HTML文件(WebClient.DownloadString(url)),所以只使用了CPU的3%-6%。

以下是我的程序的基本流程:

DataSet alldata;

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with WebClient.DownloadString
    // and scrapes the data into several datatables which it returns as a dataset.
    DataSet dataForOnePage = ScrapeData(url);

    //merge each table in dataForOnePage into allData
}

// PushAllDataToSql(alldata);

我一直在尝试使用多线程来处理这个问题,但不确定如何正确地开始。我正在使用.net 4.5,据我理解,4.5中的异步和等待是为了更容易编程,但我仍然有点迷茫。

我的想法是只需保持创建新的异步线程来处理此行:

DataSet dataForOnePage = ScrapeData(url);

然后每当一个完成,就运行

//merge each table in dataForOnePage into allData

有人能指导我如何在 .net 4.5 c# 中将那一行变成异步,并在完成后运行我的合并方法吗?

谢谢。

编辑:这是我的 ScrapeData 方法:

public static DataSet GetProperyData(CookieAwareWebClient webClient, string pageid)
{
    var dsPageData = new DataSet();

    // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
    string url = @"https://domain.com?&id=" + pageid + @"restofurl";
    string html = webClient.DownloadString(url);
    var doc = new HtmlDocument();
    doc.LoadHtml(html );

    // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData 
    return dsPageData ;
}

4
好的,我会尽力以最简练、准确地方式翻译这篇文章。请点击这里阅读:http://msdn.microsoft.com/zh-cn/library/hh556530(v=vs.110).aspx - dugas
1
请查看PLinq the8000urls.AsParallel().ForAll(...)。http://msdn.microsoft.com/en- - asawyer
1
@asawyer AsParallel 可以工作,但它会有点浪费,因为它会生成线程来等待固有的异步操作。尽管如此,它更容易并且可以工作,但还有更优雅的解决方案。 - casperOne
4个回答

42
如果你想使用asyncawait关键字(虽然不是必须的,但在.NET 4.5中它们确实使事情更容易),你首先需要将你的ScrapeData方法更改为使用async关键字返回一个Task<T> instance,如下所示:
async Task<DataSet> ScrapeDataAsync(Uri url)
{
    // Create the HttpClientHandler which will handle cookies.
    var handler = new HttpClientHandler();

    // Set cookies on handler.

    // Await on an async call to fetch here, convert to a data
    // set and return.
    var client = new HttpClient(handler);

    // Wait for the HttpResponseMessage.
    HttpResponseMessage response = await client.GetAsync(url);

    // Get the content, await on the string content.
    string content = await response.Content.ReadAsStringAsync();

    // Process content variable here into a data set and return.
    DataSet ds = ...;

    // Return the DataSet, it will return Task<DataSet>.
    return ds;
}

请注意,您可能需要远离WebClient类,因为它在异步操作中不支持Task<T>。在.NET 4.5中,更好的选择是HttpClient。我选择使用HttpClient。此外,请查看HttpClientHandler,特别是CookieContainer属性,您将使用它来发送每个请求的cookie。
然而,这意味着您很可能需要使用await关键字等待另一个异步操作,而在这种情况下,很可能是页面下载。您必须调整下载数据的调用以使用异步版本,并在这些调用上await
完成后,通常会调用await,但在此场景中无法这样做,因为您将await变量。在这种情况下,您正在运行循环,因此变量将在每次迭代中重置。在这种情况下,最好将Task<T>存储在数组中,如下所示:
DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url));
}

现在需要将数据合并到 allData 中。为此,您需要调用返回的 Task<T> 实例上的 ContinueWith 方法 并执行将数据添加到 allData 的任务:

DataSet alldata = ...;

var tasks = new List<Task<DataSet>>();

foreach(var url in the8000urls)
{
    // ScrapeData downloads the html from the url with 
    // WebClient.DownloadString
    // and scrapes the data into several datatables which 
    // it returns as a dataset.
    tasks.Add(ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });
}

然后,您可以使用 Task 上的 WhenAll 方法 等待所有任务,并在其上 await

// After your loop.
await Task.WhenAll(tasks);

// Process allData

然而,请注意您使用了foreach,而WhenAll需要一个IEnumerable<T>实现。这是使用LINQ的好指标,它非常适合这种情况:
DataSet alldata;

var tasks = 
    from url in the8000Urls
    select ScrapeDataAsync(url).ContinueWith(t => {
        // Lock access to the data set, since this is
        // async now.
        lock (allData)
        {
             // Add the data.
        }
    });

await Task.WhenAll(tasks);

// Process allData

如果您愿意,您也可以选择不使用查询语法,在这种情况下并不重要。

请注意,如果包含方法未标记为async(因为您正在控制台应用程序中,并且必须等待结果才能终止应用程序),则可以在调用WhenAll时返回的Task上简单地调用Wait方法

// This will block, waiting for all tasks to complete, all
// tasks will run asynchronously and when all are done, then the
// code will continue to execute.
Task.WhenAll(tasks).Wait();

// Process allData.

换句话说,您想要将Task实例收集到一个序列中,然后在处理allData之前等待整个序列。然而,如果可能的话,我建议尝试在合并到allData之前处理数据。除非数据处理需要DataSet的全部内容,否则在收到数据时尽可能地处理数据,而不是等待全部数据返回,这样可以获得更多的性能提升。

@user1308743 更新了答案,包括在包含方法不是async时应该使用什么代替await以及如何使用HttpClient来使用cookies。 - casperOne
@StephenCleary 这有点过头了。为什么不直接调用Wait呢?我的意思是,这里真正需要完成的就是这个。 - casperOne
如果你理解这会改变你的异常处理,那么“等待”是可以接受的。 - Stephen Cleary
请记住,在使用Linq中的IEnumerables时,查询本身不会被执行,直到它被枚举(并且每次枚举时都会再次执行)。这个例子没有问题(我认为它很棒),但是要记住:这里有风险。我建议只需调用await Task.WhenAll([在此处放置内联Linq]);这样您就永远不会有任何“var tasks”对象来搞乱您的代码。;-) - BrainSlugs83
@BrainSlugs83,FYI Task<T> 实例是可等待的。此外,在大多数情况下,鉴于 HttpClient 的存在,WebClient 可能不再适用。 - casperOne
显示剩余7条评论

11
你也可以使用TPL Dataflow,它非常适合解决这种问题。
在这种情况下,你需要构建一个“数据流网格”,然后让你的数据通过它流动。
实际上,这更像是一个管道而不是一个“网格”。我将分为三个步骤:从URL下载(字符串)数据;将(字符串)数据解析为HTML,然后解析为DataSet;将DataSet合并到主DataSet中。
首先,我们创建将放入数据流网格中的块:
DataSet allData;
var downloadData = new TransformBlock<string, string>(
  async pageid =>
  {
    System.Net.WebClient webClient = null;
    var url = "https://domain.com?&id=" + pageid + "restofurl";
    return await webClient.DownloadStringTaskAsync(url);
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var parseHtml = new TransformBlock<string, DataSet>(
  html =>
  {
    var dsPageData = new DataSet();
    var doc = new HtmlDocument();
    doc.LoadHtml(html);

    // HTML Agility parsing

    return dsPageData;
  },
  new ExecutionDataflowBlockOptions
  {
    MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
  });
var merge = new ActionBlock<DataSet>(
  dataForOnePage =>
  {
    // merge dataForOnePage into allData
  });

然后我们将这三个块连接在一起以创建网格:

downloadData.LinkTo(parseHtml);
parseHtml.LinkTo(merge);

接下来,我们开始将数据输入网格:

foreach (var pageid in the8000urls)
  downloadData.Post(pageid);

最后,我们等待网格中的每个步骤完成(这也会清楚地传播任何错误):

downloadData.Complete();
await downloadData.Completion;
parseHtml.Complete();
await parseHtml.Completion;
merge.Complete();
await merge.Completion;

TPL Dataflow的好处在于您可以轻松控制每个部分的并行度。目前,我已将下载和解析块都设置为Unbounded,但您可能希望对它们进行限制。合并块使用默认的最大并行度1,因此在合并时不需要锁定。


2
如果今天问这个问题,我会给出一个基于TPL的解决方案,而不是之前给出的那个(https://dev59.com/O2gu5IYBdhLWcg3wByq1#11639434);这样做肯定更容易连接所有内容,并且更加简洁。 - casperOne
2
TPL Dataflow是基于Task的异步网格。它实际上不是 .NET中TPL的一部分,而是一个附加库,由同一团队开发(他们还开发了 async 支持类型)。 - Stephen Cleary
2
@iNfinity 这是不正确的。它实际上非常接近它的名称。它不必是 CPU 绑定的,您可以轻松地将 I/O 绑定操作作为数据流的一部分。它是关于将操作分解成块,然后将所有块链接在一起,并具有控制所有块处理并行性、缓冲等方式的能力。在我看来,这一点都不过度,一旦你掌握了它,块就非常容易组合在一起,你会看到这些逻辑单元非常适合 TPL。 - casperOne
那么,如果你要估计同时下载的平均页面数量,会有多少个线程在运行?是1个还是2个?或者基本上由CPU速度决定,这种情况下可以增加到数百个?(在这种情况下,你的互联网速度很快就会遇到瓶颈)?那么,为了回答我的问题,以上代码中任何给定时间点发生了多少次下载实例?不需要精确,大致数字就可以。我打算用另一种方式来做这件事,在我的下一个评论中,我将概述将其分成10或20个任务的方法,其中每个例程... - Erx_VB.NExT.Coder
@user1308743 每个例程(网格)都在一个类中,您可以实例化该类(例如)20次,以便每个类的foreach循环都会跳过20(可以是10或50)。跳过数字(20)和线程设置数量(20)必须相同。然后,从父/主foreach循环开始每个线程的任务,一旦启动,每个线程将逐步增加20直到完成8,000。您无法直接访问类中的属性,但该类可以向运行它的父foreach类发送统计信息,并且您可以报告其中的信息。你喜欢吗? - Erx_VB.NExT.Coder
显示剩余3条评论

1

我建议阅读我的关于async/await的相对完整介绍

首先,将所有东西都变成异步的,从低级别的东西开始:

public static async Task<DataSet> ScrapeDataAsync(string pageid)
{
  CookieAwareWebClient webClient = ...;
  var dsPageData = new DataSet();

  // DOWNLOAD HTML FOR THE REO PAGE AND LOAD IT INTO AN HTMLDOCUMENT
  string url = @"https://domain.com?&id=" + pageid + @"restofurl";
  string html = await webClient.DownloadStringTaskAsync(url).ConfigureAwait(false);
  var doc = new HtmlDocument();
  doc.LoadHtml(html);

  // A BUNCH OF PARSING WITH HTMLAGILITY AND STORING IN dsPageData 
  return dsPageData;
}

然后您可以按照以下方式使用它(使用带有LINQ的async):

DataSet alldata;
var tasks = the8000urls.Select(async url =>
{
  var dataForOnePage = await ScrapeDataAsync(url);

  //merge each table in dataForOnePage into allData

});
await Task.WhenAll(tasks);
PushAllDataToSql(alldata);

并且在我的 AsyncEx 库中使用 AsyncContext,因为这是控制台应用程序

class Program
{
  static int Main(string[] args)
  {
    try
    {
      return AsyncContext.Run(() => MainAsync(args));
    }
    catch (Exception ex)
    {
      Console.Error.WriteLine(ex);
      return -1;
    }
  }

  static async Task<int> MainAsync(string[] args)
  {
    ...
  }
}

就这样了。不需要锁定、继续或任何其他的东西。


-1

我相信在这里你不需要使用 asyncawait。它们可以帮助桌面应用程序,在那里你需要将工作移动到非 GUI 线程。在我看来,更好的选择是在你的情况下使用 Parallel.ForEach 方法。像这样:

    DataSet alldata;
    var bag = new ConcurrentBag<DataSet>();

    Parallel.ForEach(the8000urls, url =>
    {
        // ScrapeData downloads the html from the url with WebClient.DownloadString 
        // and scrapes the data into several datatables which it returns as a dataset. 
        DataSet dataForOnePage = ScrapeData(url);
        // Add data for one page to temp bag
        bag.Add(dataForOnePage);
    });

    //merge each table in dataForOnePage into allData from bag

    PushAllDataToSql(alldata); 

这是暴力破解。你可以这样做,但同时你会浪费线程等待固有的异步操作(Parallel将生成线程来处理the8000urls的分区,然后这些线程在获取URL时会阻塞)。你不需要 async/await,但它肯定更优雅,更好地利用了你拥有的资源。 - casperOne
这就是我们的想法。它是一个控制台应用程序,应该更快。使用 async/await 仍然一次只能下载一个 URL,这是不可接受的。而使用 Parallel.ForEach 可以同时下载更多的 URL,从而提高整个应用程序的性能。这正是 user1308743 需要的。 - Alexander
这不是真的。使用 async/await,它们不是一个接一个地加载,而是异步启动并在最后等待所有内容。你对 async/await 的理解是错误的。 - casperOne
嗯,看起来我在第一次阅读您的帖子时没有注意到 List<Task>。在方法体中返回任务并与 async / await 一起等待它们绝对是最好的选择。 - Alexander

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接