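We ran into the same problem when running INSERT (MERGE) statements from multiple threads. We did not want to enable the EnableRetryOnFailure() option for all transactions, so we wrote the following DbContext extension method.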
    // Namespaces used: System.Data, Microsoft.EntityFrameworkCore, Microsoft.EntityFrameworkCore.Infrastructure,
    // Microsoft.EntityFrameworkCore.Storage. Log.Logger appears to be Serilog's static logger.
    public static async Task<TResult> SaveWithRetryAsync<TResult>(this DbContext context,
        Func<Task<TResult>> bulkInsertOperation,
        Func<TResult, Task<bool>> verifyBulkOperationSucceeded,
        IsolationLevel isolationLevel = IsolationLevel.Unspecified,
        int retryLimit = 6,
        int maxRetryDelayInSeconds = 30)
    {
        var existingTransaction = context.Database.CurrentTransaction?.GetDbTransaction();
        if (existingTransaction != null)
            throw new InvalidOperationException($"Cannot run {nameof(SaveWithRetryAsync)} inside a transaction");
        if (context.ChangeTracker.HasChanges())
        {
            throw new InvalidOperationException(
                "DbContext should be saved before running this action to revert only the changes of this action in case of a concurrency conflict.");
        }
        const int sqlErrorNrOnDuplicatePrimaryKey = 2627;
        const int sqlErrorNrOnSnapshotIsolation = 3960;
        const int sqlErrorDeadlock = 1205;
        int[] sqlErrorsToRetry = { sqlErrorNrOnDuplicatePrimaryKey, sqlErrorNrOnSnapshotIsolation, sqlErrorDeadlock };
        var retryState = new SaveWithRetryState<TResult>(bulkInsertOperation);
        // Use EF Core's connection resiliency feature for retrying (see https://learn.microsoft.com/en-us/ef/core/miscellaneous/connection-resiliency)
        // Usually the IExecutionStrategy is configured via DbContextOptionsBuilder.UseSqlServer(..., options.EnableRetryOnFailure()).
        // In ASP.NET, the DbContext is configured in Startup.cs and we don't want this retry behaviour everywhere for each db operation.
        var executionStrategyDependencies = context.Database.GetService<ExecutionStrategyDependencies>();
        var retryStrategy = new CustomSqlServerRetryingExecutionStrategy(executionStrategyDependencies, retryLimit, TimeSpan.FromSeconds(maxRetryDelayInSeconds), sqlErrorsToRetry);
        try
        {
            var result = await retryStrategy.ExecuteInTransactionAsync(
                retryState,
                async (state, cancelToken) =>
                {
                    try
                    {
                        var r = await state.Action();
                        // acceptAllChangesOnSuccess: false, so the change tracker is only accepted after the transaction succeeded
                        await context.SaveChangesAsync(false, cancelToken);
                        if (state.FirstException != null)
                        {
                            Log.Logger.Warning(
                                $"Action passed to {nameof(SaveWithRetryAsync)} failed {state.NumberOfRetries} times " +
                                $"(retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\nFirst exception was: {state.FirstException}");
                        }
                        state.Result = r;
                        return r;
                    }
                    catch (Exception ex)
                    {
                        // Undo the tracked changes of this attempt and record the failure before the strategy retries
                        context.RevertChanges();
                        state.NumberOfRetries++;
                        state.FirstException ??= ex;
                        state.LastException = ex;
                        throw;
                    }
                },
                (state, cancelToken) => verifyBulkOperationSucceeded(state.Result),
                context.GetSupportedIsolationLevel(isolationLevel));
            context.ChangeTracker.AcceptAllChanges();
            return result;
        }
        catch (Exception ex)
        {
            throw new InvalidOperationException(
                $"DB Transaction in {nameof(SaveWithRetryAsync)} failed. " +
                $"Tried {retryState.NumberOfRetries} times (retry limit={retryLimit}, ThreadId={Thread.CurrentThread.ManagedThreadId}).\n" +
                $"First exception was: {retryState.FirstException}.\nLast exception was: {retryState.LastException}",
                ex);
        }
    }
It uses the following CustomSqlServerRetryingExecutionStrategy:
    public class CustomSqlServerRetryingExecutionStrategy : SqlServerRetryingExecutionStrategy
    {
        public CustomSqlServerRetryingExecutionStrategy(ExecutionStrategyDependencies executionStrategyDependencies, int retryLimit, TimeSpan fromSeconds, int[] sqlErrorsToRetry)
            : base(executionStrategyDependencies, retryLimit, fromSeconds, sqlErrorsToRetry)
        {
        }

        protected override bool ShouldRetryOn(Exception exception)
        {
            // Also check the innermost exception, because the SqlException may be wrapped in another exception
            return base.ShouldRetryOn(exception) || base.ShouldRetryOn(exception.GetBaseException());
        }
    }
Helper class that holds the current (retry) state:
    private class SaveWithRetryState<T>
    {
        public SaveWithRetryState(Func<Task<T>> action)
        {
            Action = action;
        }

        public Exception FirstException { get; set; }
        public Exception LastException { get; set; }
        public int NumberOfRetries { get; set; }
        public Func<Task<T>> Action { get; }
        public T Result { get; set; }
    }
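The extension method above calls context.RevertChanges(), which is not shown in this answer (neither is GetSupportedIsolationLevel). Below is a minimal sketch of what such a helper might look like, assuming it only needs to undo pending changes in the change tracker. The class name DbContextRetryHelpers and the method body are assumptions for illustration, not the author's code:

```csharp
using System.Linq;
using Microsoft.EntityFrameworkCore;

public static class DbContextRetryHelpers
{
    // Hypothetical implementation: the answer uses RevertChanges() without showing it.
    public static void RevertChanges(this DbContext context)
    {
        // ToList() takes a snapshot, because changing entry states modifies the tracker.
        foreach (var entry in context.ChangeTracker.Entries().ToList())
        {
            switch (entry.State)
            {
                case EntityState.Added:
                    // Forget entities staged by the failed attempt so a retry can add them again.
                    entry.State = EntityState.Detached;
                    break;
                case EntityState.Modified:
                case EntityState.Deleted:
                    // Restore the values last read from the database and stop tracking the change.
                    entry.CurrentValues.SetValues(entry.OriginalValues);
                    entry.State = EntityState.Unchanged;
                    break;
            }
        }
    }
}
```

On EF Core 5 or later, context.ChangeTracker.Clear() may be a simpler alternative when it is acceptable to detach every tracked entity.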
The extension method can now be used as follows. The code tries to add the batch several times (retry limit of 5).
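    await _context.SaveWithRetryAsync(
        // Operation that is re-run on every attempt: stage the items and remember their GUIDs.
        async () =>
        {
            var listOfAddedItems = new List<string>();
            foreach (var item in bulkImport)
            {
                listOfAddedItems.Add(item.Guid);
                await _context.Import.AddAsync(item);
            }
            return listOfAddedItems;
        },
        // Verification used when it is unclear whether the commit succeeded: check that the first item exists.
        listOfAddedItems =>
        {
            if (listOfAddedItems == null || listOfAddedItems.Count == 0)
                return Task.FromResult(true);
            var firstGuid = listOfAddedItems[0];
            return _context.Import.AsNoTracking().AnyAsync(x => x.Guid == firstGuid);
        },
        IsolationLevel.Snapshot,
        5,    // retryLimit
        100); // maxRetryDelayInSeconds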
For background on why this happens, see this discussion: https://github.com/dotnet/efcore/issues/21899
BulkCopy? - Svyatoslav Danyliv