从同步方法调用异步方法

16

我试图从同步方法中运行异步方法。但由于我在同步方法中,因此无法等待异步方法。我可能没有理解TPL,因为这是我第一次使用它。

private void GetAllData()
{
    GetData1()
    GetData2()
    GetData3()
}

每种方法都需要前一种方法完成,因为第一种方法的数据会用于第二种方法。
但是,在每个方法内部,我想要启动多个Task操作以加快性能。然后,我想等待它们全部完成。 GetData1看起来像这样:
    internal static void GetData1 ()
    {
        const int CONCURRENCY_LEVEL = 15; 
        List<Task<Data>> dataTasks = new List<Task<Data>>();
        for (int item = 0; item < TotalItems; item++)
        {
            dataTasks.Add(MyAyncMethod(State[item]));
        }
        int taskIndex = 0;
        //Schedule tasks to concurency level (or all)
        List<Task<Data>> runningTasks = new List<Task<Data>>();
        while (taskIndex < CONCURRENCY_LEVEL && taskIndex < dataTasks.Count)
        {
            runningTasks.Add(dataTasks[taskIndex]);
            taskIndex++;
        }

        //Start tasks and wait for them to finish
        while (runningTasks.Count > 0)
        {
            Task<Data> dataTask = await Task.WhenAny(runningTasks);
            runningTasks.Remove(dataTask);
            myData = await dataTask;


            //Schedule next concurrent task
            if (taskIndex < dataTasks.Count)
            {
                runningTasks.Add(dataTasks[taskIndex]);
                taskIndex++;
            }
        }
        Task.WaitAll(dataTasks.ToArray()); //This probably isn't necessary
    }

我在这里使用了await,但是出现了错误。

'await'操作符只能在异步方法中使用。请考虑用'async'修改器标记此方法,并将其返回类型更改为'Task'

然而,如果我使用async修饰符,这将成为一个异步操作。因此,如果我的对GetData1的调用不使用await运算符,控制权不会在第一个await时转到GetData2,这正是我想避免的。是否有可能将GetData1保留为调用异步方法的同步方法?我是否设计了异步方法错误?正如您所看到的,我很困惑。

这可能是如何从C#同步方法调用异步方法?的重复问题。然而,我不知道如何应用提供的解决方案,因为我正在启动多个任务,想要WaitAny,对该任务进行一些处理,然后等待所有任务完成后再将控制权交还给调用者。

更新

以下是我根据下面的答案采用的解决方案:

    private static List<T> RetrievePageTaskScheduler<T>(
        List<T> items,
        List<WebPageState> state,
        Func<WebPageState, Task<List<T>>> func)
    {
        int taskIndex = 0;

        // Schedule tasks to concurency level (or all)
        List<Task<List<T>>> runningTasks = new List<Task<List<T>>>();
        while (taskIndex < CONCURRENCY_LEVEL_PER_PROCESSOR * Environment.ProcessorCount
            && taskIndex < state.Count)
        {
            runningTasks.Add(func(state[taskIndex]));
            taskIndex++;
        }

        // Start tasks and wait for them to finish
        while (runningTasks.Count > 0)
        {
            Task<List<T>> task = Task.WhenAny(runningTasks).Result;
            runningTasks.Remove(task);

            try
            {
                items.AddRange(task.Result);
            }
            catch (AggregateException ex)
            {
                /* Throwing this exception means that if one task fails 
                 * don't process any more of them */

                // https://dev59.com/UV_Va4cB1Zd3GeqPRkFz
                System.Runtime.ExceptionServices.ExceptionDispatchInfo.Capture(
                    ex.Flatten().InnerExceptions.First()).Throw();
            }

            // Schedule next concurrent task
            if (taskIndex < state.Count)
            {
                runningTasks.Add(func(state[taskIndex]));
                taskIndex++;
            }
        }

        return items;
    }

Task 类型有两种完全不同的用途:异步工作(例如 async/await)和并行处理(例如 Task.Factory.StartNew/Task.WaitAll)。你正在做什么样的工作(即它是 CPU 绑定的吗)? - Stephen Cleary
我正在获取一个网页(I/O绑定),然后进行CPU绑定的处理。 MyAsyncMethod使用两个等待构建,一个用于I/O绑定工作,另一个用于CPU绑定工作。 - Harrison
3个回答

9

Task<TResult>.Result(或在没有结果时使用Task.Wait())与await类似,但是是同步操作。您应该将GetData1()更改为使用此方法。以下是需要更改的部分:

Task<Data> dataTask = Task.WhenAny(runningTasks).Result;
runningTasks.Remove(dataTask);
myData = gameTask.Result;

感谢您帮助新手。这看起来是一个很好的解决方案,肯定可以解决我的编译错误。我想您删除了 Task.WaitAll(dataTasks.ToArray());,因为您同意它不是必要的,因为当所有的 WhenAny 完成时,这已经保证已经发生了。 - Harrison
@Harrison 实际上,我只是强调了你使用 await 的部分,但现在你提到了这一点,我认为你是正确的。 - Tim S.
如果我从函数中返回结果,我可以直接这样做吗:return Task.WhenAny(runningTasks).Result.Result - Anuj Pandey
如果遇到UI死锁问题:https://dev59.com/QmYq5IYBdhLWcg3wcgLq - Jeremy Ray Brown

9

首先,我建议您的“内部”任务在实现时不要使用Task.Run。您应该使用一个同步执行CPU密集型部分的async方法。

一旦您的MyAsyncMethod是一个执行一些CPU密集型处理的async方法,那么您可以将其包装在一个Task中并使用并行处理,如下所示:

internal static void GetData1()
{
    // Start the tasks
    var dataTasks = Enumerable.Range(0, TotalItems)
        .Select(item => Task.Run(() => MyAyncMethod(State[item]))).ToList();

    // Wait for them all to complete
    Task.WaitAll(dataTasks);
}

你原始代码中的并发限制根本不起作用,为了简化问题,我将其删除了。如果你想应用限制,你可以使用SemaphoreSlim或TPL Dataflow。

1
你可能不是有意的,但你正在关闭循环变量。 - Servy
感谢您的帮助。在阅读实现基于任务的异步模式[http://msdn.microsoft.com/en-us/library/hh873177.aspx]时,它说“在.NET Framework 4.5中,对于CPU绑定的工作,请使用静态Task.Run方法作为TaskFactory.StartNew的快捷方式”。听起来您正在推荐不同的方法。我是否理解正确?由于我的方法是CPU和I/O的混合体,您是否建议我将其拆分为两个部分?此外,您能否解释一下为什么并发限制不起作用?再次感谢。 - Harrison
1
你说 MyAsyncMethod 有一个 CPU 绑定的部分,所以需要使用 Task.Run。但是,应该在调用方法时使用它,而不是在实现中使用它。你不需要将它拆分成两个部分,但你确实有一个(不寻常的)情况:一个“async”方法也是 CPU-bound 的,因此你应该对它进行良好的文档说明。并发限制不起作用,因为一旦调用了 MyAsyncMethod,方法已经在运行了。 - Stephen Cleary
@StephenCleary - 为什么在Select中需要Task.Run?为什么不能只是.Select(item => MyAsyncMethod(State[item])) - Seth Flowers
@sethflowers:该操作指示MyAsyncMethod具有非平凡的CPU绑定部分,因此希望它在线程池上运行。这是Task.Run复杂用例 - Stephen Cleary
@StephenCleary - 谢谢您 - 您的博客文章帮助我更好地理解OP所遇到的问题。 - Seth Flowers

0
您可以调用以下内容:
GetData1().Wait();
GetData2().Wait();
GetData3().Wait();

7
请注意,如果当前定义了 SynchronizationContext,则此代码将发生死锁。 - Servy

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接