.NET排队任务(使用async/await)

5

我有大量的任务(约1000个)需要执行。我的电脑是4核处理器,所以我想同时并行处理4个任务。

以下是示例代码,供您参考。

class Program
{
    public class LongOperation
    {
        private static readonly Random RandomNumberGenerator = new Random(0);
        const int UpdateFrequencyMilliseconds = 100;

        public int CurrentProgress { get; set; }

        public int TargetProcess { get; set; }

        public LongOperation()
        {
            TargetProcess = RandomNumberGenerator.Next(
                (int)TimeSpan.FromSeconds(5).TotalMilliseconds / UpdateFrequencyMilliseconds, 
                (int)TimeSpan.FromSeconds(10).TotalMilliseconds / UpdateFrequencyMilliseconds);
        }

        public async Task Execute()
        {
            while (!IsCompleted)
            {
                await Task.Delay(UpdateFrequencyMilliseconds);
                CurrentProgress++;
            }
        }

        public bool IsCompleted => CurrentProgress >= TargetProcess;
    }

    static void Main(string[] args)
    {
        Task.Factory.StartNew(async () =>
        {
            var operations = new List<LongOperation>();

            for(var x = 1; x <= 10; x++)
                operations.Add(new LongOperation());

            await ProcessOperations(4, operations);
        }).Wait();
    }

    public static async Task ProcessOperations(int maxSimultaneous, List<LongOperation> operations)
    {
        await Task.WhenAll(operations.Select(x => x.Execute()));
        // TODO: Process up to 4 operations at a time, until every operation is completed.
    }
}

我想请你提供一些关于使用哪些类以及如何构建ProcessOperations以每次处理最多4个操作直到完成所有操作的输入,以便在单个可等待的Task中。

我考虑使用SemaphoreSlim对象,因为它似乎适用于限制资源/进程的速率。


7
你应该研究一下TPL DataFlow,这是微软提供的适用于你正在尝试处理的准确情况的库。你只需要将你的 List<LongOperation> 替换为一个 ActionBlock<LongOperation> 并设置并行限制即可。 - Scott Chamberlain
我知道我在试图发明轮子,只是不知道在哪里找到它。谢谢! - Paul Knopf
1个回答

2

如前所述,您需要使用一个方便的TPL Dataflow库,其中包含两个块,用于在处理消息之前存储消息和实际执行操作:

// storage
var operations = new BufferBlock<LongOperation>();
// no more than 4 actions at the time
var actions = new ActionBlock<LongOperation>(x => x.Execute(),
    new ExecutionDataflowBlockOptions { MaxDegreeOfParallelism = 4 });

// consume new operations automatically
operations.LinkTo(actions);
for(var x = 1; x <= 10; ++x)
{
    // blocking sending
    operations.Post(new LongOperation());
    // awaitable send for async operations
    // await operations.SendAsync(new LongOperation());
}

你还可以通过设置BoundedCapacity选项来为缓冲区引入一些节流限制,例如一次不超过30个操作。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接