EF Core 5中同时进行多个批量合并语句时的事务死锁问题

3

我有一个应用程序,它使用EF Core 5运行多个线程将数据插入到SQL Server 2017数据库表中。

使用EF Core 5插入领域模型实体的C#代码如下:

using (var ctx = this.dbContextFactory.CreateDbContext())
{
    //ctx.Database.AutoTransactionsEnabled = false;
    foreach (var rootEntity in request.RootEntities)
    {
        ctx.ChangeTracker.TrackGraph(rootEntity, node =>
        {
            if ((request.EntityTypes != null && request.EntityTypes.Contains(node.Entry.Entity.GetType()))
                || rootEntity == node.Entry.Entity)
            {
                if (node.Entry.IsKeySet)
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Modified;
                else
                    node.Entry.State = Microsoft.EntityFrameworkCore.EntityState.Added;
            }
        });
    }
    await ctx.SaveChangesAsync(cancellationToken);
}
        

每个线程负责实例化自己的 DbContext 实例,因此使用 dbContextFactory。

一些 INSERT(MERGE)生成的示例 SQL 如下:

SET NOCOUNT ON;
DECLARE @inserted0 TABLE ([OrderId] bigint, [_Position] [int]);
MERGE [dbo].[Orders] USING (
VALUES (@p0, 0),
(@p1, 1),
(@p2, 2),
...
(@43, 41)) AS i ([SomeColumn],  _Position) ON 1=0
WHEN NOT MATCHED THEN
INSERT ([SomeColumn])
VALUES (i.[SomeColumn])
OUTPUT INSERTED.[OrderId], i._Position
INTO @inserted0;

SELECT [t].[OrderId] FROM [dbo].[Orders] t
INNER JOIN @inserted0 i ON ([t].[OrderId] = [i].[OrderId])
ORDER BY [i].[_Position];

由于这些线程经常同时运行,我会遇到以下SQL异常:
Transaction (Process ID 99) was deadlocked on lock resources with another process and has been chosen as the deadlock victim. Rerun the transaction.

EF Core会隐式地将隔离级别设置为READ COMMITTED。

使用SQL Profiler,事务死锁是由以下原因引起的:

enter image description here

我的担忧:

  1. 令人沮丧的是,EF Core生成的SQL包含两个语句:MERGE和SELECT。我不明白SELECT的目的,因为主键的标识可从@inserted0表变量中获取。根据这个答案,仅使用MERGE语句就足以使其具有原子性。

我认为是这个SELECT导致了事务死锁。

  1. 我尝试通过使用READ COMMITTED SNAPSHOT来避免与主键查找冲突,但即使这个隔离级别应该避免锁定并使用行版本控制,我仍然遇到了相同的错误。

我尝试解决问题的方法:

我能找到的唯一解决此问题的方法是显式地防止EF Core启动事务,因此以下是代码:

ctx.Database.AutoTransactionsEnabled = false;

我已经多次测试过,没有遇到交易死锁的情况。鉴于逻辑仅仅是插入新记录,我相信这是可以完成的。

有人能提供解决此问题的建议吗?

感谢您的时间。


为什么不让不同的线程同时插入到同一张表中呢? - Svyatoslav Danyliv
因为EF Core已经自动将其转换为MERGE而不是INSERT。批量合并40条记录比40个单独的INSERT语句更快。除非在EF Core中有一个选项可以使用单个SQL语句进行批量插入? - user978139
我知道为什么它会翻译成MERGE。我正在努力找出如何避免这种情况。你插入的是哪种类型的数据?也许你只需要通过可用的扩展使用BulkCopy - Svyatoslav Danyliv
1个回答

0

我们在多个线程上使用INSERT(MERGE)语句时遇到了相同的问题。我们不想为所有事务启用EnableRetryOnFailure()选项,因此编写了以下DbContent扩展方法。

  public static async Task<TResult> SaveWithRetryAsync<TResult>(this DbContext context,
                                                                  Func<Task<TResult>> bulkInsertOperation,
                                                                  Func<TResult, Task<bool>> verifyBulkOperationSucceeded,
                                                                  IsolationLevel isolationLevel = IsolationLevel.Unspecified,
                                                                  int retryLimit = 6,
                                                                  int maxRetryDelayInSeconds = 30)
    {
        var existingTransaction = context.Database.CurrentTransaction?.GetDbTransaction();
        if (existingTransaction != null)
            throw new InvalidOperationException($"Cannot run {nameof(SaveWithRetryAsync)} inside a transaction");

        if (context.ChangeTracker.HasChanges())
        {
            throw new InvalidOperationException(
                "DbContext should be saved before running this action to revert only the changes of this action in case of a concurrency conflict.");
        }

        const int sqlErrorNrOnDuplicatePrimaryKey = 2627;
        const int sqlErrorNrOnSnapshotIsolation = 3960;
        const int sqlErrorDeadlock = 1205;
        int[] sqlErrorsToRetry = { sqlErrorNrOnDuplicatePrimaryKey, sqlErrorNrOnSnapshotIsolation, sqlErrorDeadlock };

        var retryState = new SaveWithRetryState<TResult>(bulkInsertOperation);

        // Use EF Cores connection resiliency feature for retrying (see https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency)
        // Usually the IExecutionStrategy is configured DbContextOptionsBuilder.UseSqlServer(..., options.EnableRetryOnFailure()).
        // In ASP.NET, the DbContext is configured in Startup.cs and we don't want this retry behaviour everywhere for each db operation.
        var executionStrategyDependencies = context.Database.GetService<ExecutionStrategyDependencies>();
        var retryStrategy = new CustomSqlServerRetryingExecutionStrategy(executionStrategyDependencies, retryLimit, TimeSpan.FromSeconds(maxRetryDelayInSeconds), sqlErrorsToRetry);

        try
        {
            var result = await retryStrategy.ExecuteInTransactionAsync(
                retryState,
                async (state, cancelToken) =>
                {
                    try
                    {
                        var r = await state.Action();

                        await context.SaveChangesAsync(false, cancelToken);

                        if (state.FirstException != null)
                        {
                            Log.Logger.Warning(
                                $"Action passed to {nameof(SaveWithRetryAsync)} failed {state.NumberOfRetries} times " +
                                $"(retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\nFirst exception was: {state.FirstException}");
                        }

                        state.Result = r;
                        return r;
                    }
                    catch (Exception ex)
                    {
                        context.RevertChanges();
                        state.NumberOfRetries++;
                        state.FirstException ??= ex;
                        state.LastException = ex;

                        throw;
                    }
                },
                (state, cancelToken) => verifyBulkOperationSucceeded(retryState.Result),
                context.GetSupportedIsolationLevel(isolationLevel));

            context.ChangeTracker.AcceptAllChanges();
            return result;
        }
        catch (Exception ex)
        {
            throw new InvalidOperationException(
                $"DB Transaction in {nameof(SaveWithRetryAsync)} failed. " +
                $"Tried {retryState.NumberOfRetries} times (retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\n" +
                $"First exception was: {retryState.FirstException}.\nLast exception was: {retryState.LastException}",
                ex);
        }
    }

使用以下CustomSqlServerRetryingExecutionStrategy

 public class CustomSqlServerRetryingExecutionStrategy : SqlServerRetryingExecutionStrategy
{
    public CustomSqlServerRetryingExecutionStrategy(ExecutionStrategyDependencies executionStrategyDependencies, int retryLimit, TimeSpan fromSeconds, int[] sqlErrorsToRetry)
        : base(executionStrategyDependencies, retryLimit, fromSeconds, sqlErrorsToRetry)
    {
    }

    protected override bool ShouldRetryOn(Exception exception)
    {
        //SqlServerRetryingExecutionStrategy does not check the base exception, maybe a bug in EF core ?!
        return base.ShouldRetryOn(exception) || base.ShouldRetryOn(exception.GetBaseException());
    }
}

保存当前(重试)状态的辅助类:

private class SaveWithRetryState<T>
    {
        public SaveWithRetryState(Func<Task<T>> action)
        {
            Action = action;
        }

        public Exception FirstException { get; set; }
        public Exception LastException { get; set; }
        public int NumberOfRetries { get; set; }
        public Func<Task<T>> Action { get; }
        public T Result { get; set; }
    }

现在,扩展方法可以按如下方式使用。代码将尝试多次添加大量(5次)。
 await _context.SaveWithRetryAsync(
            // method to insert the bulk
            async () =>
            {
                var listOfAddedItems = new List<string>();
                foreach (var item in bulkImport)
                {
                    listOfAddedItems.Add(item.Guid);
                    await context.Import.AddAsync(item);
                }

                return listOfAddedItems;
            },
            // method to check if the bulk insert was successful
            listOfAddedItems =>
            {

                if (listOfAddedItems == null)
                    return Task.FromResult(true);
                return _context.Import.AsNoTracking().AnyAsync(x => x.Guid == listOfAddedItems.First());
            },
            IsolationLevel.Snapshot,
            5, // max retry attempts
            100); // max retry time

如果想了解为什么会出现这种情况的背景信息,请查看此讨论:https://github.com/dotnet/efcore/issues/21899


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接