如何正确地并行运行多个异步任务?

10

如果您需要并行运行多个异步I/O任务,但需要确保同时不超过X个I/O进程正在运行;且前后的I/O处理任务不应该有这样的限制。

这里有一个场景-假设有1000个任务;每个任务都接受一个文本字符串作为输入参数;转换该文本(预 I/O 处理),然后将该转换后的文本写入文件。目标是使预处理逻辑利用 CPU / 核心的 100%,并使最多有 10 个 I/O 进程并行运行(最多同时打开10个文件进行写操作)。

您能提供一个使用 C# / .NET 4.5 实现此功能的示例代码吗?

http://blogs.msdn.com/b/csharpfaq/archive/2012/01/23/using-async-for-file-access-alan-berman.aspx


Rx 2.0可能是一个不错的选择(将第二阶段限制为每次10个),但我对它不够熟悉,不能确定。 :-/ - James Manning
这个回答解决了您的问题吗? 在Parallel.ForEach中嵌套await - Michael Freidgeim
3个回答

9

我认为使用TPL Dataflow是个不错的主意:你可以创建带有无限并行性的预处理和后处理块,以及带有有限并行性的文件写入块,并将它们链接在一起。类似这样:

var unboundedParallelismOptions =
    new ExecutionDataflowBlockOptions
    {
        MaxDegreeOfParallelism = DataflowBlockOptions.Unbounded
    };

var preProcessBlock = new TransformBlock<string, string>(
    s => PreProcess(s), unboundedParallelismOptions);

var writeToFileBlock = new TransformBlock<string, string>(
    async s =>
            {
                await WriteToFile(s);
                return s;
            },
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 10 });

var postProcessBlock = new ActionBlock<string>(
    s => PostProcess(s), unboundedParallelismOptions);

var propagateCompletionOptions =
    new DataflowLinkOptions { PropagateCompletion = true };

preProcessBlock.LinkTo(writeToFileBlock, propagateCompletionOptions);
writeToFileBlock.LinkTo(postProcessBlock, propagateCompletionOptions);

// use something like await preProcessBlock.SendAsync("text") here

preProcessBlock.Complete();
await postProcessBlock.Completion;

WriteToFile()可能如下所示:

private static async Task WriteToFile(string s)
{
    using (var writer = new StreamWriter(GetFileName()))
        await writer.WriteAsync(s);
}

这里的 PreProcessPostProcess 方法是什么? - shashwat
1
@shashwat 他们会做必要的事情。原始问题谈到了“I/O前后处理任务”,因此我使用方法来表示它。 - svick

1

听起来你可能想考虑使用Djikstra信号量来控制任务启动的访问。

然而,这听起来像是一个典型的队列/固定消费者数量的问题,可能更适合采用这种方式进行结构化。


0

我会创建一个扩展方法,使得可以设置最大并行度。SemaphoreSlim 在这里将是救星。

    /// <summary>
    /// Concurrently Executes async actions for each item of <see cref="IEnumerable<typeparamref name="T"/>
    /// </summary>
    /// <typeparam name="T">Type of IEnumerable</typeparam>
    /// <param name="enumerable">instance of <see cref="IEnumerable<typeparamref name="T"/>"/></param>
    /// <param name="action">an async <see cref="Action" /> to execute</param>
    /// <param name="maxDegreeOfParallelism">Optional, An integer that represents the maximum degree of parallelism,
    /// Must be grater than 0</param>
    /// <returns>A Task representing an async operation</returns>
    /// <exception cref="ArgumentOutOfRangeException">If the maxActionsToRunInParallel is less than 1</exception>
    public static async Task ForEachAsyncConcurrent<T>(
        this IEnumerable<T> enumerable,
        Func<T, Task> action,
        int? maxDegreeOfParallelism = null)
    {
        if (maxDegreeOfParallelism.HasValue)
        {
            using (var semaphoreSlim = new SemaphoreSlim(
                maxDegreeOfParallelism.Value, maxDegreeOfParallelism.Value))
            {
                var tasksWithThrottler = new List<Task>();

                foreach (var item in enumerable)
                {
                    // Increment the number of currently running tasks and wait if they are more than limit.
                    await semaphoreSlim.WaitAsync();

                    tasksWithThrottler.Add(Task.Run(async () =>
                    {
                        await action(item).ContinueWith(res =>
                        {
                            // action is completed, so decrement the number of currently running tasks
                            semaphoreSlim.Release();
                        });
                    }));
                }

                // Wait for all tasks to complete.
                await Task.WhenAll(tasksWithThrottler.ToArray());
            }
        }
        else
        {
            await Task.WhenAll(enumerable.Select(item => action(item)));
        }
    }

示例用法:

await enumerable.ForEachAsyncConcurrent(
    async item =>
    {
        await SomeAsyncMethod(item);
    },
    5);

嗨,我在一个线程中使用了它。我尝试使用Abort函数停止线程,但是ForEachAsyncConcurrent任务仍在运行。你有解决这个问题的方法吗? - Tien Nguyen
1
@TienNguyen 我建议将cancelationToken作为ForEachAsyncConcurrent方法的参数添加,并在停止线程时取消它。 - Jay Shah
你能否在示例代码中加入cancelationToken?非常感谢! - Tien Nguyen

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接