如何在使用“每个请求一个会话”时让NHibernate重试死锁事务?

14

当您使用Session-Per-Request模式时,在使用NHibernate的3层应用程序中需要支持事务失败重试的情况下,您将使用什么模式/架构?(因为即使是死锁、超时或活锁异常,ISession在异常发生后也会变得无效)。


你是如何解决你的问题的? - Eugeniu Torica
1个回答

37

注意2 现在我永远不会将写事务放在Web项目中,而是使用消息传递+队列,并在后台使用工作程序处理消息,以实现事务性工作。

然而,对于从Web项目中获取一致数据的读取,我仍然会使用事务,结合MVCC/快照隔离。在这种情况下,您会发现每个请求的会话-每个事务都是完全可以的。

注意1 本文的思想已经被放置在Castle Transactions框架和我的新NHibernate设施中。

好的,这里是一般的想法。假设您想为客户创建一个未完成订单。您有某种GUI,例如浏览器/MVC应用程序,该应用程序使用相关信息创建新的数据结构(或者您从网络中获取此数据结构):

[Serializable]
class CreateOrder /*: IMessage*/
{
    // immutable
    private readonly string _CustomerName;
    private readonly decimal _Total;
    private readonly Guid _CustomerId;

    public CreateOrder(string customerName, decimal total, Guid customerId)
    {
        _CustomerName = customerName;
        _Total = total;
        _CustomerId = customerId;
    }

    // put ProtoBuf attribute
    public string CustomerName
    {
        get { return _CustomerName; }
    }

    // put ProtoBuf attribute
    public decimal Total
    {
        get { return _Total; }
    }

    // put ProtoBuf attribute
    public Guid CustomerId
    {
        get { return _CustomerId; }
    }
}

你需要一些东西去处理它。可能会是某种服务总线中的命令处理程序。诸如“命令处理程序”之类的词汇有很多,你也可以称其为“服务”或“领域服务”或“消息处理程序”。如果你在进行函数式编程,那么它就是你的消息框实现;如果你在使用Erlang或Akka,那么它就是一个Actor。
class CreateOrderHandler : IHandle<CreateOrder>
{
    public void Handle(CreateOrder command)
    {
        With.Policy(IoC.Resolve<ISession>, s => s.BeginTransaction(), s =>
        {
            var potentialCustomer = s.Get<PotentialCustomer>(command.CustomerId);
            potentialCustomer.CreateOrder(command.Total);
            return potentialCustomer;
        }, RetryPolicies.ExponentialBackOff.RetryOnLivelockAndDeadlock(3));
    }
}

interface IHandle<T> /* where T : IMessage */
{
    void Handle(T command);
}

以上展示了一个API在应用状态/事务处理中的使用方式。

With的实现:

static class With
{
    internal static void Policy(Func<ISession> getSession,
                                       Func<ISession, ITransaction> getTransaction,
                                       Func<ISession, EntityBase /* abstract 'entity' base class */> executeAction,
                                       IRetryPolicy policy)
    {
        //http://fabiomaulo.blogspot.com/2009/06/improving-ado-exception-management-in.html

        while (true)
        {
            using (var session = getSession())
            using (var t = getTransaction(session))
            {
                var entity = executeAction(session);
                try
                {
                    // we might not always want to update; have another level of indirection if you wish
                    session.Update(entity);
                    t.Commit();
                    break; // we're done, stop looping
                }
                catch (ADOException e)
                {
                    // need to clear 2nd level cache, or we'll get 'entity associated with another ISession'-exception

                    // but the session is now broken in all other regards will will throw exceptions
                    // if you prod it in any other way
                    session.Evict(entity);

                    if (!t.WasRolledBack) t.Rollback(); // will back our transaction

                    // this would need to be through another level of indirection if you support more databases
                    var dbException = ADOExceptionHelper.ExtractDbException(e) as SqlException;

                    if (policy.PerformRetry(dbException)) continue;
                    throw; // otherwise, we stop by throwing the exception back up the layers
                }
            }
        }
    }
}

正如您所看到的,我们需要一个新的工作单元;每当出现问题时都需要ISession。这就是为什么循环在Using语句/块的外部的原因。拥有函数等同于拥有工厂实例,只不过我们直接在对象实例上调用,而不是调用其方法。我认为这样做可以使调用者API更加友好。

我们希望能够平稳处理如何执行重试,因此我们有一个可以由不同处理程序实现的接口,称为IRetryHandler。对于要强制执行控制流程的每个方面(是的,它非常接近AOP),可以将这些链接在一起。类似于AOP的工作方式,返回值用于控制控制流,但仅以真/假方式进行,这是我们的要求。

interface IRetryPolicy
{
    bool PerformRetry(SqlException ex);
}

聚合根PotentialCustomer是一个有生命周期的实体。你需要用*.hbm.xml文件/FluentNHibernate来映射它。
它有一个方法与发送的命令1:1对应。这使得命令处理程序非常易读。
此外,使用鸭子类型的动态语言,可以将命令的类型名称映射到方法,类似于Ruby/Smalltalk的方式。
如果你正在做事件溯源,事务处理会类似,但是事务不会接口NHibernate的事务。相应的是,你需要通过调用CreateOrder(decimal)来保存创建的事件,并为实体提供从存储中重新读取保存的事件的机制。
最后一个要注意的点是我重写了我创建的三个方法。这是NHibernate方面的要求,因为它需要一种知道实体何时等于另一个实体的方法,是否在集合/包中。更多关于我的实现的内容,请参见此处。无论如何,这只是示例代码,我现在并不关心我的客户,所以我不会去实现它们。
sealed class PotentialCustomer : EntityBase
{
    public void CreateOrder(decimal total)
    {
        // validate total
        // run business rules

        // create event, save into event sourced queue as transient event
        // update private state
    }

    public override bool IsTransient() { throw new NotImplementedException(); }
    protected override int GetTransientHashCode() { throw new NotImplementedException(); }
    protected override int GetNonTransientHashCode() { throw new NotImplementedException(); }
}

我们需要一种创建重试策略的方法。当然,我们可以用很多种方法来实现。这里我将流畅接口与静态方法类型相同的对象实例结合起来。我明确地实现了该接口,以便在流畅接口中不可见其他方法。该接口仅使用下面我提供的“示例”实现。
internal class RetryPolicies : INonConfiguredPolicy
{
    private readonly IRetryPolicy _Policy;

    private RetryPolicies(IRetryPolicy policy)
    {
        if (policy == null) throw new ArgumentNullException("policy");
        _Policy = policy;
    }

    public static readonly INonConfiguredPolicy ExponentialBackOff =
        new RetryPolicies(new ExponentialBackOffPolicy(TimeSpan.FromMilliseconds(200)));

    IRetryPolicy INonConfiguredPolicy.RetryOnLivelockAndDeadlock(int retries)
    {
        return new ChainingPolicy(new[] {new SqlServerRetryPolicy(retries), _Policy});
    }
}

我们需要一个用于部分完成调用的流畅接口。这样可以保证类型安全。因此,在配置策略之前,我们需要两个解引用运算符(即“点”的符号--(.)),远离我们的静态类型。
internal interface INonConfiguredPolicy
{
    IRetryPolicy RetryOnLivelockAndDeadlock(int retries);
}

链接策略可以被解决。其实现要检查所有子代是否返回继续,同时还执行其中的逻辑。

internal class ChainingPolicy : IRetryPolicy
{
    private readonly IEnumerable<IRetryPolicy> _Policies;

    public ChainingPolicy(IEnumerable<IRetryPolicy> policies)
    {
        if (policies == null) throw new ArgumentNullException("policies");
        _Policies = policies;
    }

    public bool PerformRetry(SqlException ex)
    {
        return _Policies.Aggregate(true, (val, policy) => val && policy.PerformRetry(ex));
    }
}

此策略允许当前线程休眠一段时间;有时数据库会过载,如果多个读者/写者持续尝试读取将会对数据库造成拒绝服务攻击(请看几个月前Facebook崩溃的情况,因为他们所有的缓存服务器同时查询了他们的数据库)。

internal class ExponentialBackOffPolicy : IRetryPolicy
{
    private readonly TimeSpan _MaxWait;
    private TimeSpan _CurrentWait = TimeSpan.Zero; // initially, don't wait

    public ExponentialBackOffPolicy(TimeSpan maxWait)
    {
        _MaxWait = maxWait;
    }

    public bool PerformRetry(SqlException ex)
    {
        Thread.Sleep(_CurrentWait);
        _CurrentWait = _CurrentWait == TimeSpan.Zero ? TimeSpan.FromMilliseconds(20) : _CurrentWait + _CurrentWait;
        return _CurrentWait <= _MaxWait;
    }
}

同样,在任何优秀的基于SQL的系统中,我们需要处理死锁。特别是在使用NHibernate时,我们无法深入计划这些问题,除了保持严格的事务策略——没有隐式事务;并小心使用Open-Session-In-View。如果您要获取大量数据,则还需要记住笛卡尔积问题/N+1选择问题。因此,您可能需要使用Multi-Query或HQL的“fetch”关键字。
internal class SqlServerRetryPolicy : IRetryPolicy
{
    private int _Tries;
    private readonly int _CutOffPoint;

    public SqlServerRetryPolicy(int cutOffPoint)
    {
        if (cutOffPoint < 1) throw new ArgumentOutOfRangeException("cutOffPoint");
        _CutOffPoint = cutOffPoint;
    }

    public bool PerformRetry(SqlException ex)
    {
        if (ex == null) throw new ArgumentNullException("ex");
        // checks the ErrorCode property on the SqlException
        return SqlServerExceptions.IsThisADeadlock(ex) && ++_Tries < _CutOffPoint;
    }
}

一个帮助类,使代码更易读。
internal static class SqlServerExceptions
{
    public static bool IsThisADeadlock(SqlException realException)
    {
        return realException.ErrorCode == 1205;
    }
}

不要忘记在IConnectionFactory中处理网络故障(可以通过实现IConnection进行委托处理)。


PS:如果你不仅仅是读取数据,那么每个请求一个Session的模式是有问题的。特别是当你使用相同的ISession进行读写操作时,并且没有按照顺序将读操作全部放在写操作之前。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接