EF Core多线程+BeginTransaction+Commit导致死锁

4

我有关于SaveChangesAsync()BeginTransaction() + transaction.Commit() 的问题。

我们团队有一个.NET Core worker,通过EF Core 3从Microsoft EventHub接收事件并将数据保存到SQL Server中。
其中一个事件类型有很多数据,因此我们创建了几个表,将数据分离后再保存到这些表中,子表引用了父表的id列(FK_Key)。
在某些条件下,在保存新数据之前必须删除数据库中的一些数据,因此我们进行删除->更新操作。

为了将数据保存到数据库中,我们调用dbContext.Database.BeginTransaction()transaction.Commit()。当我们运行worker时,会出现死锁异常,如Transaction (Process ID 71) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

我发现PurgeDataInChildTables()中的.BatchDeleteAsync()Upsert()中的BulkInsertOrUpdateAsync()之一会抛出死锁异常(每次运行worker时都会更改)。

以下是代码:

public async Task DeleteAndUpsert(List<MyEntity> entitiesToDelete, List<MyEntity> entitiesToUpsert)
{
    if (entitiesToDelete.Any())
        await myRepository.Delete(entitiesToDelete);

    if (entitiesToUpsert.Any())
        await myRepository.Upsert(entitiesToUpsert);
}


public override async Task Upsert(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        using (var transaction = dbContext.Database.BeginTransaction())
        {
            await PurgeDataInChildTables(entities, dbContext);
            await dbContext.BulkInsertOrUpdateAsync(entities);
            // tables that depends on the parent table (FK_Key)
            await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child1>(x => x.Id).ToList());
            await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child2>(x => x.Id).ToList());
            await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany<Child3>(x => x.Id).ToList());
            transaction.Commit();
        }
    }
}

public override async Task Delete(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        using (var transaction = dbContext.Database.BeginTransaction())
        {
            await PurgeDataInChildTables(entities, dbContext);
            await dbContext.BulkDeleteAsync(entities);
            transaction.Commit();
        }
    }
}

private async Task PurgeDataInChildTables(IList<MyEntity> entities, MyDbContext dbContext)
{
    var ids = entities.Select(x => x.Id).ToList();

    await dbContext.Child1.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
    await dbContext.Child2.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
    await dbContext.Child3.Where(x => ids.Contains(x.Id)).BatchDeleteAsync();
}

当 worker 启动时,它会创建四个线程,它们都会对同一个表进行 upsert 操作(也会删除)。因此,我认为当一个线程开始事务,另一个线程开始另一个事务(或类似情况),然后尝试对子表进行 upsert 操作(或从中删除)时,就会发生死锁。

我尝试了一些方法来解决这个问题,并注意到当我删除 BeginTransaction() 并改用 SaveChangesAsync() 时,死锁似乎得到了解决。

以下是修改后的代码:

public override async Task Upsert(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        await PurgeDataInChildTables(entities, dbContext);
        await dbContext.BulkInsertOrUpdateAsync(entities);
        // tables that depends on the parent table (FK_Key)
        await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child1).ToList());
        await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child2).ToList());
        await dbContext.BulkInsertOrUpdateAsync(entities.SelectMany(x => x.Child3).ToList());
        await dbContext.SaveChangesAsync();
    }
}

public override async Task Delete(IList<MyEntity> entities)
{
    using (var dbContext = new MyDbContext(DbContextOptions, DbOptions))
    {
        await PurgeDataInChildTables(entities, dbContext);
        await dbContext.BulkDeleteAsync(entities);
        await dbContext.SaveChangesAsync();
    }
}

在工作程序启动后约30秒钟,死锁问题会发生,但是当我修改了代码后,2-3分钟内没有发生,所以我认为问题已经解决了,不过如果我让worker运行更长的时间,可能仍然会出现问题。

最后,以下是我的问题:

  • 当我使用BeginTransaction()+.Commit()时,会发生死锁,但当我使用SaveChangesAsync()时却不会。为什么?
  • 这两种方法在事务方面有什么区别?
  • 如果修改后的代码仍可能导致死锁或不是一个好的解决方案,我该如何解决?
1个回答

2
很难准确地说,需要查看数据库的分析会话才能确定。需要查找的是哪种锁被占用(其中是“共享”,还是“排他”或“更新”),以及何时实际上打开事务。 我将描述一种需要通过实际数据库分析来证明的理论行为。
当您使用Database.BeginTransaction()包装所有内容时: 隔离级别未由EF设置,而是使用数据库默认隔离级别。 在Microsoft SQL Server的情况下,它将是“读提交”。此隔离级别表示并发事务可以读取数据,但如果正在进行修改,则其他事务将等待其完成,即使它们只想读取。 事务将在调用Commit()之前保持。
当您不明确指定事务时: 选择语句和SaveChangesAsync将导致具有相同隔离级别的单独事务。事务不会比需要更长时间:例如,在SaveChangesAsync的情况下,它将存在于写入所有更改的同时,从调用该方法的时刻开始。
“Transaction (Process ID 71)因锁资源与另一个进程发生死锁,并被选择为死锁牺牲者。请重新运行该事务。”这个消息出现在有几个事务试图访问某些资源时,其中一个尝试读取数据,而另一个尝试修改数据的情况下。在这种情况下,为了避免死锁,数据库将尝试杀死需要回滚较少资源的事务。在您的情况下,是一个试图读取的事务。在回滚的负担方面,读取操作相对轻量级。
总结: 当你有一个巨大的锁,长时间占用一个资源时,它会阻止其他工作者访问该资源,因为数据库在其他工作者尝试读取时会杀死它们的事务,可能在var ids = entities.Select(x => x.Id).ToList();这一点上。当你重新编写代码时,你摆脱了长时间的锁定。此外,从BulkInsertOrUpdateAsync的文档中可以看出,该扩展在每次调用时都使用内部事务,不影响也不涉及EF上下文。如果是这样的话,那么实际的事务甚至比一次对SaveChangesAsync的调用更短,当数据以常规的EF方式而不是使用扩展更改时。

你的回答很有道理。文档中说:“如果我们需要在单个过程中执行多个操作,则应使用显式事务”,因此我不应该删除 BeginTransaction(),因为我想在单个过程中更新/删除多个表中的行。我应该将 .Select(x => .... 移到事务之外,然后可能更改隔离级别以适当的方式。 - Koji
1
@Koji 是的,那应该会影响行为。虽然死锁不太可能消失,因为仍然会出现相同行的修改,并且一个事务将等待另一个事务。真正有用的帮助可能是转移到同步执行和排队。此外,您可以通过利用 SQL Server Management Studio 来检查是否有等待的事务,只需查看活动进程即可 - 它们将处于空闲状态。 - cassandrad
在我的代码中,.NET Core worker 会自动创建多个线程,我认为我无法控制它。我能想到的一件事是使用 lock。这将确保只有一个线程可以同时进行读取/更新/删除操作,但我试图避免使用它,因为它会显著减慢线程的速度。 - Koji
我听说有一种“等待时间”配置,可以确定SQL服务器等待多长时间后才会抛出死锁异常。这是真的吗?如果您知道,请提供一些解释它的网站或文档。虽然这不是理想的解决方案,但我想尝试看看是否存在这样的东西。 - Koji
1
@Koji 不幸的是,没有这样的“等待时间”可以配置,锁定检测是自动进行的,并且在非离散时间间隔内发生;当发现锁定时,异常会立即抛出。您可以尝试使用 ROWLOCK 优化提示,但请注意,在某些情况下,它可能会导致性能下降。因此,在这里手动锁定看起来是一个很好的解决方案,因为无论在哪一侧采取,无论是 C# 代码还是数据库,锁定都将被采取。 - cassandrad

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接