NEventStore乐观锁

3
我是NEventStore和事件溯源的新手。在一个项目中,我想使用NEventStore来持久化由我们的聚合生成的事件,但我有一些问题需要正确地处理并发。
如何使用乐观锁写入同一流?
假设我有两个相同聚合的实例,它们从2个不同的线程加载到修订版1。然后第一个线程调用命令A,第二个线程调用命令B。使用乐观锁,其中一个聚合应该失败并出现并发异常。
我考虑使用maxRevision打开从加载聚合的点开始的流,但似乎CommitChanges从未失败,即使我传递旧版本。
我错过了什么?当使用NEventStore/事件溯源时,乐观锁是否可行/正确?
以下是我用于重现问题的代码:
namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (var scope = new TransactionScope())
            using (store = WireupEventStore())
            {
                Client1(revision: 0);

                Client2(revision: 0);

                scope.Complete();
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .UsingInMemoryPersistence()
                .Build();
        }

        private static void Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, 0, revision))
            {
                var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };

                stream.Add(new EventMessage { Body = @event });


                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

我预计客户端2会失败,因为我使用旧版本打开了流。

更新于2013年8月26日: 我已经使用Sql服务器测试了相同的代码,并且似乎按预期工作。

namespace NEventStore.Example
{
    using System;
    using System.Transactions;
    using NEventStore;
    using NEventStore.Dispatcher;
    using NEventStore.Persistence.SqlPersistence.SqlDialects;

    internal static class MainProgram
    {
        private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
        private static IStoreEvents store;

        private static void Main()
        {
            using (store = WireupEventStore())
            {
                OpenOrCreateStream();

                AppendToStream_Client1(revision: 1);

                AppendToStream_Client2(revision: 1); // throws an error
                // AppendToStream_Client2(revision: 2); // works
            }

            Console.WriteLine(Resources.PressAnyKey);
            Console.ReadKey();
        }

        private static IStoreEvents WireupEventStore()
        {
             return Wireup.Init()
                .LogToOutputWindow()
                .UsingInMemoryPersistence()
                .UsingSqlPersistence("EventStore") // Connection string is in app.config
                    .WithDialect(new MsSqlDialect())
                    .InitializeStorageEngine()
                    .UsingJsonSerialization()
                .Build();
        }

        private static void OpenOrCreateStream()
        {
            using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
            {
                var @event = new SomeDomainEvent { Value = "Initial event." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client1(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 1." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }

        private static void AppendToStream_Client2(int revision)
        {
            using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
            {
                var @event = new SomeDomainEvent { Value = "Second event 2." };

                stream.Add(new EventMessage { Body = @event });
                stream.CommitChanges(Guid.NewGuid());
            }
        }
    }
}

回到我的问题:启用乐观锁应该在打开流时使用版本号吗?还有其他可能的实现或指导方针吗?
谢谢。
1个回答

6
首先,内存持久化实现的主要目的是测试,并不支持事务。在您的原始示例中,客户端2将仅将其事件追加到流中。请尝试使用支持事务的持久化存储(SQL & Raven,但不包括Mongo)运行上述示例。
其次,在打开流时指定最小/最大修订版用于不同的目的:
  1. 当重新载入聚合并且没有快照可用时,您会指定(min:0,max:int.MaxValue),因为您想检索所有事件。
  2. 当重新加载聚合并且快照可用时,您会指定(min:snapshot.Version,max:int.MaxValue)以获取自快照以来发生的所有事件。
  3. 当保存聚合时,您会指定(min:0,max:Aggregate.Version)。聚合版本是在重新载入过程中推导出来的。如果同一聚合同时在其他地方重新加载并保存,就会出现竞争条件和ConcurrencyException

大多数支持都将封装在域框架中。请参阅CommonDomain中的AggregateBaseEventStoreRepository

第三,也是最重要的一点,单个事务更新超过1个流是一种代码异味。如果您正在进行DDD / ES,则流表示单个聚合根,根据定义,它是一致性边界。在一个事务中创建/更新多个AR会破坏这个边界。 NEventStore的事务支持是(勉强)添加的,因此可以与其他工具一起使用,即事务性地从MSMQ / NServiceBus /任何位置读取命令并处理它,或者将提交消息事务性地分派到队列并标记为这样。个人而言,我建议您尽力避免2PC。


谢谢Damian。但我不确定是否理解。假设我删除TransactionScope。是否可以处理乐观锁?如何处理?基本上,我只想在此期间没有提交其他事件时才写入流。 - Davide Icardi
我已经使用SQL Server并且没有使用事务更新了问题。现在,如果我传递错误的版本,第二个附加将失败。这是处理此情况的正确方式吗?在这种情况下,我应该在聚合状态中保存版本,并在保存新事件时将其传回。这是预期的实现吗? - Davide Icardi
乐观锁由每个持久化引擎处理。在 SQL 中,它基于 StreamId 和 CommitSequence 的主键。因此,如果您同时打开同一流两次,并向两者都添加提交,则会导致 CommitSequence 冲突和 ConcurrencyException。 - user1010
我发现了InMemoryPersistenceEngine中CommitMessages的并发检查bug https://github.com/NEventStore/NEventStore/commit/3a4dec500d537ae834aa3da7c47ee06ecf579dd3 (这与事务范围无关) - user1010

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接