在并行环境下运行异步方法8次

18

我如何将以下内容转换为Parallel.ForEach?

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    foreach (String url in threads)
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }
        progressBar1.PerformStep();
    }
}

我原本的编码假设异步和并行处理是一样的,但我刚刚意识到它们是不同的。我查看了所有我能找到的相关问题,但好像都找不到一个适合我的示例。大部分示例中使用难以理解的变量名,只用单个字母的变量名不能清楚地说明其包含的内容。

我的线程数组(包含论坛帖子的URL)通常有300到2000个条目,因为要进行多个HTTP请求,所以采用并行处理似乎可以加速执行。

在使用Parallel.ForEach之前,我是否需要将所有异步操作都删除?我该如何做?我能否在不阻塞主线程的情况下完成此操作?

顺便说一下,我正在使用.NET 4.5。


你的“ threads ”数组里面有什么?为什么它被称为“threads”?它并不包含“线程”。 - spender
@spender 线程是指论坛中的帖子,其中包含指向论坛帖子的URL。 - Steen Schütt
1
我不明白抱怨你找到的某些答案对你的问题有什么帮助。 - svick
@svick 我写这个是因为我想要一个有意义的变量名的例子。 - Steen Schütt
这个回答解决了你的问题吗?使用异步lambda的并行foreach - Michael Freidgeim
4个回答

16
我编写代码时假设异步处理和并行处理是相同的。异步处理和并行处理是非常不同的。如果您不了解区别,建议先阅读相关资料(例如:什么是C#中异步和并行编程之间的关系?)。现在,您想要实现的任务并不简单,因为您需要使用特定的并行度(8)异步地处理大量集合。对于同步处理,您可以使用Parallel.ForEach()(连同ParallelOptions配置并行度),但是没有简单的替代方案适用于async。在您的代码中,这变得复杂,因为您希望所有操作都在UI线程上执行。(尽管理想情况下,您不应该直接从计算中访问UI。而是应该使用IProgress,这意味着代码不再必须在UI线程上执行。)

在.Net 4.5中,最好的方法可能是使用TPL Dataflow。它的ActionBlock正是你想要的,但它可能会相当冗长(因为它比你需要的更灵活)。因此,创建一个辅助方法是有意义的:

public static Task AsyncParallelForEach<T>(
    IEnumerable<T> source, Func<T, Task> body,
    int maxDegreeOfParallelism = DataflowBlockOptions.Unbounded,
    TaskScheduler scheduler = null)
{
    var options = new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = maxDegreeOfParallelism
    };
    if (scheduler != null)
        options.TaskScheduler = scheduler;

    var block = new ActionBlock<T>(body, options);

    foreach (var item in source)
        block.Post(item);

    block.Complete();
    return block.Completion;
}

在您的情况下,您需要像这样使用它:
await AsyncParallelForEach(
    threads, async url => await DownloadUrl(url), 8,
    TaskScheduler.FromCurrentSynchronizationContext());

在这里,DownloadUrl() 是一个处理单个 URL(循环体)的async Task方法,8是并行度的程度(实际代码中不应该是字面常量),FromCurrentSynchronizationContext()确保代码在UI线程上执行。

我应该在哪个命名空间中找到 DataflowBlockOptionsExecutionDataflowBlockOptionsActionBlock<T>?我查了一下MSDN,它说是在 System.Threading.Tasks.Dataflow 中,但我无法使用它,因为它显示该命名空间不存在。 - Steen Schütt
1
您需要将其安装到您的项目中 - Stephen Cleary
我确实从网站上下载了它,但是当我从NuGet管理器中获取时它可以工作。 无论如何,感谢您的帮助 :) - Steen Schütt
我试图从这里借鉴你的优秀工作,但是在两个嵌套的IAsyncEnumerable可等待的foreach中,我得到了奇怪的结果。我是否漏掉了什么? 也许我不应该在可等待的AsyncParallelForEach中都使用相同的TaskScheduler.FromCurrentSynchronizationContext()?你认为呢? - Florin Vîrdol

10

Stephen Toub在他的博客文章中讲解了如何实现ForEachAsync。对于支持Dataflow平台的情况,Svick的回答也很好。

这里提供一种使用TPL分区器的替代方案:

public static Task ForEachAsync<T>(this IEnumerable<T> source,
    int degreeOfParallelism, Func<T, Task> body)
{
  var partitions = Partitioner.Create(source).GetPartitions(degreeOfParallelism);
  var tasks = partitions.Select(async partition =>
  {
    using (partition) 
      while (partition.MoveNext()) 
        await body(partition.Current); 
  });
  return Task.WhenAll(tasks);
}

你可以像这样使用它:

public async Task getThreadContentsAsync(String[] threads)
{
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await threads.ForEachAsync(8, async url =>
  {
    HttpResponseMessage response = await client.GetAsync(url);
    String content = await response.Content.ReadAsStringAsync();
    String user;
    foreach (Match match in regex.Matches(content))
    {
      user = match.Groups[1].ToString();
      usernames.TryAdd(user, null);
    }
    progressBar1.PerformStep();
  });
}

在 .Net VNext 中拥有 Parallel.ForeachAsync 功能会非常不错。这个功能的实现机会如何? - rudimenter
@rudimenter:我认为这不是优先考虑的问题。大多数情况下,您需要执行并行异步工作。对于那些极其罕见的需要在并行中执行异步工作的情况,Task.RunTask.WhenAll提供了基本的并行性,除非您需要限流。在这种情况下,TPL Dataflow内置了限流支持,如果您的用例确实如此复杂,那么您应该使用Dataflow。因此,我认为没有一个好的使用案例来使用ForEachAsync(即通常当人们要求这个时,已经有更好的替代方案)。 - Stephen Cleary
我同意你的观点,大部分工作都是并行或异步的,但新的异步特性具有传染性。整个应用程序现在从根到枝都在使用异步。迭代异步任务(并行或顺序)将变得非常普遍。如果BCL中有一些东西而不需要引入额外的库,如Dataflow和自定义节流实现,那就太好了。无论如何,谢谢。 - rudimenter
@rudimenter:我的观点是,您已经拥有并行和顺序迭代了:Task.WhenAllawait。罕见的情况是,当您有大量既是异步又是 CPU 绑定的任务,并且您需要对它们进行限制时。不使用 TPL Dataflow 也可以干净地处理其他每种情况。 - Stephen Cleary
1
@AndrewHanlon:调度是关键,内置的ConcurrentExclusiveSchedulerPair应该足够了。只需将最大并发级别指定为8,并使用ConcurrentScheduler属性(忽略另一个属性)。请记住,您需要“Unwrap”由任务调度程序执行的异步代码。 - Stephen Cleary
显示剩余3条评论

3
另一种选择是使用SemaphoreSlimAsyncSemaphore(后者包含在我的AsyncEx库中,支持比SemaphoreSlim更多的平台):
public async Task getThreadContentsAsync(String[] threads)
{
  SemaphoreSlim semaphore = new SemaphoreSlim(8);
  HttpClient client = new HttpClient();
  ConcurrentDictionary<String, object> usernames = new ConcurrentDictionary<String, object>();

  await Task.WhenAll(threads.Select(async url =>
  {
    await semaphore.WaitAsync();
    try
    {
      HttpResponseMessage response = await client.GetAsync(url);
      String content = await response.Content.ReadAsStringAsync();
      String user;
      foreach (Match match in regex.Matches(content))
      {
        user = match.Groups[1].ToString();
        usernames.TryAdd(user, null);
      }
      progressBar1.PerformStep();
    }
    finally
    {
      semaphore.Release();
    }
  }));
}

0
你可以尝试使用AsyncEnumerator NuGet Package中的ParallelForEachAsync扩展方法:
using System.Collections.Async;

public async void getThreadContents(String[] threads)
{
    HttpClient client = new HttpClient();
    List<String> usernames = new List<String>();
    int i = 0;

    await threads.ParallelForEachAsync(async url =>
    {
        i++;
        progressLabel.Text = "Scanning thread " + i.ToString() + "/" + threads.Count<String>();
        HttpResponseMessage response = await client.GetAsync(url);
        String content = await response.Content.ReadAsStringAsync();
        String user;
        Predicate<String> userPredicate;
        foreach (Match match in regex.Matches(content))
        {
            user = match.Groups[1].ToString();
            userPredicate = (String x) => x == user;
            if (usernames.Find(userPredicate) != user)
            {
                usernames.Add(match.Groups[1].ToString());
            }
        }

        // THIS CALL MUST BE THREAD-SAFE!
        progressBar1.PerformStep();
    },
    maxDegreeOfParallelism: 8);
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接