在.NET Core中,HostingEnvironment.QueueBackgroundWorkItem的替代方案

72
我们正在使用.NET Core Web Api,并寻求一种轻量级的解决方案来记录具有可变强度的请求并保存到数据库中,但不希望客户端等待保存过程。
不幸的是,在dnx中没有实现HostingEnvironment.QueueBackgroundWorkItem(..),而Task.Run(..)也不安全。
是否有任何优雅的解决方案?

3
"HostingEnvironment.QueueBackgroundWorkItem" 也不安全。它比 "Task.Run" 更安全,但仍不安全。 - Stephen Cleary
一个很好的问题。我自己正在尝试实现一个SignalR进度报告器(使用IProgress接口),但由于SignalR的异步性质,我需要将进度报告处理为任务(虽然是非常短暂的任务),而不会减慢它们所报告的操作。 - Shazi
2
在WebAPI的情况下,您可以简单地使用Response.OnCompleted(Func<Task>)函数,它将添加一个委托,在响应完成后将被调用。 - Larsi
8个回答

28

如@axelheer所提到的,IHostedService 是 .NET Core 2.0及以上版本中的首选方法。

我需要一个轻量级的与ASP.NET Core中HostingEnvironment.QueueBackgroundWorkItem相同的替代品,因此我编写了DalSoft.Hosting.BackgroundQueue,它使用.NET Core 2.0的IHostedService

PM> Install-Package DalSoft.Hosting.BackgroundQueue

在你的ASP.NET Core Startup.cs文件中:

public void ConfigureServices(IServiceCollection services)
{
   services.AddBackgroundQueue(onException:exception =>
   {
                   
   });
}

要将后台任务加入队列,只需将BackgroundQueue添加到控制器的构造函数中,并调用Enqueue方法。

public EmailController(BackgroundQueue backgroundQueue)
{
   _backgroundQueue = backgroundQueue;
}
    
[HttpPost, Route("/")]
public IActionResult SendEmail([FromBody]emailRequest)
{
   _backgroundQueue.Enqueue(async cancellationToken =>
   {
      await _smtp.SendMailAsync(emailRequest.From, emailRequest.To, request.Body);
   });

   return Ok();
}

2
是的,但我会看一下 Microsoft.NET.Sdk.Worker,因为它可以做到我这个包能做的一切,而且更多。 - DalSoft
@DalSoft,是的,我也很困惑,你能解释一下吗?我明白你的意思,但是Worker怎么会“做我包里的所有事情”呢?我没看出来,你能解释一下吗? - Seabizkit
3
抱歉,各位,Queued background tasks 的微软文档已经有所更改。如果你们仍然认为需要一个包装库,请告诉我。 - DalSoft
1
微软将实现方式留给最终用户决定,因此我将继续支持和改进这个包。 - DalSoft
嗨,我尝试了你的包,但仍然会出现有关已释放对象(例如IServiceProvider或IConfigurationService)的错误。有没有办法防止这种情况发生,或者我必须重写我的代码,因为这些在每个请求结束时都被释放了? - marhyno
显示剩余3条评论

17

QueueBackgroundWorkItem已经被移除,但我们有IApplicationLifetime代替了IRegisteredObject,后者被前者使用。我认为这看起来对于这样的场景非常有前途。

这个想法(我仍然不确定它是否是一个相当糟糕的想法;因此,请注意!)是注册一个单例,该单例生成并观察新任务。在该单例中,我们还可以注册一个“停止事件”,以便正确等待仍在运行的任务。

这个“概念”可以用于像日志记录、邮件发送之类的短时间运行的东西。这些事情不应该花费太多时间,但会为当前请求产生不必要的延迟。

public class BackgroundPool
{
    protected ILogger<BackgroundPool> Logger { get; }

    public BackgroundPool(ILogger<BackgroundPool> logger, IApplicationLifetime lifetime)
    {
        if (logger == null)
            throw new ArgumentNullException(nameof(logger));
        if (lifetime == null)
            throw new ArgumentNullException(nameof(lifetime));

        lifetime.ApplicationStopped.Register(() =>
        {
            lock (currentTasksLock)
            {
                Task.WaitAll(currentTasks.ToArray());
            }

            logger.LogInformation(BackgroundEvents.Close, "Background pool closed.");
        });

        Logger = logger;
    }

    private readonly object currentTasksLock = new object();

    private readonly List<Task> currentTasks = new List<Task>();

    public void SendStuff(Stuff whatever)
    {
        var task = Task.Run(async () =>
        {
            Logger.LogInformation(BackgroundEvents.Send, "Sending stuff...");

            try
            {
                // do THE stuff

                Logger.LogInformation(BackgroundEvents.SendDone, "Send stuff returns.");
            }
            catch (Exception ex)
            {
                Logger.LogError(BackgroundEvents.SendFail, ex, "Send stuff failed.");
            }
        });

        lock (currentTasksLock)
        {
            currentTasks.Add(task);

            currentTasks.RemoveAll(t => t.IsCompleted);
        }
    }
}

这样的BackgroundPool应该被注册为单例,并可以通过 DI 被任何其他组件使用。我目前正在使用它来发送邮件,而且它运行良好(在应用程序关闭期间进行了测试)。
注意:在后台任务中访问诸如当前HttpContext之类的内容可能不起作用。旧解决方案使用UnsafeQueueUserWorkItem来禁止这种情况。
你认为呢?
更新:
随着 ASP.NET Core 2.0 的推出,有了新的后台任务功能,而在 ASP.NET Core 2.1 中将会更好:使用 IHostedService 和 BackgroundService 类在 .NET Core 2.x Web 应用程序或微服务中实现后台任务

在您的ApplicationStopped.Register委托中,您实际上并没有等待从“Task.WaitAll(currentTask.ToArray());”返回的任务。这样做是有点毫无意义的。 - Shazi

11

您可以在 .NET Core 中使用 Hangfire (http://hangfire.io/) 来执行后台作业。

例如:

var jobId = BackgroundJob.Enqueue(
    () => Console.WriteLine("Fire-and-forget!"));

4
此解决方案需要 SQL Server。虽然它不能替代您可能使用 HostingEnvironment.QueueBackgroundWorkItem 进行的某些操作,但它是一个显著“沉重”的解决方案,我认为值得在这里提到。 - Todd Menier
@ToddMenier Hangfire 能够与许多不同的存储解决方案一起使用,包括 Redis 和他们现在正在开发的新内存存储。 - Mason G. Zhwiti

6
这是 Axel's answer的优化版本,可以传递委托并更积极地清理完成的任务。
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Logging;

namespace Example
{
    public class BackgroundPool
    {
        private readonly ILogger<BackgroundPool> _logger;
        private readonly IApplicationLifetime _lifetime;
        private readonly object _currentTasksLock = new object();
        private readonly List<Task> _currentTasks = new List<Task>();

        public BackgroundPool(ILogger<BackgroundPool> logger, IApplicationLifetime lifetime)
        {
            if (logger == null)
                throw new ArgumentNullException(nameof(logger));
            if (lifetime == null)
                throw new ArgumentNullException(nameof(lifetime));

            _logger = logger;
            _lifetime = lifetime;

            _lifetime.ApplicationStopped.Register(() =>
            {
                lock (_currentTasksLock)
                {
                    Task.WaitAll(_currentTasks.ToArray());
                }

                _logger.LogInformation("Background pool closed.");
            });
        }

        public void QueueBackgroundWork(Action action)
        {
#pragma warning disable 1998
            async Task Wrapper() => action();
#pragma warning restore 1998

            QueueBackgroundWork(Wrapper);
        }

        public void QueueBackgroundWork(Func<Task> func)
        {
            var task = Task.Run(async () =>
            {
                _logger.LogTrace("Queuing background work.");

                try
                {
                    await func();

                    _logger.LogTrace("Background work returns.");
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex.HResult, ex, "Background work failed.");
                }
            }, _lifetime.ApplicationStopped);

            lock (_currentTasksLock)
            {
                _currentTasks.Add(task);
            }

            task.ContinueWith(CleanupOnComplete, _lifetime.ApplicationStopping);
        }

        private void CleanupOnComplete(Task oldTask)
        {
            lock (_currentTasksLock)
            {
                _currentTasks.Remove(oldTask);
            }
        }
    }
}

就像Axel的回答一样,你实际上不需要等待从“Task.WaitAll(currentTask.ToArray());”返回的任务。 - Shazi

1

我知道这有点晚了,但我们也遇到了这个问题。所以在阅读了很多想法后,这是我们想出的解决方案。

    /// <summary>
    /// Defines a simple interface for scheduling background tasks. Useful for UnitTesting ASP.net code
    /// </summary>
    public interface ITaskScheduler
    {
        /// <summary>
        /// Schedules a task which can run in the background, independent of any request.
        /// </summary>
        /// <param name="workItem">A unit of execution.</param>
        [SecurityPermission(SecurityAction.LinkDemand, Unrestricted = true)]
        void QueueBackgroundWorkItem(Action<CancellationToken> workItem);

        /// <summary>
        /// Schedules a task which can run in the background, independent of any request.
        /// </summary>
        /// <param name="workItem">A unit of execution.</param>
        [SecurityPermission(SecurityAction.LinkDemand, Unrestricted = true)]
        void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
    }


    public class BackgroundTaskScheduler : BackgroundService, ITaskScheduler
    {
        public BackgroundTaskScheduler(ILogger<BackgroundTaskScheduler> logger)
        {
            _logger = logger;
        }

        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogTrace("BackgroundTaskScheduler Service started.");

            _stoppingToken = stoppingToken;

            _isRunning = true;
            try
            {
                await Task.Delay(-1, stoppingToken);
            }
            catch (TaskCanceledException)
            {
            }
            finally
            {
                _isRunning = false;
                _logger.LogTrace("BackgroundTaskScheduler Service stopped.");
            }
        }

        public void QueueBackgroundWorkItem(Action<CancellationToken> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }

            if (!_isRunning)
                throw new Exception("BackgroundTaskScheduler is not running.");

            _ = Task.Run(() => workItem(_stoppingToken), _stoppingToken);
        }

        public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
        {
            if (workItem == null)
            {
                throw new ArgumentNullException(nameof(workItem));
            }

            if (!_isRunning)
                throw new Exception("BackgroundTaskScheduler is not running.");

            _ = Task.Run(async () =>
                {
                    try
                    {
                        await workItem(_stoppingToken);
                    }
                    catch (Exception e)
                    {
                        _logger.LogError(e, "When executing background task.");
                        throw;
                    }
                }, _stoppingToken);
        }

        private readonly ILogger _logger;
        private volatile bool _isRunning;
        private CancellationToken _stoppingToken;
    }
ITaskScheduler(我们已经在旧的ASP.NET客户端代码中定义了它,用于UTest测试目的)允许客户端添加后台任务。 BackgroundTaskScheduler 的主要目的是捕获停止取消标记(由Host拥有),并将其传递到所有后台Task;根据定义,这些任务在 System.Threading.ThreadPool 中运行,因此无需创建自己的线程池。
要正确配置托管服务,请参见this post
祝使用愉快!

这会告诉工作项通过CancellationToken停止它们的工作,但它实际上并不确保任何正在运行的工作项在ExecuteAsync完成之前都已经完成了,对吧? - StriplingWarrior
@StriplingWarrior 该代码的功能与ASP.NET类似,即只要IIS未关闭,排队的后台任务将从头到尾运行。如果IIS正在关闭,则取消令牌将被签署,因此所有当前正在运行的任务都将得到通知。 - Kabua
@StriplingWarrior 一个改进的方法可能是保持一个正在运行任务的集合,并且在所有当前正在运行的任务优雅地完成之前不从ExecuteAsync()返回。 - Kabua
1
ASP.NET的QueueBackgroundWorkItem使用BackgroundWorkScheduler,该调度程序会费尽心思确保其IRegisteredObject仅在最后一个正在运行的工作项完成后才被注销。 - StriplingWarrior

0

我已经使用了Quartz.NET(不需要 SQL Server),并使用以下扩展方法轻松设置和运行作业:

public static class QuartzUtils
{
        public static async Task<JobKey> CreateSingleJob<JOB>(this IScheduler scheduler,
            string jobName, object data) where JOB : IJob
        {
            var jm = new JobDataMap { { "data", data } };

            var jobKey = new JobKey(jobName);

            await scheduler.ScheduleJob(
                JobBuilder.Create<JOB>()
                .WithIdentity(jobKey)
                .Build(),

                TriggerBuilder.Create()
                .WithIdentity(jobName)
                .UsingJobData(jm)
                .StartNow()
                .Build());

            return jobKey;
        }
}

数据作为一个必须可序列化的对象进行传递。创建一个处理该作业的IJob,如下所示:
public class MyJobAsync :IJob
{
   public async Task Execute(IJobExecutionContext context)
   {
          var data = (MyDataType)context.MergedJobDataMap["data"];
          ....

执行如下:

await SchedulerInstance.CreateSingleJob<MyJobAsync>("JobTitle 123", myData);

0
Microsoft最近发布了一篇文章ASP.NET Core中使用托管服务的后台任务,日期为5/24/2023。这个例子排队的后台任务与@Dalsoft的解决方案非常相似。
public interface IBackgroundTaskQueue
{
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken);
}

public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    public BackgroundTaskQueue(int capacity)
    {
        // Capacity should be set based on the expected application load and
        // number of concurrent threads accessing the queue.            
        // BoundedChannelFullMode.Wait will cause calls to WriteAsync() to return a task,
        // which completes only when space became available. This leads to backpressure,
        // in case too many publishers/calls start accumulating.
        var options = new BoundedChannelOptions(capacity)
        {
            FullMode = BoundedChannelFullMode.Wait
        };
        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(options);
    }

    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);

        return workItem;
    }
}


public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    public QueuedHostedService(IBackgroundTaskQueue taskQueue, 
        ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    public IBackgroundTaskQueue TaskQueue { get; }

    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            $"Queued Hosted Service is running.{Environment.NewLine}" +
            $"{Environment.NewLine}Tap W to add a work item to the " +
            $"background queue.{Environment.NewLine}");

        await BackgroundProcessing(stoppingToken);
    }

    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            var workItem = 
                await TaskQueue.DequeueAsync(stoppingToken);

            try
            {
                await workItem(stoppingToken);
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, 
                    "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");

        await base.StopAsync(stoppingToken);
    }
}

-5

HostingEnvironment.QueueBackgroundWorkItem原来只需要一行代码就能轻松使用,非常方便。 在ASP Core 2.x中,“新”的方法需要阅读数页晦涩难懂的文档并编写大量代码。

为了避免这种情况,您可以使用以下替代方法:

    public static ConcurrentBag<Boolean> bs = new ConcurrentBag<Boolean>();

    [HttpPost("/save")]
    public async Task<IActionResult> SaveAsync(dynamic postData)
    {

    var id = (String)postData.id;

    Task.Run(() =>
                {
                    bs.Add(Create(id));
                });

     return new OkResult();

    }


    private Boolean Create(String id)
    {
      /// do work
      return true;
    }

静态的ConcurrentBag<Boolean> bs将持有对象的引用,这将防止垃圾回收器在控制器返回后收集任务。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接