不幸的是,在dnx中没有实现HostingEnvironment.QueueBackgroundWorkItem(..),而Task.Run(..)也不安全。
是否有任何优雅的解决方案?
如@axelheer所提到的,IHostedService 是 .NET Core 2.0及以上版本中的首选方法。
我需要一个轻量级的与ASP.NET Core中HostingEnvironment.QueueBackgroundWorkItem相同的替代品,因此我编写了DalSoft.Hosting.BackgroundQueue,它使用.NET Core 2.0的IHostedService。
PM> Install-Package DalSoft.Hosting.BackgroundQueue
在你的ASP.NET Core Startup.cs文件中:
/// <summary>
/// Registers the background queue with the service collection.
/// </summary>
/// <param name="services">The service collection the queue is added to.</param>
public void ConfigureServices(IServiceCollection services)
{
    // The handler is left empty in this sample.
    // NOTE(review): presumably invoked when a queued task throws - confirm against the package docs.
    services.AddBackgroundQueue(onException: ex => { });
}
要将后台任务加入队列,只需将 BackgroundQueue 添加到控制器的构造函数中,并调用 Enqueue 方法。
/// <summary>
/// Creates the controller and keeps the injected queue for later use.
/// </summary>
/// <param name="backgroundQueue">The queue that background work is handed to.</param>
public EmailController(BackgroundQueue backgroundQueue)
{
    this._backgroundQueue = backgroundQueue;
}
/// <summary>
/// Hands the e-mail send to the background queue and returns <c>Ok()</c>
/// without awaiting the send itself.
/// </summary>
// NOTE(review): the parameter's type is missing in this snippet; it has to be
// filled in with the request model type before this compiles.
[HttpPost, Route("/")]
public IActionResult SendEmail([FromBody]emailRequest)
{
    _backgroundQueue.Enqueue(async cancellationToken =>
    {
        // Read the body from the bound parameter; there is no variable called "request" here.
        await _smtp.SendMailAsync(emailRequest.From, emailRequest.To, emailRequest.Body);
    });
    return Ok();
}
QueueBackgroundWorkItem 已经被移除,但我们有 IApplicationLifetime 来代替 IRegisteredObject(QueueBackgroundWorkItem 内部使用的正是 IRegisteredObject)。我认为这对于这样的场景看起来非常有前途。
这个想法(我仍然不确定它是否是一个相当糟糕的想法;因此,请注意!)是注册一个单例,该单例生成并观察新任务。在该单例中,我们还可以注册一个“停止事件”,以便正确等待仍在运行的任务。
这个“概念”可以用于像日志记录、邮件发送之类的短时间运行的东西。这些事情不应该花费太多时间,但会为当前请求产生不必要的延迟。
/// <summary>
/// Starts short background jobs and, once the application has stopped,
/// blocks until every job that is still tracked has finished.
/// </summary>
public class BackgroundPool
{
    private readonly object currentTasksLock = new object();
    private readonly List<Task> currentTasks = new List<Task>();

    /// <summary>
    /// Creates the pool and hooks the wait-for-jobs step onto application stop.
    /// </summary>
    /// <param name="logger">Logger used by the pool and its jobs.</param>
    /// <param name="lifetime">Application lifetime whose stopped event triggers the wait.</param>
    /// <exception cref="ArgumentNullException">Either argument is null.</exception>
    public BackgroundPool(ILogger<BackgroundPool> logger, IApplicationLifetime lifetime)
    {
        if (logger == null)
        {
            throw new ArgumentNullException(nameof(logger));
        }

        if (lifetime == null)
        {
            throw new ArgumentNullException(nameof(lifetime));
        }

        Logger = logger;
        lifetime.ApplicationStopped.Register(OnApplicationStopped);
    }

    protected ILogger<BackgroundPool> Logger { get; }

    /// <summary>
    /// Starts a background job for the given item and tracks it until it completes.
    /// </summary>
    /// <param name="whatever">The item to send.</param>
    public void SendStuff(Stuff whatever)
    {
        var pending = Task.Run(async () =>
        {
            Logger.LogInformation(BackgroundEvents.Send, "Sending stuff...");
            try
            {
                // do THE stuff
                Logger.LogInformation(BackgroundEvents.SendDone, "Send stuff returns.");
            }
            catch (Exception ex)
            {
                Logger.LogError(BackgroundEvents.SendFail, ex, "Send stuff failed.");
            }
        });

        lock (currentTasksLock)
        {
            currentTasks.Add(pending);
            // Prune jobs that have already finished so the list does not grow without bound.
            currentTasks.RemoveAll(finished => finished.IsCompleted);
        }
    }

    // Waits, under the lock, for every tracked job and then logs that the pool is closed.
    private void OnApplicationStopped()
    {
        lock (currentTasksLock)
        {
            Task.WaitAll(currentTasks.ToArray());
        }

        Logger.LogInformation(BackgroundEvents.Close, "Background pool closed.");
    }
}
BackgroundPool 应该被注册为单例,并可以通过 DI 被任何其他组件使用。我目前正在使用它来发送邮件,而且它运行良好(在应用程序关闭期间也进行了测试)。
注意:在后台任务中访问 HttpContext 之类的内容可能不起作用。旧解决方案使用 UnsafeQueueUserWorkItem 来禁止这种情况。
您可以在 .NET Core 中使用 Hangfire (http://hangfire.io/) 来执行后台作业。
例如:
// Enqueues a fire-and-forget job through Hangfire's BackgroundJob.Enqueue
// and keeps the returned value in jobId.
var jobId = BackgroundJob.Enqueue(
() => Console.WriteLine("Fire-and-forget!"));
……HostingEnvironment.QueueBackgroundWorkItem 进行的某些操作,但它是一个显著“沉重”的解决方案,我认为值得在这里提到。 - Todd Menier
using System;
using System.Collections.Generic;
using System.Threading.Tasks;
using Microsoft.AspNetCore.Hosting;
using Microsoft.Extensions.Logging;
namespace Example
{
    /// <summary>
    /// Runs fire-and-forget background work on the thread pool, tracks the
    /// resulting tasks, and waits for the tracked tasks once the application
    /// has stopped.
    /// </summary>
    public class BackgroundPool
    {
        private readonly ILogger<BackgroundPool> _logger;
        private readonly IApplicationLifetime _lifetime;
        private readonly object _currentTasksLock = new object();
        private readonly List<Task> _currentTasks = new List<Task>();

        /// <summary>
        /// Creates the pool and registers the shutdown wait on
        /// <c>ApplicationStopped</c>.
        /// </summary>
        /// <param name="logger">Logger used for trace, error and shutdown messages.</param>
        /// <param name="lifetime">Application lifetime supplying the stopping/stopped tokens.</param>
        /// <exception cref="ArgumentNullException">Either argument is null.</exception>
        public BackgroundPool(ILogger<BackgroundPool> logger, IApplicationLifetime lifetime)
        {
            if (logger == null)
                throw new ArgumentNullException(nameof(logger));
            if (lifetime == null)
                throw new ArgumentNullException(nameof(lifetime));

            _logger = logger;
            _lifetime = lifetime;
            _lifetime.ApplicationStopped.Register(WaitForPendingWork);
        }

        /// <summary>
        /// Queues a synchronous delegate as background work.
        /// </summary>
        /// <param name="action">The work to run.</param>
        /// <exception cref="ArgumentNullException"><paramref name="action"/> is null.</exception>
        public void QueueBackgroundWork(Action action)
        {
            if (action == null)
                throw new ArgumentNullException(nameof(action));

            // Adapt to the Func<Task> overload; an exception thrown by the action
            // still surfaces inside that overload's try/catch.
            QueueBackgroundWork(() =>
            {
                action();
                return Task.CompletedTask;
            });
        }

        /// <summary>
        /// Queues an asynchronous delegate as background work. Failures of the
        /// delegate are logged and not rethrown.
        /// </summary>
        /// <param name="func">The work to run.</param>
        /// <exception cref="ArgumentNullException"><paramref name="func"/> is null.</exception>
        public void QueueBackgroundWork(Func<Task> func)
        {
            if (func == null)
                throw new ArgumentNullException(nameof(func));

            var task = Task.Run(async () =>
            {
                _logger.LogTrace("Queuing background work.");
                try
                {
                    await func();
                    _logger.LogTrace("Background work returns.");
                }
                catch (Exception ex)
                {
                    _logger.LogError(ex.HResult, ex, "Background work failed.");
                }
            }, _lifetime.ApplicationStopped);

            lock (_currentTasksLock)
            {
                _currentTasks.Add(task);
            }

            // Once stopping has been signalled the continuation is cancelled, so the
            // task stays in the list for the shutdown wait.
            task.ContinueWith(CleanupOnComplete, _lifetime.ApplicationStopping);
        }

        // Removes a finished task from the tracking list.
        private void CleanupOnComplete(Task oldTask)
        {
            lock (_currentTasksLock)
            {
                _currentTasks.Remove(oldTask);
            }
        }

        // Shutdown callback: waits for every task tracked at this moment.
        private void WaitForPendingWork()
        {
            Task[] pending;
            lock (_currentTasksLock)
            {
                // Snapshot only; blocking inside the lock would stall concurrent
                // QueueBackgroundWork callers for the whole wait.
                pending = _currentTasks.ToArray();
            }

            try
            {
                Task.WaitAll(pending);
            }
            catch (AggregateException ex)
            {
                // A task cancelled by the ApplicationStopped token before it started
                // makes WaitAll throw. Cancellation is expected during shutdown;
                // Handle rethrows anything that is not a cancellation.
                ex.Handle(inner => inner is OperationCanceledException);
            }

            _logger.LogInformation("Background pool closed.");
        }
    }
}
我知道这有点晚了,但我们也遇到了这个问题。所以在阅读了很多想法后,这是我们想出的解决方案。
/// <summary>
/// Defines a simple interface for scheduling background tasks. Useful for unit testing ASP.NET code.
/// </summary>
public interface ITaskScheduler
{
/// <summary>
/// Schedules a synchronous work item which can run in the background, independent of any request.
/// </summary>
/// <param name="workItem">A unit of execution; it receives a <see cref="CancellationToken"/> when invoked.</param>
[SecurityPermission(SecurityAction.LinkDemand, Unrestricted = true)]
void QueueBackgroundWorkItem(Action<CancellationToken> workItem);
/// <summary>
/// Schedules an asynchronous work item which can run in the background, independent of any request.
/// </summary>
/// <param name="workItem">A unit of execution; it receives a <see cref="CancellationToken"/> and returns the task representing its work.</param>
[SecurityPermission(SecurityAction.LinkDemand, Unrestricted = true)]
void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem);
}
/// <summary>
/// Hosted service that captures the host's stopping token and passes it to
/// every work item it starts on the thread pool via <c>Task.Run</c>.
/// </summary>
public class BackgroundTaskScheduler : BackgroundService, ITaskScheduler
{
    private readonly ILogger _logger;
    private volatile bool _isRunning;
    private CancellationToken _stoppingToken;

    /// <summary>Creates the scheduler.</summary>
    /// <param name="logger">Logger used for lifecycle traces and work item failures.</param>
    public BackgroundTaskScheduler(ILogger<BackgroundTaskScheduler> logger)
    {
        _logger = logger;
    }

    /// <summary>
    /// Stores the stopping token, marks the scheduler as running, and then
    /// idles until the token is cancelled.
    /// </summary>
    /// <param name="stoppingToken">Token signalled when the service is asked to stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogTrace("BackgroundTaskScheduler Service started.");
        // The token is written before the volatile flag, so a caller that
        // observes _isRunning == true also observes the token.
        _stoppingToken = stoppingToken;
        _isRunning = true;
        try
        {
            await Task.Delay(-1, stoppingToken);
        }
        catch (TaskCanceledException)
        {
            // Expected: the infinite delay ends only through cancellation.
        }
        finally
        {
            _isRunning = false;
            _logger.LogTrace("BackgroundTaskScheduler Service stopped.");
        }
    }

    /// <summary>
    /// Starts a synchronous work item in the background.
    /// </summary>
    /// <param name="workItem">The work to run; it receives the stopping token.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The scheduler is not running.</exception>
    public void QueueBackgroundWorkItem(Action<CancellationToken> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        // Share the asynchronous path so failures of synchronous work items are logged too.
        StartWorkItem(token =>
        {
            workItem(token);
            return Task.CompletedTask;
        });
    }

    /// <summary>
    /// Starts an asynchronous work item in the background.
    /// </summary>
    /// <param name="workItem">The work to run; it receives the stopping token.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    /// <exception cref="InvalidOperationException">The scheduler is not running.</exception>
    public void QueueBackgroundWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        StartWorkItem(workItem);
    }

    // Checks the running flag and starts the work item with Task.Run,
    // logging any exception before rethrowing it into the discarded task.
    private void StartWorkItem(Func<CancellationToken, Task> workItem)
    {
        if (!_isRunning)
        {
            throw new InvalidOperationException("BackgroundTaskScheduler is not running.");
        }

        var token = _stoppingToken;
        _ = Task.Run(async () =>
        {
            try
            {
                await workItem(token);
            }
            catch (Exception e)
            {
                _logger.LogError(e, "When executing background task.");
                throw;
            }
        }, token);
    }
}
ITaskScheduler(我们已经在旧的 ASP.NET 客户端代码中定义了它,用于单元测试目的)允许客户端添加后台任务。BackgroundTaskScheduler 的主要目的是捕获停止取消标记(由 Host 拥有),并将其传递到所有后台 Task;根据定义,这些任务在 System.Threading.ThreadPool 中运行,因此无需创建自己的线程池。
……ExecuteAsync 完成之前都已经完成了,对吧? - StriplingWarrior
我已经使用了 Quartz.NET(不需要 SQL Server),并使用以下扩展方法轻松设置和运行作业:
/// <summary>
/// Extension helpers for scheduling jobs on a Quartz scheduler.
/// </summary>
public static class QuartzUtils
{
    /// <summary>
    /// Schedules a job of type <typeparamref name="JOB"/> with a trigger that
    /// starts now and carries <paramref name="data"/> under the key "data".
    /// </summary>
    /// <param name="scheduler">The scheduler the job is registered with.</param>
    /// <param name="jobName">Name used for both the job key and the trigger identity.</param>
    /// <param name="data">Payload placed in the trigger's job data map.</param>
    /// <returns>The key of the scheduled job.</returns>
    public static async Task<JobKey> CreateSingleJob<JOB>(this IScheduler scheduler,
        string jobName, object data) where JOB : IJob
    {
        var payload = new JobDataMap();
        payload.Add("data", data);

        var jobKey = new JobKey(jobName);

        var jobDetail = JobBuilder.Create<JOB>()
            .WithIdentity(jobKey)
            .Build();

        var startNowTrigger = TriggerBuilder.Create()
            .WithIdentity(jobName)
            .UsingJobData(payload)
            .StartNow()
            .Build();

        await scheduler.ScheduleJob(jobDetail, startNowTrigger);
        return jobKey;
    }
}
public class MyJobAsync :IJob
{
public async Task Execute(IJobExecutionContext context)
{
var data = (MyDataType)context.MergedJobDataMap["data"];
....
执行如下:
// Schedules MyJobAsync under the name "JobTitle 123", passing myData as the job's "data" entry.
await SchedulerInstance.CreateSingleJob<MyJobAsync>("JobTitle 123", myData);
/// <summary>
/// A queue of background work items, each a delegate that takes a
/// <see cref="CancellationToken"/> and returns a <see cref="ValueTask"/>.
/// </summary>
public interface IBackgroundTaskQueue
{
/// <summary>Adds a work item to the queue.</summary>
/// <param name="workItem">The work item to enqueue.</param>
ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);
/// <summary>Takes a work item from the queue.</summary>
/// <param name="cancellationToken">Token passed to the dequeue operation.</param>
/// <returns>The dequeued work item.</returns>
ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
CancellationToken cancellationToken);
}
/// <summary>
/// Bounded, channel-backed implementation of <see cref="IBackgroundTaskQueue"/>.
/// </summary>
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    /// <summary>
    /// Creates a queue that holds at most <paramref name="capacity"/> work items.
    /// </summary>
    /// <param name="capacity">
    /// Maximum number of queued items; size it for the expected load and the
    /// number of concurrent writers.
    /// </param>
    public BackgroundTaskQueue(int capacity)
    {
        // With FullMode.Wait a write to a full channel completes only once space
        // is available, which applies backpressure to producers.
        var boundedOptions = new BoundedChannelOptions(capacity);
        boundedOptions.FullMode = BoundedChannelFullMode.Wait;

        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(boundedOptions);
    }

    /// <summary>Writes a work item to the channel.</summary>
    /// <param name="workItem">The work item to enqueue.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(
        Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem is null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        await _queue.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next work item from the channel.</summary>
    /// <param name="cancellationToken">Token that cancels the read.</param>
    /// <returns>The next work item.</returns>
    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(
        CancellationToken cancellationToken)
    {
        return await _queue.Reader.ReadAsync(cancellationToken);
    }
}
/// <summary>
/// Hosted service that dequeues work items from an
/// <see cref="IBackgroundTaskQueue"/> and runs them one at a time.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    /// <summary>Creates the service.</summary>
    /// <param name="taskQueue">Queue the work items are read from.</param>
    /// <param name="logger">Logger for lifecycle messages and work item failures.</param>
    public QueuedHostedService(IBackgroundTaskQueue taskQueue,
        ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    /// <summary>The queue this service consumes.</summary>
    public IBackgroundTaskQueue TaskQueue { get; }

    /// <summary>Logs the start message and runs the processing loop.</summary>
    /// <param name="stoppingToken">Token signalled when the service is asked to stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation(
            $"Queued Hosted Service is running.{Environment.NewLine}" +
            $"{Environment.NewLine}Tap W to add a work item to the " +
            $"background queue.{Environment.NewLine}");
        await BackgroundProcessing(stoppingToken);
    }

    // Dequeues and executes work items until the stopping token is signalled.
    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            Func<CancellationToken, ValueTask> workItem;
            try
            {
                workItem = await TaskQueue.DequeueAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // The wait for the next item was cancelled by shutdown; leave the loop quietly.
                break;
            }

            try
            {
                await workItem(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // The item honoured the stopping token; that is a normal stop, not an error.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex,
                    "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    /// <summary>Logs that the service is stopping and delegates to the base implementation.</summary>
    /// <param name="stoppingToken">Token passed through to the base <c>StopAsync</c>.</param>
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");
        await base.StopAsync(stoppingToken);
    }
}
HostingEnvironment.QueueBackgroundWorkItem 原来只需要一行代码就能轻松使用,非常方便。
在ASP Core 2.x中,“新”的方法需要阅读数页晦涩难懂的文档并编写大量代码。
为了避免这种情况,您可以使用以下替代方法:
// Static, process-wide bag that collects the Boolean result of each background Create call.
public static ConcurrentBag<Boolean> bs = new ConcurrentBag<Boolean>();
/// <summary>
/// Starts <c>Create</c> for the posted id on the thread pool and immediately
/// returns an OK result; the started task is not awaited.
/// </summary>
/// <param name="postData">Posted payload; its <c>id</c> member is read and cast to a string.</param>
/// <returns>An <c>OkResult</c>.</returns>
[HttpPost("/save")]
public async Task<IActionResult> SaveAsync(dynamic postData)
{
    String requestedId = (String)postData.id;

    // Fire-and-forget: the outcome of Create is only recorded in the static bag.
    Task.Run(() => bs.Add(Create(requestedId)));

    return new OkResult();
}
// Placeholder for the actual background work; as written it always returns true.
private Boolean Create(String id)
{
// do work
return true;
}
静态的 ConcurrentBag<Boolean> bs 将持有对象的引用,这将防止垃圾回收器在控制器返回后收集任务。