C# - 后台服务中的DbContext被释放

3
我有一个WebAPI,应该也能接收来自RabbitMQ的消息。我使用了this教程,因为我知道有时IIS喜欢终止长时间运行的任务(尽管我还没有在服务器上测试过,也许它不起作用)。我有一个处理通过RabbitMQ接收到的消息的服务。我遇到的第一个问题是,我无法将其注入到BackgroundService类中,因此我使用了IServiceScopeFactory。现在,我必须从两个队列中消耗消息,并且据我所知,最佳实践是为此使用两个通道。但是处理是在一个服务中完成的。BackgroundService:
public class ConsumeRabbitMQHostedService : BackgroundService
{
    private IConnection _connection;
    private IModel _firstChannel;
    private IModel _secondChannel;
    private RabbitConfigSection _rabbitConfig;
    public IServiceScopeFactory _serviceScopeFactory;

    public ConsumeRabbitMQHostedService(IOptions<RabbitConfigSection> rabbitConfig, IServiceScopeFactory serviceScopeFactory)
    {
        _rabbitConfig = rabbitConfig.Value;
        _serviceScopeFactory = serviceScopeFactory;
        InitRabbitMQ();
    }

    private void InitRabbitMQ()
    {
        var factory = new ConnectionFactory { HostName = _rabbitConfig.HostName, UserName = _rabbitConfig.UserName, Password = _rabbitConfig.Password };

        
        _connection = factory.CreateConnection();

        
        _firstChannel = _connection.CreateModel();

        _firstChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
        _firstChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, true, false, false, null);
        _firstChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
        _firstChannel.BasicQos(0, 1, false);

        _secondChannel = _connection.CreateModel();

        _secondChannel.ExchangeDeclare(_rabbitConfig.DefaultExchange, ExchangeType.Topic);
        _secondChannel.QueueDeclare(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, true, false, false, null);
        _secondChannel.QueueBind(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, _rabbitConfig.DefaultExchange, "*.test.queue", null);
        _secondChannel.BasicQos(0, 1, false);

        _connection.ConnectionShutdown += RabbitMQ_ConnectionShutdown;
    }
    protected override Task ExecuteAsync(CancellationToken stoppingToken)
    {
        stoppingToken.ThrowIfCancellationRequested();

        var firstConsumer = new EventingBasicConsumer(_firstChannel);
        var secondConsumer = new EventingBasicConsumer(_secondChannel);
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
            firstConsumer.Received += (ch, ea) =>
            {
                // received message  
                var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());

                // handle the received message  
                HandleFirstMessage(content, scoped);
                _firstChannel.BasicAck(ea.DeliveryTag, false);

            };
            firstConsumer.Shutdown += OnConsumerShutdown;
            firstConsumer.Registered += OnConsumerRegistered;
            firstConsumer.Unregistered += OnConsumerUnregistered;
            firstConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;
            _firstChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.FirstItemsConsumeQueue, false, firstConsumer);
        }
        using (var scope = _serviceScopeFactory.CreateScope())
        {
            IIntegrationService scoped = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
            secondConsumer.Received += (ch, ea) =>
            {
                // received message  

                var content = System.Text.Encoding.UTF8.GetString(ea.Body.ToArray());

                // handle the received message  
                HandleSecondMessage(content, scoped);
                _secondChannel.BasicAck(ea.DeliveryTag, false);
            };


            secondConsumer.Shutdown += OnConsumerShutdown;
            secondConsumer.Registered += OnConsumerRegistered;
            secondConsumer.Unregistered += OnConsumerUnregistered;
            secondConsumer.ConsumerCancelled += OnConsumerConsumerCancelled;

            _secondChannel.BasicConsume(_rabbitConfig.Queues.ConsumeQueues.SecondItemsConsumeQueue, false, secondConsumer);
        }
        return Task.CompletedTask;
    }

    private void HandleFirstMessage(string content, IIntegrationService integrationService)
    {
        List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
        integrationService.ImportFirst(dataToImport);
    }

    private void HandleSecondMessage(string content, IIntegrationService integrationService)
    {
        List<Import901Data> importData = JsonConvert.DeserializeObject<List<Import901Data>>(content);
        integrationService.ImportSecond(importData);
    }

    private void OnConsumerConsumerCancelled(object sender, ConsumerEventArgs e) { }
    private void OnConsumerUnregistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerRegistered(object sender, ConsumerEventArgs e) { }
    private void OnConsumerShutdown(object sender, ShutdownEventArgs e) { }
    private void RabbitMQ_ConnectionShutdown(object sender, ShutdownEventArgs e) { }

    public override void Dispose()
    {
        _firstChannel.Close();
        _connection.Close();
        base.Dispose();
    }
}

在服务中我遇到了以下问题:

System.ObjectDisposedException:“无法访问已释放的上下文实例。此错误的常见原因是释放了从依赖项注入解析的上下文实例,然后在应用程序的其他位置尝试使用同一上下文实例。如果您正在对上下文实例调用“Dispose”或将其包装在using语句中,则可能会发生这种情况。如果您正在使用依赖项注入,则应让依赖项注入容器处理上下文实例的释放。 对象名称:“IntegrationDbContext”。

DbContext 被注入到 IIntegrationService 中。如果我理解得正确,两个(甚至一个)服务实例共享 DbContext,当其中一个完成时会释放 DbContext。我尝试不创建两个实例(所有代码都在一个 using 内),尝试使 IIntegrationService 瞬态,尝试异步执行所有操作(这是初始版本,将其改为同步以进行测试) - 仍然出现相同的错误。我该怎么办?这样做正确吗?

更新1.Startup 中的 ConfigureServices

        public void ConfigureServices(IServiceCollection services)
    {
        var rabbitConfigSection =
            Configuration.GetSection("Rabbit");
        services.Configure<RabbitConfigSection>(rabbitConfigSection);
        services.AddDbContext<SUNDbContext>(options =>
               options.UseSqlServer(Configuration.GetConnectionString("DefaultConnection")));

        services.AddCors();
        services.AddSwaggerGen(c =>
        {
            c.SwaggerDoc("v1", new OpenApiInfo
            {
                Title = "My API",
                Version = "v1"
            });
        });
        services.AddRabbit(Configuration);
        services.AddHostedService<ConsumeRabbitMQHostedService>();
        services.AddControllers();
        services.AddTransient<IIntegrationService, IntegrationService>();// it's transient now, same error with scoped
    }

1
可能不会影响任何事情,但只是因为我注意到了:你在Dispose时关闭了第一个通道,但没有关闭第二个通道,然后关闭了共享的连接。 - Nikki9696
你能否也发布一下你构建/注册DbContext和IIntegrationService的初始代码? - Neil W
1
所以按照这种推理方式,如果你有一个在个别消息的上下文之外解决的持久范围,那么如果在处理消息之前范围被释放会发生什么呢?换句话说,你需要为每个在消息处理程序内处理的消息解决一个单独的范围。 - David L
1
通常情况下你是对的,但在这种情况下你正在创建自己的范围,这会改变生命周期。我添加了一个答案。很高兴能帮到你! - David L
1
在使用块完成之前,它不能被处理。然而,在您的原始代码中,使用块会在处理消息之前完成。 - David L
显示剩余5条评论
1个回答

6
这个问题是由于_serviceScopeFactory.CreateScope()创建的外部scope在每个使用语句后都被释放了,而每个消息仍然试图依靠现在已释放的scope和附加的上下文处理消息所导致的。

解决方法是在您的消息处理程序中为每个消息创建一个新的scope:
private void HandleFirstMessage(string content)
{
    using (var scope = _serviceScopeFactory.CreateScope())
    {
        IIntegrationService integrationService = scope.ServiceProvider.GetRequiredService<IIntegrationService>();
        List<StockImportDto> dataToImport = JsonConvert.DeserializeObject<List<StockImportDto>>(content);
        integrationService.ImportFirst(dataToImport);
    }
}

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接