ASP .Net Core 队列化后台任务并行处理

4

我有一个ASP.NET Core Web API,使用了类似于此处描述的队列后台任务。

我使用提供的代码示例,并按照文章中的描述添加了IBackgroundTaskQueueBackgroundTaskQueueQueuedHostedService

在我的Startup.cs文件中,我只注册了一个QueuedHostedService实例,如下所示:services.AddHostedService<QueuedHostedService>();

来自WebApi控制器的任务会被加入队列,接着由QueuedHostedService一个一个出队并执行。

我希望允许多个后台处理线程出队并执行传入的任务。我能想到的最直接的解决方案是在我的Startup.cs文件中注册多个QueuedHostedService实例,就像这样:

 int maxNumOfParallelOperations;
 var isValid = int.TryParse(Configuration["App:MaxNumOfParallelOperations"], out maxNumOfParallelOperations);

 maxNumOfParallelOperations = isValid && maxNumOfParallelOperations > 0 ? maxNumOfParallelOperations : 2;

 for (int index = 0; index < maxNumOfParallelOperations; index++) 
 {
    services.AddHostedService<QueuedHostedService>();
 }

我还注意到,由于BackgroundTaskQueue中的信号量(Semaphore),QueuedHostedService实例并不总是在工作,只有在队列中有新任务可用时才会唤醒。

这个解决方案在我的测试中似乎完全正常。

但是,在这种特定的用例中,它真的是并行处理的有效和推荐解决方案吗?


使用QueuedHostedService是最佳解决方案。不幸的是,您无法完全控制线程池。 请注意,并行度不是由您使用的托管服务数量确定的,而是由线程池中的线程数确定的。如果您使用10个线程,则可以并行执行10个操作。这些线程与webhost共享,并且我认为asp.net core会优先处理Web请求而不是后台任务。 - ddfra
好的,谢谢,我明白了。但是像这里描述的那样注册多个相同后台任务以进行并行处理,这是一个好的做法吗?我的方向是注册最多5个任务,以便在我的服务中允许基本级别的并行处理。 - LioH
1个回答

9

您可以使用一个或多个线程的 IHostedService 消费 IBackgroundTaskQueue

这是一个基本实现。我假设您正在使用相同的 IBackgroundTaskQueueBackgroundTaskQueue,详情请参见此处

public class QueuedHostedService : IHostedService
{
    private readonly ILogger _logger;

    private readonly Task[] _executors;
    private readonly int _executorsCount = 2; //--default value: 2
    private CancellationTokenSource _tokenSource;
    public IBackgroundTaskQueue TaskQueue { get; }

    public QueuedHostedService(IBackgroundTaskQueue taskQueue,
        ILoggerFactory loggerFactory,
        IConfiguration configuration)
    {
        TaskQueue = taskQueue;
        _logger = loggerFactory.CreateLogger<QueuedHostedService>();

        if (ushort.TryParse(configuration["App:MaxNumOfParallelOperations"], out var ct))
        {
            _executorsCount = ct;
        }
        _executors = new Task[_executorsCount];
    }

    public Task StartAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is starting.");

        _tokenSource = CancellationTokenSource.CreateLinkedTokenSource(cancellationToken);

        for (var i = 0; i < _executorsCount; i++)
        {
            var executorTask = new Task(
                async () =>
                {
                    while (!cancellationToken.IsCancellationRequested)
                    {
#if DEBUG
                    _logger.LogInformation("Waiting background task...");
#endif
                    var workItem = await TaskQueue.DequeueAsync(cancellationToken);

                        try
                        {
#if DEBUG
                        _logger.LogInformation("Got background task, executing...");
#endif
                        await workItem(cancellationToken);
                        }
                        catch (Exception ex)
                        {
                            _logger.LogError(ex,
                                "Error occurred executing {WorkItem}.", nameof(workItem)
                            );
                        }
                    }
                }, _tokenSource.Token);

            _executors[i] = executorTask;
            executorTask.Start();
        }

        return Task.CompletedTask;
    }

    public Task StopAsync(CancellationToken cancellationToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");
        _tokenSource.Cancel(); // send the cancellation signal

        if (_executors != null)
        {
            // wait for _executors completion
            Task.WaitAll(_executors, cancellationToken);
        }

        return Task.CompletedTask;
    }
}

您需要在 Startup 类的 ConfigureServices 方法中注册服务。
...
services.AddSingleton<IBackgroundTaskQueue, BackgroundTaskQueue>();
services.AddHostedService<QueuedHostedService>();
...

此外,您可以在配置文件(appsettings.json)中设置线程数。

...
"App": {
    "MaxNumOfParallelOperations": 4
}
...

感谢您提供的解决方案。它非常有效,但即使队列中没有作业,CPU使用率始终很高。我正在运行默认的_executorsCount为5,.net计数器的CPU使用率约为60%。 - batwadi

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接