我是NEventStore和事件溯源的新手。在一个项目中,我想使用NEventStore来持久化由我们的聚合生成的事件,但我有一些问题需要正确地处理并发。
如何使用乐观锁写入同一流?
假设我有两个相同聚合的实例,它们从2个不同的线程加载到修订版1。然后第一个线程调用命令A,第二个线程调用命令B。使用乐观锁,其中一个聚合应该失败并出现并发异常。
我考虑使用maxRevision打开从加载聚合的点开始的流,但似乎CommitChanges从未失败,即使我传递旧版本。
我错过了什么?当使用NEventStore/事件溯源时,乐观锁是否可行/正确?
以下是我用于重现问题的代码:
回到我的问题:启用乐观锁应该在打开流时使用版本号吗?还有其他可能的实现或指导方针吗?
谢谢。
如何使用乐观锁写入同一流?
假设我有两个相同聚合的实例,它们从2个不同的线程加载到修订版1。然后第一个线程调用命令A,第二个线程调用命令B。使用乐观锁,其中一个聚合应该失败并出现并发异常。
我考虑使用maxRevision打开从加载聚合的点开始的流,但似乎CommitChanges从未失败,即使我传递旧版本。
我错过了什么?当使用NEventStore/事件溯源时,乐观锁是否可行/正确?
以下是我用于重现问题的代码:
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (var scope = new TransactionScope())
using (store = WireupEventStore())
{
Client1(revision: 0);
Client2(revision: 0);
scope.Complete();
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.UsingInMemoryPersistence()
.Build();
}
private static void Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 1 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, 0, revision))
{
var @event = new SomeDomainEvent { Value = "Client 2 - event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
我预计客户端2会失败,因为我使用旧版本打开了流。
更新于2013年8月26日: 我已经使用Sql服务器测试了相同的代码,并且似乎按预期工作。
namespace NEventStore.Example
{
using System;
using System.Transactions;
using NEventStore;
using NEventStore.Dispatcher;
using NEventStore.Persistence.SqlPersistence.SqlDialects;
internal static class MainProgram
{
private static readonly Guid StreamId = Guid.NewGuid(); // aggregate identifier
private static IStoreEvents store;
private static void Main()
{
using (store = WireupEventStore())
{
OpenOrCreateStream();
AppendToStream_Client1(revision: 1);
AppendToStream_Client2(revision: 1); // throws an error
// AppendToStream_Client2(revision: 2); // works
}
Console.WriteLine(Resources.PressAnyKey);
Console.ReadKey();
}
private static IStoreEvents WireupEventStore()
{
return Wireup.Init()
.LogToOutputWindow()
.UsingInMemoryPersistence()
.UsingSqlPersistence("EventStore") // Connection string is in app.config
.WithDialect(new MsSqlDialect())
.InitializeStorageEngine()
.UsingJsonSerialization()
.Build();
}
private static void OpenOrCreateStream()
{
using (var stream = store.OpenStream(StreamId, 0, int.MaxValue))
{
var @event = new SomeDomainEvent { Value = "Initial event." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client1(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 1." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
private static void AppendToStream_Client2(int revision)
{
using (var stream = store.OpenStream(StreamId, int.MinValue, revision))
{
var @event = new SomeDomainEvent { Value = "Second event 2." };
stream.Add(new EventMessage { Body = @event });
stream.CommitChanges(Guid.NewGuid());
}
}
}
}
回到我的问题:启用乐观锁应该在打开流时使用版本号吗?还有其他可能的实现或指导方针吗?
谢谢。