使用TPL/ async await进行递归异步调用

3
我希望能够使用C#异步特性(TPL/async/await)以递归方式处理分层结构。以下是我尝试做的概述。
我有一个作业集合需要处理,如下所示。每个作业都有一些任务要完成,还可以有一个或多个子作业,这些子作业也有任务要完成。所有父作业和子作业都调用同一个函数来执行实际的“工作”,并且这个函数是“异步”的(代码如下)。
/*
 *  Jobs Collection
 *  |
 *  |__ Job1
 *  |    |__ Job4
 *  |    |     |__ Job7
 *  |    |
 *  |    |__ Job5
 *  |
 *  |__ Job2
 *  |    |__ Job6
 *  |
 *  |__ Job3
 *  |
 */
  1. 层次结构中有3个层级。

  2. 我希望能够同时处理第一层级(Job1、Job2、Job3)。

  3. 一旦它们开始并行处理,每个单独的作业将自行启动处理,等待其处理完成(重要),然后递归地继续处理其子级,直到层次结构结束。 子级依赖于父级处理的数据,因此它们会等待父级处理完成。

  4. 实际“作业”的处理(由父级和子级调用)是异步进行的,因为调用方法是异步的 - 因此不需要“new thread”(Task.StartNew())。

这是我用来演示场景的样本代码 -

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    // first level 
    Parallel.ForEach(jobs,
                new ParallelOptions { MaxDegreeOfParallelism = 2 }, // parallelism hardcoded for simplicity
                (job) => ExecuteJob(job));
}

private void ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    Task t = GetDataAsync(job);
    t.Wait(); // needed such that parent response is received before children start over (?).


    if (job.Children != null)
    {
        job.Children.ToList().ForEach((r) =>
        {
            r.ParentResponse = job.Response; // Children need parent's response
            ExecuteJob(r);
        });
    }
}

private async Task GetDataAsync(WebJob j)
{
    // This is just test code. Ideally it would be an external call to some "async" method
    await Task.Delay(1000);
    j.Response = string.Format("{0} complete", j.Name);
    Console.ForegroundColor = ConsoleColor.Cyan;
    Console.WriteLine("parentResp>> {0} :: {1} Job>> {2} :: {3} Thread>> {4}", j.ParentResponse, "\t", j.Name, "\t", Thread.CurrentThread.ManagedThreadId);
    Console.WriteLine("--------------");
}

private WebJob[] CreateWebJobs()
{
    return new WebJob[] {
        new WebJob() { Id=1, Name = "Job1", ExecURL = "http://url1", 
            Children = new WebJob[] 
            {
                new WebJob() 
                { 
                    Id=2, Name = "Job2", ExecURL = "http://url2", 
                    Children = new WebJob[] 
                    {
                        new WebJob() { Id=4, Name = "Job4", ExecURL = "http://url4" }
                    }
                },
                new WebJob() 
                { 
                    Id=3, Name = "Job3", ExecURL = "http://url3" 
                }
            }
        },
        new WebJob() { Id=5, Name = "Job5", ExecURL = "http://url5"}                
    };
}
  • Process方法首先并行启动所有“第一级”作业。
  • ExecuteJob方法接管并递归遍历子级以完成所有处理。

这个方法可以工作,但我不确定这种递归异步模式是否是一种高效的方法。我考虑避免使用t.Wait()。我尝试在t上使用ContinueWith,但据我理解,这似乎没有任何区别。我也了解了ForEachAsync模式,并想知道它是否适用。这个解决方案最终将成为一个ASP.NET Web API服务。您对这种递归异步模式有什么想法吗?


你在那里重复了Job5。这是表示它依赖于1和4还是打错字了? - Mike Zboray
@mike - 那是个打字错误。已经更正了。谢谢。 - Lalman
2个回答

4
如果你只有一个阻塞操作 GetDataAsync,那么你可以在整个程序中使用异步编程,避免需要调用 Parallel.ForEach 或阻塞的 Wait 调用。
public async Task Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    await Task.WhenAll(jobs.Select(ExecuteJob));
}

private async Task ExecuteJob(WebJob job, [CallerMemberName] string memberName = "")
{
    Console.ForegroundColor = ConsoleColor.DarkYellow;
    Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

    await GetDataAsync(job);

    if (job.Children != null)
    {
        var childTasks = job.Children.Select(r =>
        {
            r.ParentResponse = job.Response;
            return ExecuteJob(r);
        });

        await Task.WhenAll(childTasks);
    }
}

编辑: 如果顶层方法应该阻塞(而不是冒着消费者忘记的风险),请执行以下操作:

public void Process()
{
    WebJob[] jobs = CreateWebJobs(); // dummy jobs

    Task.WaitAll(jobs.Select(ExecuteJob));
}

2
由于您在其中使用了await,所以Process必须是async吗? - juharr
@juharr:没错。最终必须有人await,所以我可以把这个任务留给调用Process的人,或者自己使用Wait来阻塞。 - Douglas
@Douglas - 很有道理,感谢您的想法。我在想是否您曾经想过使用“Parallel.ForEach”来启动至少第一组作业。假设在GetDataAsync能够处理数据之前,我必须对数据进行预处理。这可能会延迟处理,因为它将是顺序的,直到预处理完成(假设需要相当多的CPU周期),并且到达“GetDataAsync”方法? - Lalman
@Lalman:你提出了一个很有说服力的情况。不幸的是,“Parallel.ForEach”与“async”不兼容。最简单的解决方案是在其自己的任务中执行预处理(在“ExecuteJob”开头),并等待其结果:“await Task.Run(()=> Preprocess(job))”。但是,这不允许类似于“MaxDegreeOfParallelism”的功能(除非您定义自己的任务调度程序)。 - Douglas

2

由于您的核心是异步的,所以您不应该使用并行或多线程。您需要的是没有并行性的并发 - 也就是异步并发,通常使用 Task.WhenAll 实现。

这是双重真实的,因为您计划部署到 ASP.NET,而并发会显著降低您的可扩展性。

public async Task ProcessAsync()
{
  WebJob[] jobs = CreateWebJobs();

  await Task.WhenAll(jobs.Select(x => ExecuteJobAsync(x)));
}

private async Task ExecuteJobAsync(WebJob job, [CallerMemberName] string memberName = "")
{
  Console.ForegroundColor = ConsoleColor.DarkYellow;
  Console.WriteLine("Caller> {0} :: {1} Job> {2} :: {3} Thread> {4}", memberName, "\t", job.Name, "\t", Thread.CurrentThread.ManagedThreadId);

  await GetDataAsync(job);
  if (job.Children != null)
  {
    var childTasks = job.Children.Select(async x =>
    {
      x.ParentResponse = job.Response; // Children need parent's response
      await ExecuteJobAsync(x);
    });
    await Task.WhenAll(childTasks);
  }
}

确实...只是匆匆浏览了方法签名而没有仔细查看实际代码。删除了注释。.10...9... - Alexei Levenkov

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接