具有不同线程安全模式的 System.Lazy<T>

14

.NET 4.0的System.Lazy<T>类提供了三种线程安全模式,通过枚举LazyThreadSafetyMode实现,我将其总结如下:

  • LazyThreadSafetyMode.None - 非线程安全。
  • LazyThreadSafetyMode.ExecutionAndPublication - 只有一个并发线程将尝试创建基础值。在成功创建后,所有等待的线程都将接收相同的值。如果在创建过程中出现未处理的异常,则会重新在每个等待的线程上引发该异常,并在每次尝试访问基础值时缓存和重新引发它。
  • LazyThreadSafetyMode.PublicationOnly - 多个并发线程将尝试创建基础值,但首先成功的线程将确定传递给所有线程的值。如果在创建过程中出现未处理的异常,则不会缓存它,并且并发的、随后的尝试访问基础值将重试创建并可能成功。

我想要一个延迟初始化的值,它遵循稍微不同的线程安全规则,即:

只有一个并发线程将尝试创建基础值。在成功创建后,所有等待的线程都将接收相同的值。如果在创建过程中出现未处理的异常,则会在每个等待的线程上重新引发该异常,但不会缓存它,随后的尝试访问基础值将重试创建并可能成功。

因此,与LazyThreadSafetyMode.ExecutionAndPublication的关键区别在于,如果创建的“第一次尝试”失败,可以在以后重新尝试。

现有(.NET 4.0)类是否提供此功能,或者我需要自己实现?如果我需要自己实现,是否有一种聪明的方法在实现中重用现有的Lazy<T>以避免显式锁定/同步?


注:对于用例,请想象“创建”可能是昂贵且容易出错的,例如涉及从远程服务器获取大量数据。我不想尝试同时获取多个数据,因为它们可能全部失败或全部成功。但是,如果它们失败了,我希望能够稍后重试。


1
“稍后重试”这个提示太过模糊。你可以使用一个计时器来重新创建懒加载实例。 - Hans Passant
嗨@HansPassant。我并不是说Lazy<T>本身应该稍后重试。我的意思是,如果用户多次调用myLazy.value,那么如果第一次失败,在第二次调用时它将再次尝试实例化底层值,而不仅仅是重新抛出先前的异常。 - MattBecker82
“后面”是什么时候?它是指“在主线程抛出异常并且所有等待的线程都通过观察到它而被解除阻塞之后的任何时间”吗?这是否由观察您假设的LazyWithSprinkles<T>的任何调用者来管理?这听起来像是一个比您发布的问题稍微大一点的问题,这表明需要一个与Lazy<T>类似但看起来完全不同的解决方案。 - Joe Amenta
例如,您可以拥有Lazy<Task<T>>,在第一次请求时,它会启动一个Task<T>,该任务会重复发出请求,直到最终成功(或抛出无法恢复的错误);然后调用者可以等待该Task<T>尽可能长的时间。虽然这不完全符合您要求的内容,但是,如果在类似于Lazy<T>的想法中抛出异常,则有时会在值准备好之前超时100ms...但这实际上与稍后重试有什么区别呢? - Joe Amenta
另一种不同的方式是,如果所有的调用者在看到第一个异常后都感到沮丧,并且不想再尝试,那么这将浪费资源在后台线程上,可能会无限期地重试...同样,如果不知道使用案例的更大视野,很难说这是否可以接受或者是否值得担心。 - Joe Amenta
显示剩余3条评论
5个回答

4

只有一个并发线程尝试创建底层值。成功创建后,所有等待的线程都将接收相同的值。如果在创建过程中发生未处理的异常,则会在每个等待线程上重新抛出,但不会被缓存,并且后续访问底层值的尝试将重新尝试创建,并可能成功。

由于Lazy不支持此功能,您可以尝试自己编写代码实现:

private static object syncRoot = new object();
private static object value = null;
public static object Value
{
    get
    {
        if (value == null)
        {
            lock (syncRoot)
            {
                if (value == null)
                {
                    // Only one concurrent thread will attempt to create the underlying value.
                    // And if `GetTheValueFromSomewhere` throws an exception, then the value field
                    // will not be assigned to anything and later access
                    // to the Value property will retry. As far as the exception
                    // is concerned it will obviously be propagated
                    // to the consumer of the Value getter
                    value = GetTheValueFromSomewhere();
                }
            }
        }
        return value;
    }
}

更新:

为了满足您关于所有等待读取线程传播相同异常的要求:

private static Lazy<object> lazy = new Lazy<object>(GetTheValueFromSomewhere);
public static object Value
{
    get
    {
        try
        {
            return lazy.Value;
        }
        catch
        {
            // We recreate the lazy field so that subsequent readers
            // don't just get a cached exception but rather attempt
            // to call the GetTheValueFromSomewhere() expensive method
            // in order to calculate the value again
            lazy = new Lazy<object>(GetTheValueFromSomewhere);

            // Re-throw the exception so that all blocked reader threads
            // will get this exact same exception thrown.
            throw;
        }
    }
}

错过了“将在每个等待线程上重新抛出”这一点,但我猜这也不是一个真正的要求。 - Joe Amenta
事实上,如果两个线程在同时尝试读取正在被分配的值(假设GetTheValueFromSomewhere会抛出异常),那么两个线程都会得到两个不同的异常,因为它们最终都会触发GetTheValueFromSomewhere方法。因此,在这个例子中,第二个读取线程将重试重新创建该值。 - Darin Dimitrov
不是线程安全的。两个异常可能同时进入catch块。然后,懒加载将被重新创建两次。 - usr
syncLock 目前的使用方式并不能防止前两个评论中提到的任何问题。它只是确保所有等待的线程必须排成一条单行,依次用自己全新的实例覆盖 lazy 变量。实际上存在竞争条件,其中一个等待的线程已经覆盖了 lazy,另一个调用者进来并在其上阻塞,然后稍后的等待线程用自己的新实例覆盖了 lazy。我认为你需要至少另一个静态变量来克服这个问题。 - Joe Amenta
这里使用 lazy = new Lazy<object>(GetTheValueFromSomewhere); 是危险的,因为它引入了一些有趣的竞态条件。线程1、2、3和4可能同时运行。线程1和2进入catch块。线程1覆盖了Lazy。线程3成功访问了Lazy(由于某种原因它没有抛出异常)。线程2覆盖了Lazy。线程4访问了Lazy(由于某种原因它抛出了异常)。开发人员会想不通为什么3是成功的,而4却不是。考虑使用Interlocked.CompareExchange来减轻这个问题。 - mjwills
显示剩余7条评论

3

类似这样的东西可能会有所帮助:

using System;
using System.Threading;

namespace ADifferentLazy
{
    /// <summary>
    /// Basically the same as Lazy with LazyThreadSafetyMode of ExecutionAndPublication, BUT exceptions are not cached 
    /// </summary>
    public class LazyWithNoExceptionCaching<T>
    {
        private Func<T> valueFactory;
        private T value = default(T);
        private readonly object lockObject = new object();
        private bool initialized = false;
        private static readonly Func<T> ALREADY_INVOKED_SENTINEL = () => default(T);

        public LazyWithNoExceptionCaching(Func<T> valueFactory)
        {
            this.valueFactory = valueFactory;
        }

        public bool IsValueCreated
        {
            get { return initialized; }
        }

        public T Value
        {
            get
            {
                //Mimic LazyInitializer.EnsureInitialized()'s double-checked locking, whilst allowing control flow to clear valueFactory on successful initialisation
                if (Volatile.Read(ref initialized))
                    return value;

                lock (lockObject)
                {
                    if (Volatile.Read(ref initialized))
                        return value;

                    value = valueFactory();
                    Volatile.Write(ref initialized, true);
                }
                valueFactory = ALREADY_INVOKED_SENTINEL;
                return value;
            }
        }
    }
}

你能详细说明一下为什么这些异常不被缓存吗? - Kinin Roza
@KininRoza 因为如果出现异常,value 就不会被写入。 - mjwills

2
Lazy不支持这个功能,这是Lazy的设计问题,因为异常“缓存”意味着该lazy实例将永远不会提供真正的值。这可能会因短暂错误(如网络问题)而使应用程序永久性地崩溃。通常需要人工干预。
我敢打赌,许多.NET应用程序中存在这种问题...
您需要编写自己的懒加载代码来解决此问题。或者,可以在CoreFx Github上开一个Issue来处理此问题。

你为什么认为Lazy<T>存在设计问题?如果你假设Lazy<T>的作用是执行值工厂函数一次并缓存结果,以便每次请求其值时都能使用缓存结果,那么为什么不应该包括抛出未处理异常,如果你给它一个不能处理可恢复异常的值工厂函数呢? - Joe Amenta
1
谢谢,usr。我意识到Lazy<T>不支持这个功能,所以我的问题实际上是是否有另一种现有的机制支持这种行为(或者可以轻松地进行适应)。如果这不清楚,我很抱歉。 - MattBecker82
1
答案是否定的,我应该指出来的。@MattBecker82 没有构建它。很遗憾。 - usr
1
@JoeAmenta 在这么长的时间里我从未想过要这么做。如果你想要那样做,你有点在使用异常来控制流程。所以似乎不太可能会有人想要这样做。如果你想要它,只需在懒加载体中捕获异常并将其作为值返回。就这样。 - usr
1
@JoeAmenta 这取决于您的期望是Lazy是否会调用值工厂一次一次成功。许多开发人员错误地假设它执行后者。 - mjwills
1
懒加载确实缺少“ExecutionRetryWithPublication”模式 - 另一方面,编写一个“改进”的懒加载实现是一天愉快的事情(并为其编写强大的测试!),但遗憾的是没有基础接口/类可用于类型签名 :} - user2864740

1
部分灵感来自Darin's answer,但是我试图让这个“等待线程队列受到异常影响”和“重试”功能正常工作:
private static Task<object> _fetcher = null;
private static object _value = null;

public static object Value
{
    get
    {
        if (_value != null) return _value;
        //We're "locking" then
        var tcs = new TaskCompletionSource<object>();
        var tsk = Interlocked.CompareExchange(ref _fetcher, tcs.Task, null);
        if (tsk == null) //We won the race to set up the task
        {
            try
            {
                var result = new object(); //Whatever the real, expensive operation is
                tcs.SetResult(result);
                _value = result;
                return result;
            }
            catch (Exception ex)
            {
                Interlocked.Exchange(ref _fetcher, null); //We failed. Let someone else try again in the future
                tcs.SetException(ex);
                throw;
            }
        }
        tsk.Wait(); //Someone else is doing the work
        return tsk.Result;
    }
}

我稍微有些担心 - 有没有人能看出任何明显的竞赛,在这里它会以不明显的方式失败?


我想不出一系列操作,会导致这做任何不良的事情...只是一个提醒,如果值没有准备好,tsk.Result 会自己等待,因此显式的 tsk.Wait() 是多余的。 - Joe Amenta

1

我尝试着创建一份版本,参考Darin更新的答案,它不会存在我指出的竞态条件... 警告:我不能完全确定这个版本已经完全没有竞态条件。

private static int waiters = 0;
private static volatile Lazy<object> lazy = new Lazy<object>(GetValueFromSomewhere);
public static object Value
{
    get
    {
        Lazy<object> currLazy = lazy;
        if (currLazy.IsValueCreated)
            return currLazy.Value;

        Interlocked.Increment(ref waiters);

        try
        {
            return lazy.Value;

            // just leave "waiters" at whatever it is... no harm in it.
        }
        catch
        {
            if (Interlocked.Decrement(ref waiters) == 0)
                lazy = new Lazy<object>(GetValueFromSomewhere);
            throw;
        }
    }
}

更新:在发布此文后,我认为我找到了一个竞态条件。只要你可以接受一个假定罕见的情况,在一些线程从成功的快速 Lazy<T>返回之后,另一些线程从慢速 Lazy<T>观察到异常并抛出(未来的请求都将成功),那么这种行为实际上应该是可以接受的。
  • waiters = 0
  • t1: 进入并在Interlocked.Decrement之前停止 (waiters = 1)
  • t2: 进入并在Interlocked.Increment之前停止 (waiters = 1)
  • t1: 执行Interlocked.Decrement并准备覆盖 (waiters = 0)
  • t2: 在Interlocked.Decrement之前停止 (waiters = 1)
  • t1: 用新的lazy1覆盖lazy (waiters = 1)
  • t3: 进入并阻塞在lazy1上 (waiters = 2)
  • t2: 执行其Interlocked.Decrement (waiters = 1)
  • t3: 获取并返回lazy1的值 (waiters现在不重要)
  • t2: 重新抛出其异常

我无法想出一系列事件会导致比“此线程在另一个线程产生成功结果后抛出异常”更糟糕的情况。

更新2:将lazy声明为volatile以确保所有读取者立即看到受保护的覆盖。有些人(包括我自己)看到volatile就会立刻认为“这可能被错误使用”,他们通常是正确的。我在这里使用它的原因是:在上面示例中的事件序列中,如果t3恰好在t1修改lazy以包含lazy1的那一刻之前定位到lazy.Value的读取,它仍然可以读取旧的lazy而不是lazy1volatile可以防止这种情况发生,以便下一次尝试可以立即开始。

我也提醒自己,为什么我在写原始答案时一直心中有个声音说“低锁并发编程很难,只需使用C# lock语句!”。

更新3:刚刚更改了更新2中的某些文本,指出实际情况使得 volatile 必要--此处使用的 Interlocked 操作显然在今天重要的CPU架构上实现为全栅栏而不是我最初只是有点假设的半栅栏,因此 volatile 保护的范围比我最初想象的要窄得多。

看起来是一个优雅的解决方案,我不介意你描述的竞态条件。然而,如果我们真的想要排除它,是否只需添加 static object syncRoot = new object(); 并将 Interlocked.Increment(ref _waiters); 更改为 lock(syncRoot) {++waiters;},并且还要将 if 语句更改为 lock(syncRoot) { if(--waiters == 0) _lazy = new Lazy<object>(GetValueFromSomewhere); } - MattBecker82
@MattBecker82:我认为这并没有帮助。最好的情况下(不确定),它允许我们从字段声明中删除“volatile”,因为“Monitor.Enter”执行了完整的内存屏障。我无法想出一个合理的方法来防止这种情况 - 我认为它会强制我们等待所有当前正在执行的“catch”处理程序(如果有的话)完成(另一个变量,因为“waiters”可能都在等待成功运行),紧接着在任何东西读取“!currLazy.IsValueCreated”之后立即再次读取“lazy”。这是可行的,但会增加复杂性,效益值得怀疑。 - Joe Amenta
好的,我会将其标记为正确的,因为我认为这是最优雅的,并且关于已知竞争条件的警告足够清晰。同时也要感谢DarinDamien的回答。谢谢大家。 - MattBecker82

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接