多个AddHostedService dotnet core

8
当我尝试注册超过一个 AddHostedService 时,StartAsync 方法仅调用第一个。
services.AddHostedService<HostServiceBox>(); // StartAsync is called
services.AddHostedService<HostServiceWebSocket>(); // DO NOT WORK StartAsync not called
services.AddHostedService<HostServiceLogging>(); // DO NOT WORK StartAsync not called

1
我认为你需要在这里发布更多的信息。也许你的IHostedService实现(例如HostServiceBox)没有返回Task.CompletedTask - 321X
你找到解决方案了吗? - Krum Bagashev
@KrumBagashev请看下面我的回答。 - btbenjamin
看起来你可以添加一个单例而不是 AddHostedService,像这样:services.AddSingleton<IHostedService, MyService>(); 来源:https://forums.asp.net/t/2156818.aspx?How+to+run+2+services+in+one+time+ - Bob Horn
4个回答

8

好的,现在是2023年,.NET 6已经发布。现在运行多个托管服务不再是问题,只要它们由不同的类表示即可。就像这样:

public class Program
{
    public static void Main(string[] args)
    {
        IHost host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddHostedService<Worker>();
                services.AddHostedService<Worker2>();
            }).Build();

        host.Run();
    }
}

两个工作线程都会运行。

但是如果我们需要并行运行同一个服务类的多个实例呢?这似乎仍然是不可能的。

请参见相关讨论:https://github.com/dotnet/runtime/issues/38751

最终我编写了自己的实用函数,以启动多个任务并正确收集所有异常。代码如下:

/// <summary>
/// Runs multiple cancelable tasks in parallel. If any of the tasks terminates, all others are cancelled. 
/// </summary>
public static class TaskBunchRunner
{
    public class BunchException : Exception
    {
        public AggregateException Agg { get; }

        public BunchException(AggregateException agg) : base("Task bunch failed", agg)
        {
            Agg = agg;
        }

        public override string Message => $"Task bunch failed: {Agg.Message}";
        public override string ToString() => $"BunchException -> {Agg.ToString()}";
    }
    
    public static async Task Bunch(this IEnumerable<Func<CancellationToken, Task>> taskFns, CancellationToken ct)
    {
        using CancellationTokenSource combinedTcs = CancellationTokenSource.CreateLinkedTokenSource(ct);
        CancellationToken ct1 = combinedTcs.Token;

        Task[] tasks = taskFns.Select(taskFn => Task.Run(() => taskFn(ct1), ct1)).ToArray();
        
        // If any of the tasks terminated, it may be because of an error or a cancellation.
        // In both cases we cancel all of them. 
        await Task.WhenAny(tasks); // this await will never throw
        combinedTcs.Cancel();
        
        var allTask = Task.WhenAll(tasks); // this will collect exceptions in an AggregateException
        try
        {
            await allTask;
        }
        catch (Exception)
        {
            if (allTask.Exception != null) throw new BunchException(allTask.Exception);
            throw; 
        }
        
        // Why not just await Task.WhenAll() and let it throw whatever it is?
        // Because await will unwrap the aggregated exception and rethrow just one of the inner exceptions,
        // losing the information about others. We want all the exceptions to be logged, that is why 
        // we get the aggregated exception from the task. We also throw it wrapped into a custom exception, so the 
        // outer await (in the caller's scope) does not unwrap it again. :facepalm:
    }
}

现在我们创建一个单一的托管服务,并使其ExecuteAsync方法作为一组运行多个任务:
class MySingleService
{
    private readonly string _id;
    public MySingleService(string id){ _id = id;  }
    
    public async Task RunAsync(CancellationToken ct)
    {
        await Task.Delay(500, ct);
        Console.WriteLine($"Message from service {_id}");
        await Task.Delay(500, ct);
    }
}

class MyHostedService: BackgroundService
{
    protected override async Task ExecuteAsync(CancellationToken stoppingToken)
    {
        MySingleService[] individuals = new[]
        {
            new MySingleService("1"),
            new MySingleService("2"),
            new MySingleService("3"),
        };
        
        await individuals
            .Select<MySingleService, Func<CancellationToken, Task>>(s => s.RunAsync)
            .Bunch(stoppingToken);
    }
}

public class Program
{
    public static void Main(string[] args)
    {
        IHost host = Host.CreateDefaultBuilder(args)
            .ConfigureServices((hostContext, services) =>
            {
                services.AddHostedService<MyHostedService>();
            }).Build();

        host.Run();
    }
}

注意事项1:类TaskBunchRunner已从真实项目中取出并证明有效,而使用示例是虚构的并未经测试。
注意事项2:方法Bunch设计用于后台服务,这些服务不会自然完成,它们会一直运行直到被取消或失败。因此,如果一组中的一个任务成功完成,其他任务将被取消(这可能不是您想要的)。如果需要支持完成操作,建议检查WhenAny的结果:如果竞赛获胜者已经完成了,我们需要将其从数组中删除并再次调用WhenAny
我知道这不完全符合OP的要求,但可能对那些遇到与我相同问题的人有用。

4
以下是可行的代码: 我通过创建一个帮助程序来解决这个问题。 @statup.cs
public void ConfigureServices(IServiceCollection services)
        {
            JwtBearerConfiguration(services);

            services.AddCors(options => options.AddPolicy("CorsPolicy", builder =>
            {
                builder
                    .AllowAnyMethod()
                    .AllowAnyHeader()
                    .AllowAnyOrigin()
                    .AllowCredentials();
            }));

            services.AddMvc().SetCompatibilityVersion(CompatibilityVersion.Version_2_1); ;


            services.AddSignalR();

            services.AddHostedService<HostServiceHelper>(); // <===== StartAsync is called

        }

@HostServiceHelper.cs

    public class HostServiceHelper : IHostedService
    {
        private static IHubContext<EngineHub> _hubContext;

        public HostServiceHelper(IHubContext<EngineHub> hubContext)
        {
            _hubContext = hubContext;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            return Task.Run(() =>
            {
                Task.Run(() => ServiceWebSocket(), cancellationToken);

                Task.Run(() => ServiceBox(), cancellationToken);

                Task.Run(() => ServiceLogging(), cancellationToken);

            }, cancellationToken);
        }

        public void ServiceLogging()
        {
        // your own CODE
         }

        public void ServiceWebSocket()
        {
 // your own CODE
        }

        public void ServiceBox()
        {
            // your own CODE
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            //Your logical
            throw new NotImplementedException();
        }
    }

1
这与框架运行多个托管服务不同。一个明显的缺陷是:没有人处理这3个服务中的异常。 - C-F

1

.NET 8 更新

在 .NET 8 之前,所有托管服务都是按顺序运行的,这就是为什么它会等待第一个服务完成后再尝试执行下一个的原因。

但是在 .NET 8 中,您可以通过设置 HostOptions 来轻松解决这个问题。

所以,这个想法是同时运行所有的 StartAsync(以及 StopAsync)方法。

using WorkerService;
 
var host = Host.CreateDefaultBuilder(args)
    .ConfigureServices(services =>
    {
        services.Configure<HostOptions>(options =>
        {
            options.ServicesStartConcurrently = true; //<--- new .NET 8 feature
            options.ServicesStopConcurrently = true; //<--- new .NET 8 feature
        });

        services.AddHostedService<HostServiceBox>();
        services.AddHostedService<HostServiceWebSocket>();
        services.AddHostedService<HostServiceLogging>();
    })
    .Build();
 
host. Run();

我需要将NuGet包Microsoft.Extensions.Hosting更新到8.0版本,才能看到新的属性。我的项目设置为.NET 8,但运行的是7.0版本的包。 - undefined

0

托管服务通常是一个单一的任务,因此我会使用单例模式来实现。

// Hosted Services
services.AddSingleton<IHostedService, HttpGetCurrencyPairRequestSyncingService>();
services.AddSingleton<IHostedService, HttpPostCurrencyPairRequestSyncingService>();

当我在我的课堂上时,

public class CurrencyPairCacheManagementService : BaseHostedService<CurrencyPairCacheManagementService>
        , ICurrencyPairCacheManagementService, IHostedService, IDisposable
    {
        private ICurrencyPairService _currencyPairService;
        private IConnectionMultiplexer _connectionMultiplexer;

        public CurrencyPairCacheManagementService(IConnectionMultiplexer connectionMultiplexer,
            IServiceProvider serviceProvider) : base(serviceProvider)
        {
            _currencyPairService = serviceProvider.GetService<CurrencyPairService>();
            _connectionMultiplexer = connectionMultiplexer;

            InitializeCache(serviceProvider);
        }

        /// <summary>
        /// Operation Procedure for CurrencyPair Cache Management.
        ///
        /// Updates every 5 seconds.
        ///
        /// Objectives:
        /// 1. Pull the latest currency pair dataset from cold storage (DB)
        /// 2. Cross reference checking (MemoryCache vs Cold Storage)
        /// 3. Update Currency pairs
        /// </summary>
        /// <param name="stoppingToken"></param>
        /// <returns></returns>
        /// <exception cref="NotImplementedException"></exception>
        protected override async Task ExecuteAsync(CancellationToken stoppingToken)
        {
            _logger.LogInformation("CurrencyPairCacheManagementService is starting.");

            stoppingToken.Register(() => _logger.LogInformation("CurrencyPairCacheManagementService is stopping."));

            while (!stoppingToken.IsCancellationRequested)
            {
                var currencyPairs = _currencyPairService.GetAllActive();

                await Task.Delay(TimeSpan.FromSeconds(5), stoppingToken);
            }

            _logger.LogWarning("CurrencyPairCacheManagementService background task is stopping.");
        }

        public void InitializeCache(IServiceProvider serviceProvider)
        {
            var currencyPairs = _currencyPairService.GetAllActive();

            // Load them individually to the cache.
            // This way, we won't have to update the entire collection if we were to remove, update or add one.
            foreach (var cPair in currencyPairs)
            {
                // Naming convention => PREFIX + CURRENCYPAIRID
                // Set the object into the cache

            }
        }

        public Task InproPair(CurrencyPair currencyPair)
        {
            throw new NotImplementedException();
        }
    }

在继续执行您想要的操作之前,首先会触发ExecuteAsync。您可能还希望删除我声明的泛型,因为我的基类使用泛型运行(如果您不使用泛型运行托管服务基类,则我认为您不需要显式继承IHostedService和IDisposable)。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接