如何在EF Core中实现Select For Update

9
据我所了解,在EF(和EF Core)中没有选项可以显式锁定正在查询的资源,但我经常需要这个功能,并不想每次都回退到编写选择语句。由于我只需要它用于postgres,并且根据规范,FOR UPDATE是查询中的最后一项,因此我认为最容易实现的方法是按照这里描述的获取选择语句:在Linq to Entities中,您可以将IQueryable转换为SQL字符串吗?并附加FOR UPDATE并直接执行它。然而,这将给我带来具有参数占位符或未准备好的查询的查询,这意味着对于postgres,执行计划的缓存实际上无法工作,因此无论如何都不行。
Linq to SQL有方法DataContext.GetCommand,但在EF和特别是EF Core中似乎没有相应的方法。我还看了一下EntityFramework.Extended和它们的批量更新/删除,但由于它们需要将选择语句转换为不同的语句,所以它们需要处理比我更复杂的情况,因此我希望有一个更简单的解决方案。
更新:
如果从描述中不清楚,我想创建一个像这样的扩展方法:
public static IList<T> ForUpdate (this IQueryable<T> me)
{
    // this line is obviously what is missing for me :)
    var theUnderlyingCommand = me.GetTheUnderlyingDbCommandOrSimilar();

    theUnderlyingCommand.Text += "FOR UPDATE";
    return me.ToList();
}

这样,其他开发者就可以像使用所有其他程序一样通过Linq使用EF,而不是运行.ToList(),他们会运行.ForUpdate()。(为了使实现更容易,并且因为FOR UPDATE是postgres支持的最后一个选项,在此之后不应该再有其他内容)

1
EFCore的主要方法是乐观并发而不是锁定,即使用某些列作为并发令牌(或行版本)来确保自读取以来该行未被更新。请注意,目前Npgsql提供程序不支持此功能,但可能很快就会支持(https://github.com/npgsql/Npgsql.EntityFrameworkCore.PostgreSQL/issues/19)。`SELECT FOR UPDATE`似乎需要在EFCore级别上投入相当多的工作,因此我怀疑它不会很快完成,但可以尝试向他们提出问题。 - Shay Rojansky
如果您只是想执行自己的原始SQL,其中您将附加 FOR UPDATE,EFCore提供了FromSql方法,或者您可以转到ADO.NET并做任何您想要的事情(这是一篇好文章:http://www.elanderson.net/2016/04/execute-raw-sql-in-entity-framework-core/)。 - Shay Rojansky
@ShayRojansky 我已经更新了问题,因为它可能不够清楚。我知道我可以执行原始的 SQL,但是我不想在我的应用程序的其余部分处理原始的 SQL——虽然我没有问题在一个专门且经过充分测试的扩展方法中处理它。 - peter
我将进行一项事务,但除了一个表之外,我希望在所有表上都使用读取提交。 - peter
@GertArnold,如果两个并发操作读取一个项目并稍后尝试更新它,则可序列化事务的工作方式完全不同 - 但是,如果我使用“FOR UPDATE”进行读取提交(或可序列化)并且两个操作都读取,则可以获得预期的结果而没有任何问题。 - peter
3个回答

8

这份工作可以使用SQLServer完成(未测试异步方法):

首先,创建一个DbCommandInterceptor(我称之为HintInterceptor.cs)

using System;
using System.Data.Common;
using System.Data.Entity.Infrastructure.Interception;
using System.Text.RegularExpressions;

public class HintInterceptor : DbCommandInterceptor
{
    private static readonly Regex _tableAliasRegex = new Regex(@"(?<tableAlias>FROM +(\[.*\]\.)?(\[.*\]) AS (\[.*\])(?! WITH \(\*HINT\*\)))", RegexOptions.Multiline | RegexOptions.IgnoreCase | RegexOptions.Compiled);

    [ThreadStatic]
    public static string HintValue;

    private static string Replace(string input)
    {
        if (!String.IsNullOrWhiteSpace(HintValue))
        {
            if (!_tableAliasRegex.IsMatch(input))
            {
                throw new InvalidProgramException("Não foi possível identificar uma tabela para ser marcada para atualização(forupdate)!", new Exception(input));
            }
            input = _tableAliasRegex.Replace(input, "${tableAlias} WITH (*HINT*)");
            input = input.Replace("*HINT*", HintValue);
        }
        HintValue = String.Empty;
        return input;
    }

    public override void ScalarExecuting(DbCommand command, DbCommandInterceptionContext<object> interceptionContext)
    {
        command.CommandText = Replace(command.CommandText);
    }

    public override void ReaderExecuting(DbCommand command, DbCommandInterceptionContext<DbDataReader> interceptionContext)
    {
        command.CommandText = Replace(command.CommandText);
    }
}

因此,在Web.config中注册您的拦截器类

<entityFramework>
<defaultConnectionFactory type="System.Data.Entity.Infrastructure.SqlConnectionFactory, EntityFramework" />
<providers>
  <provider invariantName="System.Data.SqlClient" type="System.Data.Entity.SqlServer.SqlProviderServices, EntityFramework.SqlServer" />
</providers>
<interceptors> 
  <interceptor type="Full.Path.Of.Class.HintInterceptor, Dll.Name" />
</interceptors>
</entityFramework>

现在我创建了一个名为HintExtension的静态类。
public static class HintExtension
{
    public static IQueryable<T> WithHint<T>(this IQueryable<T> set, string hint) where T : class
    {
        HintInterceptor.HintValue = hint;
        return set;
    }
    public static IQueryable<T> ForUpdate<T>(this IQueryable<T> set) where T : class
    {
        return set.WithHint("UPDLOCK");
    }
}

我可以在数据库事务中使用它,例如:

using(var trans = context.Database.BeginTransaction())
{
        var query = context.mydbset.Where(a => a.name == "asd").ForUpdate();
        // not locked yet
        var mylist = query.ToList();
        // now are locked for update
        // update the props, call saveChanges() and finally call commit ( or rollback)
        trans.Commit();
        // now are unlocked
}

非常抱歉,我的英语不太好,我希望我的例子能够有所帮助。


1
感谢您的精彩帖子。我认为正则表达式不正确。HINT周围的星号不应该被转义:WITH (*HINT*)))"在FROM后面的空格后面是否需要加上+? - M. Koch
关于转义星号,你是正确的。这将避免多次使用该函数。甚至可能有趣的是特别捕获此错误并显示更友好的异常。 关于from后面的+号,我不知道为什么,但我曾经遇到过在from和表之间有几个空格的情况。 - Gustavo Rossi Muller

4
根据这个问题,在ef core中实现锁提示和其他数据库相关调用并不容易。
我在我的项目中使用以下方法实现了与MsSQL和ef core一起使用的UPDLOCK:
public static class DbContextExtensions
{
    public static string GetUpdLockSqlForEntity<T>(this DbContext dbContext, int entityPk, bool pkContainsTableName = true) where T : class
    {
        var mapping = dbContext.Model.FindEntityType(typeof(T)).Relational();
        var tableName = mapping.TableName;
        var entityPkString = entityPk.ToString();
        string idPrefix = pkContainsTableName ? tableName.Substring(0, tableName.Length - 1) : string.Empty;
        return $"Select 1 from {tableName} with (UPDLOCK) where {idPrefix}Id = {entityPkString}";
    }
}

我们在数据库事务中使用这种方法作为原始SQL调用(锁将在提交或回滚后释放):
using (var dbTran = await DataContext.Database.BeginTransactionAsync(IsolationLevel.ReadCommitted))
{
    try
    {
        await DataContext.Database.ExecuteSqlCommandAsync(DataContext.GetUpdLockSqlForEntity<Deposit>(entityId));
        dbTran.Commit();
    }
    catch (Exception e)
    {
        dbTran.Rollback();
        throw;
    }
}

1

@gustavo-rossi-muller的回答很有用,但缺乏线程安全性,因此无法与EF Core提供的async方法一起使用,例如DbContext.SaveChangesAsync(),因为没有覆盖ScalarExecutingAsync()ReaderExecutingAsync()

在公共静态字段HintValue上使用[ThreadStatic]属性是必要的,以强制每个线程使用自己的HintInterceptor.HintValue变体值,而不是在所有线程(即全局变量)之间共享相同的值。

拦截器文档已经解决了这个问题:

拦截器通常是无状态的,这意味着单个拦截器实例可用于所有DbContext实例。

如果你想在每个DbContext的拦截器实例中保留一些状态,你需要:

此拦截器具有状态:它存储最近查询的每日消息的ID和消息文本,以及执行该查询的时间。由于此状态,我们还需要锁定,因为缓存要求相同的拦截器必须由多个上下文实例使用。

但我们需要的是对每个查询命令控制拦截器的状态,因为我们只需要某些特定的SELECT命令使用FOR UPDATE后缀进行查询,而不是所有导致许多语法错误的命令。

到目前为止,我们只能通过TagWith()向某些查询命令提供一些额外信息,然后在DbCommandInterceptor的覆盖中检测标记添加的注释,为这些查询追加FOR UPDATE提示。

事实上,文档已经提供了一个示例来完成此操作。


我已经修改了那个示例,使用MySQL语法附加FOR UPDATE

private class SelectForUpdateCommandInterceptor : DbCommandInterceptor
{ // https://learn.microsoft.com/en-us/ef/core/logging-events-diagnostics/interceptors#example-command-interception-to-add-query-hints
    public override InterceptionResult<object> ScalarExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<object> result)
    {
        ManipulateCommand(command);
        return result;
    }

    public override ValueTask<InterceptionResult<object>> ScalarExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<object> result, CancellationToken cancellationToken = default)
    {
        ManipulateCommand(command);
        return new(result);
    }

    public override InterceptionResult<DbDataReader> ReaderExecuting(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result)
    {
        ManipulateCommand(command);
        return result;
    }

    public override ValueTask<InterceptionResult<DbDataReader>> ReaderExecutingAsync(DbCommand command, CommandEventData eventData, InterceptionResult<DbDataReader> result, CancellationToken cancellationToken = default)
    {
        ManipulateCommand(command);
        return new(result);
    }

    private static void ManipulateCommand(IDbCommand command)
    {
        if (command.CommandText.StartsWith("-- ForUpdate", StringComparison.Ordinal))
        {
            command.CommandText += " FOR UPDATE";
        }
    }
}

然后在配置您的DbContext时注入此拦截器:

private static readonly SelectForUpdateCommandInterceptor SelectForUpdateCommandInterceptorInstance = new();
protected override void OnConfiguring(DbContextOptionsBuilder options)
{
    options.AddInterceptors(SelectForUpdateCommandInterceptorInstance);
}

最后,我们可以这样做:

var results = (from e in db.Set<SomeEntity>.TagWith("ForUpdate")
    where e.SomeField == someValue
    select e.SomeField).ToList();
db.Set<SomeEntity>.Add(new SomeEntity {SomeField = 1});
db.SaveChanges();
db.SaveChangesAsync(); // thread safe

直到 EF Core 8,他们仍然没有计划实现这个查询提示后缀:https://github.com/dotnet/efcore/issues/26042,但另一个名为 linq2db 的 linq2sql 表达式转换器已经完成了此操作:

https://github.com/linq2db/linq2db/issues/1276

https://github.com/linq2db/linq2db/pull/3297

https://github.com/linq2db/linq2db/issues/3905

引用


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接