如何将参数传递给排队的后台任务(.NET Core)

10

在我的 Web 应用程序中,有一个执行时间较长的任务,我想在后台调用它。因此,我按照 .NET Core 3.1 文档中"队列后台任务"一节的做法,使用了以下代码:

/// <summary>
/// Queue of background work items. Each work item is a delegate that receives a
/// <see cref="CancellationToken"/> and returns a <see cref="ValueTask"/>.
/// </summary>
public interface IBackgroundTaskQueue
{
    /// <summary>Adds a work item to the queue.</summary>
    /// <param name="workItem">The delegate to enqueue.</param>
    /// <returns>A <see cref="ValueTask"/> representing the enqueue operation.</returns>
    ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem);

    /// <summary>Takes the next work item from the queue.</summary>
    /// <param name="cancellationToken">Token handed to the dequeue operation.</param>
    /// <returns>The dequeued work item delegate.</returns>
    ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken);
}

/// <summary>
/// <see cref="IBackgroundTaskQueue"/> implementation that stores work items in a
/// bounded channel.
/// </summary>
public class BackgroundTaskQueue : IBackgroundTaskQueue
{
    private readonly Channel<Func<CancellationToken, ValueTask>> _queue;

    /// <summary>Creates the queue on top of a bounded channel.</summary>
    /// <param name="capacity">Capacity passed to the bounded channel options.</param>
    public BackgroundTaskQueue(int capacity)
    {
        // FullMode.Wait: writers wait for free space instead of dropping items.
        var channelOptions = new BoundedChannelOptions(capacity);
        channelOptions.FullMode = BoundedChannelFullMode.Wait;

        _queue = Channel.CreateBounded<Func<CancellationToken, ValueTask>>(channelOptions);
    }

    /// <summary>Writes <paramref name="workItem"/> to the channel.</summary>
    /// <param name="workItem">The delegate to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(Func<CancellationToken, ValueTask> workItem)
    {
        if (workItem == null)
        {
            throw new ArgumentNullException(nameof(workItem));
        }

        var writer = _queue.Writer;
        await writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next work item from the channel.</summary>
    /// <param name="cancellationToken">Token passed to the channel read.</param>
    /// <returns>The work item that was read.</returns>
    public async ValueTask<Func<CancellationToken, ValueTask>> DequeueAsync(CancellationToken cancellationToken)
    {
        var reader = _queue.Reader;
        return await reader.ReadAsync(cancellationToken);
    }
}

托管服务

/// <summary>
/// Hosted service that takes work items from an <see cref="IBackgroundTaskQueue"/>
/// and runs them one after another until the stopping token is signaled.
/// </summary>
public class QueuedHostedService : BackgroundService
{
    private readonly ILogger<QueuedHostedService> _logger;

    /// <summary>Creates the service.</summary>
    /// <param name="taskQueue">Queue the service reads work items from.</param>
    /// <param name="logger">Logger used for failure and shutdown messages.</param>
    public QueuedHostedService(IBackgroundTaskQueue taskQueue, ILogger<QueuedHostedService> logger)
    {
        TaskQueue = taskQueue;
        _logger = logger;
    }

    /// <summary>Queue this service consumes.</summary>
    public IBackgroundTaskQueue TaskQueue { get; }

    /// <summary>Runs the processing loop for the lifetime of the service.</summary>
    /// <param name="stoppingToken">Signaled when the service should stop.</param>
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        await BackgroundProcessing(stoppingToken);
    }

    /// <summary>
    /// Dequeues and executes work items until <paramref name="stoppingToken"/> is signaled.
    /// A failing work item is logged and does not end the loop.
    /// </summary>
    private async Task BackgroundProcessing(CancellationToken stoppingToken)
    {
        while (!stoppingToken.IsCancellationRequested)
        {
            Func<CancellationToken, ValueTask> workItem;

            try
            {
                workItem = await TaskQueue.DequeueAsync(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // Shutdown was requested while waiting for work: leave the loop
                // normally instead of letting the cancellation escape ExecuteAsync.
                break;
            }

            try
            {
                await workItem(stoppingToken);
            }
            catch (OperationCanceledException) when (stoppingToken.IsCancellationRequested)
            {
                // The work item was cancelled by shutdown; that is not a failure.
                break;
            }
            catch (Exception ex)
            {
                _logger.LogError(ex, "Error occurred executing {WorkItem}.", nameof(workItem));
            }
        }
    }

    /// <summary>Logs that the service is stopping, then delegates to the base implementation.</summary>
    /// <param name="stoppingToken">Token passed through to the base StopAsync.</param>
    public override async Task StopAsync(CancellationToken stoppingToken)
    {
        _logger.LogInformation("Queued Hosted Service is stopping.");
        await base.StopAsync(stoppingToken);
    }
}

然后我注册所有服务

// Hosted service that consumes the queue.
services.AddHostedService<QueuedHostedService>();

// One shared queue instance, created up front with the configured capacity.
IBackgroundTaskQueue backgroundTaskQueue = new BackgroundTaskQueue(queueCapacity);
services.AddSingleton<IBackgroundTaskQueue>(backgroundTaskQueue);

然后我可以通过不带参数的调用方式成功使用它,就像这个示例中一样。
/// <summary>
/// Logs the start of company creation, creates the tenant, and queues the remaining
/// registration work on the background queue.
/// </summary>
/// <param name="addTenantBo">Input describing the tenant to create.</param>
/// <returns>
/// The current <c>tenantBo</c> value: the result of <c>CreateTenantAsync</c> when it
/// completed, otherwise the instance created at the start of the method.
/// </returns>
public async Task<TenantBo> RegisterCompanyAsync(AddTenantBo addTenantBo)
{
  var tenantBo = new TenantBo();

  try
  {
    _companyRegistrationLogHelper.SetInfoLog(GetTenantId(tenantBo),
      "Start create company: " + JsonConvert.SerializeObject(addTenantBo));

    InitOnCreateCompanyTasks(tenantBo);

    //skip if already create tenant
    tenantBo = await CreateTenantAsync(tenantBo, addTenantBo);

    // Await the enqueue itself (not the background work): this observes enqueue
    // failures in the catch below and waits when the bounded queue is full.
    await _companyRegistationQueue.QueueBackgroundWorkItemAsync(RunRegistrationCompanyMainAsync);

    return tenantBo;
  }
  catch (Exception e)
  {
    //some logs
    return tenantBo;
  }
}

/// <summary>
/// Background registration work item. Its single CancellationToken parameter and
/// ValueTask return type match the delegate type the queue accepts.
/// </summary>
/// <param name="cancellationToken">Token supplied by whoever invokes the work item.</param>
private async ValueTask RunRegistrationCompanyMainAsync(CancellationToken cancellationToken)
{
  // Placeholder: the awaited registration steps are omitted from this sample.
}

/// <summary>
/// Overload of the registration work item that also takes a tenant id. Because of the
/// extra parameter it cannot be passed to the queue directly as a method group.
/// </summary>
/// <param name="tenantId">Tenant id argument for the registration work.</param>
/// <param name="cancellationToken">Token supplied by whoever invokes the work item.</param>
private async ValueTask RunRegistrationCompanyMainAsync(string tenantId, CancellationToken cancellationToken)
{
  // Placeholder: the awaited registration steps are omitted from this sample.
}

因此,我只能使用一个参数来调用RunRegistrationCompanyMainAsync(CancellationToken cancellationToken),而无法使用两个参数的RunRegistrationCompanyMainAsync(string tenantId, CancellationToken cancellationToken)。

您能帮助我将字符串参数作为此任务的参数传递吗?

2个回答

21
调用 QueueBackgroundWorkItemAsync(RunRegistrationCompanyMainAsync) 时,编译器实际上会把方法组转换为委托。不过,要提供 Func 委托的实例,并不局限于使用方法组,例如也可以传入一个 lambda 表达式:
 // The lambda captures someTenantId and forwards it together with the token, so the
 // queued delegate still takes only a CancellationToken.
 // NOTE(review): the ValueTask returned by QueueBackgroundWorkItemAsync is not awaited
 // in this snippet - confirm whether the caller should await it.
 var someTenantId = ....
 .....
_companyRegistationQueue.QueueBackgroundWorkItemAsync(ct => RunRegistrationCompanyMainAsync(someTenantId, ct));

谢谢,但这还不够。我需要扩展Func并使用Tuple。 - Nikita Sychou
@NikitaSychou 你说的“不够”是什么意思? - Guru Stron
Channel<Func<string, CancellationToken, ValueTask>> 不够用;但改用 Channel<Tuple<string, Func<string, CancellationToken, ValueTask>>> 并在所有相关位置都使用它,就足够了。 - Nikita Sychou
2
@NikitaSychou,使用我的解决方案,您不需要 Func<string, CancellationToken, ValueTask>,您可以像对无参数作业一样使用相同的 Func<CancellationToken, ValueTask>。 您的 Tuple 解决方案与编译器实际上在我的解决方案中使用闭包的方式几乎相同,但更麻烦。 - Guru Stron
2
@GuruStron 那对我百分百有效。 - pedrommuller
对我也起作用了。我需要将会话用户传递给工作项。谢谢。 - Tareq

1

经过一段时间的研究,我找到了解决方法。 只需要像这样使用元组即可。

/// <summary>
/// Bounded queue whose items pair a CreateCompanyModel argument (Item1) with the
/// delegate that will be invoked with it (Item2).
/// </summary>
public class CompanyRegistationQueue : ICompanyRegistationQueue
{
    private readonly Channel<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>> _queue;

    /// <summary>Creates the queue on top of a bounded channel.</summary>
    /// <param name="capacity">Capacity passed to the bounded channel options.</param>
    public CompanyRegistationQueue(int capacity)
    {
        // FullMode.Wait: writers wait for free space instead of dropping items.
        var options = new BoundedChannelOptions(capacity) { FullMode = BoundedChannelFullMode.Wait };
        _queue = Channel.CreateBounded<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>>(options);
    }

    /// <summary>Writes <paramref name="workItem"/> to the channel.</summary>
    /// <param name="workItem">Argument/delegate pair to enqueue; must not be null.</param>
    /// <exception cref="ArgumentNullException"><paramref name="workItem"/> is null.</exception>
    public async ValueTask QueueBackgroundWorkItemAsync(Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>> workItem)
    {
        if (workItem == null) throw new ArgumentNullException(nameof(workItem));
        await _queue.Writer.WriteAsync(workItem);
    }

    /// <summary>Reads the next argument/delegate pair from the channel.</summary>
    /// <param name="cancellationToken">Token passed to the channel read.</param>
    /// <returns>The pair that was read.</returns>
    public async ValueTask<Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>> DequeueAsync(CancellationToken cancellationToken)
    {
        var workItem = await _queue.Reader.ReadAsync(cancellationToken);
        return workItem;
    }
}

然后像这样调用它。
// Consumer loop for the tuple-based queue: dequeues one pair at a time and invokes
// the delegate with its stored argument until the stopping token is signaled.
private async Task BackgroundProcessing(CancellationToken stoppingToken)
{
    while (true)
    {
        if (stoppingToken.IsCancellationRequested)
        {
            return;
        }

        var workItem = await TaskQueue.DequeueAsync(stoppingToken);

        try
        {
            // Item2 is the delegate to run; Item1 is the argument queued with it.
            var handler = workItem.Item2;
            var argument = workItem.Item1;
            await handler(argument, stoppingToken);
        }
        catch (Exception failure)
        {
            _logger.LogError(failure, "Error occurred executing {WorkItem}.", nameof(workItem));
        }
    }
}

调用代码
// Name the delegate first, then pair it with its argument and enqueue the pair.
Func<CreateCompanyModel, CancellationToken, ValueTask> registrationHandler = RunRegistrationCompanyMainAsync;
var queuedItem = new Tuple<CreateCompanyModel, Func<CreateCompanyModel, CancellationToken, ValueTask>>(
    createCompanyModel,
    registrationHandler);
await _companyRegistationQueue.QueueBackgroundWorkItemAsync(queuedItem);

P.S. 可能元组不是最好的解决方案,但它可以工作


网页内容由 Stack Overflow 提供,点击上面的原文链接可以查看英文原文。