使用EF Core 2.1和SQL Server正确处理并发。

8
我目前正在使用ASP.NET Core Web API、Entity Framework Core 2.1和SQL Server数据库开发一个API,用于从两个账户A和B之间进行资金转移。由于B账户是接受付款的账户,所以可能会同时执行许多并发请求。如您所知,如果没有很好地管理,这可能导致某些用户无法看到他们的支付到账。
我花了几天时间尝试实现并发处理,但仍无法确定最佳方法。为了简单起见,我创建了一个测试项目,试图重现这个并发问题。
在测试项目中,我有两个路由:request1和request2,每个路由都向同一用户执行转账操作,一个金额为10美元,另一个金额为20美元。我在第一个路由上放置了一个Thread.sleep(10000),具体如下:
    [HttpGet]
    [Route("request1")]
    public async Task<string> request1()
    {
        using (var transaction = _context.Database.BeginTransaction(System.Data.IsolationLevel.Serializable))
        {
            try
            {
                Wallet w = _context.Wallets.Where(ww => ww.UserId == 1).FirstOrDefault();
                Thread.Sleep(10000);
                w.Amount = w.Amount + 10;
                w.Inserts++;
                _context.Wallets.Update(w);
                _context.SaveChanges();
                transaction.Commit();
            }
            catch (Exception ex)
            {
                transaction.Rollback();
            }
        }
        return "request 1 executed";
    }

    [HttpGet]
    [Route("request2")]
    public async Task<string> request2()
    {
        using (var transaction = _context.Database.BeginTransaction(System.Data.IsolationLevel.Serializable))
        {
            try
            {
                Wallet w = _context.Wallets.Where(ww => ww.UserId == 1).FirstOrDefault();
                w.Amount = w.Amount + 20;
                w.Inserts++;
                _context.Wallets.Update(w);
                _context.SaveChanges();
                transaction.Commit();
            }
            catch (Exception ex)
            {
                transaction.Rollback();
            }
        }
        return "request 2 executed";
    }

在浏览器中执行请求1和请求2后,第一个事务被回滚了,原因是:
InvalidOperationException: An exception has been raised that is likely due to a transient failure. Consider enabling transient error resiliency by adding 'EnableRetryOnFailure()' to the 'UseSqlServer' call.

我也可以重试交易,但是没有更好的方法吗?使用锁定?

在文档中,Serializable级别是最隔离且最昂贵的:

当前事务完成之前,其他事务无法修改已被当前事务读取的数据。

这意味着没有其他事务可以更新另一个事务读取的数据,这在此处起到了预期的作用,因为请求2路由中的更新等待第一个事务(请求1)提交。

问题在于,一旦当前事务读取了钱包行,我们需要阻止其他事务的读取,为解决问题,我需要使用锁定,以便当请求1中的第一个选择语句执行时,所有事务都需要等待第一个事务完成,以便它们可以选择正确的值。由于EF Core不支持锁定,因此我需要直接执行SQL查询,因此在选择钱包时,我将向当前选择的行添加行锁定。

//this locks the wallet row with id 1
//and also the default transaction isolation level is enough
Wallet w = _context.Wallets.FromSql("select * from wallets with (XLOCK, ROWLOCK) where id = 1").FirstOrDefault();
Thread.Sleep(10000);
w.Amount = w.Amount + 10;
w.Inserts++;
_context.Wallets.Update(w);
_context.SaveChanges();
transaction.Commit();

现在即使执行多次请求,转账的结果仍然是正确的。此外,我正在使用一个事务表来记录每次转账的状态,以便在发生错误时能够计算出所有钱包的金额。
当然,还有其他做法,比如:
- 存储过程:但我想把逻辑放在应用程序层面。 - 编写同步方法来处理数据库逻辑:这样所有的数据库请求都会在单个线程中执行,我曾经读到一篇博客文章建议使用这种方法,但也许我们会使用多台服务器进行扩展。
我不知道我是否没有搜索到好的材料来处理基于 Entity Framework Core 的悲观并发,即使在浏览 Github 时,我看到的大部分代码也没有使用锁定。
这就带来了我的问题:这是正确的做法吗?
感谢您的耐心阅读。

1
你看到值为20,是因为第二个请求成功了,而第一个请求由于乐观并发失败而被回滚。你不会根据异常做任何事情;一个简单的选择可能是重试事务。然而...你可能想考虑采用事件溯源方法;即在追加日志中存储构成钱包状态的交易,而不是当前状态。就像会计分类帐和几乎所有银行系统一样。 - sellotape
2
无论如何,先阅读一下事件溯源的主题吧;这个主题太大了,我在这里写不下。有一些概念,比如读模型和快照,可以满足您的需求,避免阅读整个日志以获取平衡(这是一个常见的问题)。我可能不夸张地说,几乎每个涉及大量他人资金的系统都基本上是事件溯源的。这不是一个容易采取的步骤,但如果您的系统的非功能性要求需要它,您可能需要考虑它。 - sellotape
@Reda,抱歉,我没有注意到您已经在使用Serializable隔离级别。是的,这种行为有点出乎意料,但我不认为您会遇到这种情况(长时间运行的事务后跟短时间事务)。即使在RC下读取您的钱包行时,您也可以通过使用xlock、rowlock、holdlock实现基本相同的效果,除非EF在幕后超越了自己的能力,否则第二个事务应该会等待第一个锁定完成。我建议设置SQL Profiler跟踪以查看实际命中数据库的内容。 - Roger Wolf
@Reda,为什么呢,这已经有文档了-例如https://learn.microsoft.com/en-us/sql/t-sql/statements/set-transaction-isolation-level-transact-sql。它只是足够复杂,无法适应SO上的评论。如果您认为大量信用会创建瓶颈,可以通过将它们通过队列(自己的或服务代理)进行串行化来解决。只需确保所有借方都根据钱包的当前状态进行检查 :) - Roger Wolf
@Reda 一旦进入队列,您可以查询队列的长度和其中的事务,并返回“处理中”状态以及一个数字作为反馈给用户。此外,如果将任何传入的事务写入日志文件,则在发生关键系统故障时始终可以稍后检索它们。 - Daan
显示剩余12条评论
3个回答

1
我的建议是,当出现 DbUpdateConcurrencyException 异常时,使用 entry.GetDatabaseValues();entry.OriginalValues.SetValues(databaseValues); 来进行重试逻辑处理。无需锁定数据库。
以下是 EF Core documentation 页面的示例:
using (var context = new PersonContext())
{
    // Fetch a person from database and change phone number
    var person = context.People.Single(p => p.PersonId == 1);
    person.PhoneNumber = "555-555-5555";

    // Change the person's name in the database to simulate a concurrency conflict
    context.Database.ExecuteSqlCommand(
        "UPDATE dbo.People SET FirstName = 'Jane' WHERE PersonId = 1");

    var saved = false;
    while (!saved)
    {
        try
        {
            // Attempt to save changes to the database
            context.SaveChanges();
            saved = true;
        }
        catch (DbUpdateConcurrencyException ex)
        {
            foreach (var entry in ex.Entries)
            {
                if (entry.Entity is Person)
                {
                    var proposedValues = entry.CurrentValues;
                    var databaseValues = entry.GetDatabaseValues();

                    foreach (var property in proposedValues.Properties)
                    {
                        var proposedValue = proposedValues[property];
                        var databaseValue = databaseValues[property];

                        // TODO: decide which value should be written to database
                        // proposedValues[property] = <value to be saved>;
                    }

                    // Refresh original values to bypass next concurrency check
                    entry.OriginalValues.SetValues(databaseValues);
                }
                else
                {
                    throw new NotSupportedException(
                        "Don't know how to handle concurrency conflicts for "
                        + entry.Metadata.Name);
                }
            }
        }
    }
}

1
感谢您的回答。您描述的乐观并发控制方法不适用于我的情况。如果使用您的代码出现并发异常,我应该回滚两个事务。这很糟糕,因为两个最终用户都会收到错误消息,指示事务处理失败。转移是一个 2 秒操作... 我相信如果您仔细思考,您会发现这不是一个合适的解决方案。感谢您花费的时间。 - Reda

-1

你可以使用Redis的分布式锁机制来实现。 此外,你还可以通过userId进行锁定,这不会影响其他人的方法。


请再解释详细一些。 - Gert Arnold
基本思想是锁定范围,这保证了同时进行事务。 但是,如果您需要扩展系统,则必须使用分布式锁,实际上它使用的是redis https://github.com/samcook/RedLock.net - murat_yuceer
2
请将此添加到答案中。话虽如此,在问题的情况下,线程锁定通常是一个坏主意,绝对没有必要。 - Gert Arnold
为什么这是个坏主意呢,他们并没有解决问题,只是在出现并发问题时抛出异常。此外,你可以通过userId进行锁定,这不会影响其他人的锁定方法。 - murat_yuceer

-5
为什么不在代码中处理并发问题,而要在数据库层面上处理呢?
你可以编写一个方法,用于更新给定钱包的值,并在此处使用简单锁。就像这样:
private readonly object walletLock = new object();

public void UpdateWalletAmount(int userId, int amount)
{
    lock (balanceLock)
    {
         Wallet w = _context.Wallets.Where(ww => ww.UserId == userId).FirstOrDefault();
         w.Amount = w.Amount + amount;
         w.Inserts++;
         _context.Wallets.Update(w);
         _context.SaveChanges();
    }
}

所以你的方法会像这样:

[HttpGet]
[Route("request1")]
public async Task<string> request1()
{
    try
    {
        UpdateWalletAmount(1, 10);
    }
    catch (Exception ex)
    {
        // log error
    }
    return "request 1 executed";
}

[HttpGet]
[Route("request2")]
public async Task<string> request2()
{
    try
    {
        UpdateWalletAmount(1, 20);
    }
    catch (Exception ex)
    {
        // log error
    }
    return "request 2 executed";
}

在这种情况下,您甚至不需要使用事务。


1
当你扩展时,你可能需要更多的Web服务器。所以我想你明白我为什么不想这样做。 - Reda
问题中明确指出,这不是处理并发的“正确”方式,这将误导未来很多人... - Reda

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接