如何在多线程场景下对只需执行一次的代码进行单元测试?

9

一个类包含一个属性,应该只创建一次。创建过程是通过传入参数的 Func<T> 来实现的。这是缓存场景的一部分。

测试确保无论有多少线程尝试访问该元素,创建只会发生一次。

单元测试的机制是在访问器周围启动大量线程,并计算创建函数被调用的次数。

这一点根本不确定,没有任何保证这实际上测试了多线程访问。也许一次只有一个线程会命中锁定。(在现实中,如果没有lockgetFunctionExecuteCount 介于7到9之间... 在我的机器上,不能保证在CI服务器上它会是相同的)

如何以确定性的方式重写单元测试?如何确保lock被多个线程多次触发?

using Microsoft.VisualStudio.TestTools.UnitTesting;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Example.Test
{
    public class MyObject<T> where T : class
    {
        private readonly object _lock = new object();
        private T _value = null;
        public T Get(Func<T> creator)
        {
            if (_value == null)
            {
                lock (_lock)
                {
                    if (_value == null)
                    {
                        _value = creator();
                    }
                }
            }
            return _value;
        }
    }

    [TestClass]
    public class UnitTest1
    {
        [TestMethod]
        public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
        {
            int getFunctionExecuteCount = 0;
            var cache = new MyObject<string>();

            Func<string> creator = () =>
            {
                Interlocked.Increment(ref getFunctionExecuteCount);
                return "Hello World!";
            };
            // Launch a very big number of thread to be sure
            Parallel.ForEach(Enumerable.Range(0, 100), _ =>
            {
                cache.Get(creator);
            });

            Assert.AreEqual(1, getFunctionExecuteCount);
        }
    }
}

最糟糕的情况是如果有人破解了“锁定”代码,并且测试服务器出现了一些延迟。这个测试不应该通过:
using NUnit.Framework;
using System;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace Example.Test
{
    public class MyObject<T> where T : class
    {
        private readonly object _lock = new object();
        private T _value = null;
        public T Get(Func<T> creator)
        {
            if (_value == null)
            {
                // oups, some intern broke the code
                //lock (_lock)
                {
                    if (_value == null)
                    {
                        _value = creator();
                    }
                }
            }
            return _value;
        }
    }

    [TestFixture]
    public class UnitTest1
    {
        [Test]
        public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
        {
            int getFunctionExecuteCount = 0;
            var cache = new MyObject<string>();

            Func<string> creator = () =>
            {
                Interlocked.Increment(ref getFunctionExecuteCount);
                return "Hello World!";
            };
            Parallel.ForEach(Enumerable.Range(0, 2), threadIndex =>
            {
                // testing server has lag
                Thread.Sleep(threadIndex * 1000);
                cache.Get(creator);
            });

             // 1 test passed :'(
            Assert.AreEqual(1, getFunctionExecuteCount);
        }
    }
}

如果您将创建者设为“慢”的(让线程睡一会儿),那么其他线程遇到锁的机率就会增加,它们必须等待慢线程离开锁。 - Emond
你要对线程安全进行单元测试吗?这对我来说似乎很奇怪。在我看来,你应该对暴露的类的功能进行单元测试,而线程安全问题更适合作为代码审查的主题。 - Yuval Itzchakov
如果有人删除了代码中的“lock”部分,而代码审查没有发现,我希望单元测试能够捕捉到它。 - Cyril Gandon
1
为什么不直接使用Lazy<T>类呢?它似乎可以完全达到你想要实现的目标。链接 - Maarten
@link: 是的,Lazy可以解决这个小例子中的问题。我的真实情况更复杂,涉及到更高级的情况,我不能使用Lazy类。 - Cyril Gandon
为什么不使用Rhino Mock并且在方法调用上只期望发生一次? - vendettamit
3个回答

5
为了使其确定性,您只需要两个线程,并确保其中一个在函数内部阻塞,而另一个则尝试进入。
[TestMethod]
public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
{
    var evt = new ManualResetEvent(false);

    int functionExecuteCount = 0;
    var cache = new MyObject<object>();

    Func<object> creator = () =>
    {
        Interlocked.Increment(ref functionExecuteCount);
        evt.WaitOne();
        return new object();
    };

    var t1 = Task.Run(() => cache.Get(creator));
    var t2 = Task.Run(() => cache.Get(creator));

    // Wait for one task to get inside the function
    while (functionExecuteCount == 0)
        Thread.Yield();

    // Allow the function to finish executing
    evt.Set();

    // Wait for completion
    Task.WaitAll(t1, t2);

    Assert.AreEqual(1, functionExecuteCount);
    Assert.AreEqual(t1.Result, t2.Result);
}

您可能希望在此测试中设置超时时间 :)


以下是一种变体,允许测试更多情况:

public void MultipleParallelGetShouldLaunchGetFunctionOnlyOnce()
{
    var evt = new ManualResetEvent(false);

    int functionExecuteCount = 0;
    var cache = new MyObject<object>();

    Func<object> creator = () =>
    {
        Interlocked.Increment(ref functionExecuteCount);
        evt.WaitOne();
        return new object();
    };

    object r1 = null, r2 = null;

    var t1 = new Thread(() => { r1 = cache.Get(creator); });
    t1.Start();

    var t2 = new Thread(() => { r2 = cache.Get(creator); });
    t2.Start();

    // Make sure both threads are blocked
    while (t1.ThreadState != ThreadState.WaitSleepJoin)
        Thread.Yield();

    while (t2.ThreadState != ThreadState.WaitSleepJoin)
        Thread.Yield();

    // Let them continue
    evt.Set();

    // Wait for completion
    t1.Join();
    t2.Join();

    Assert.AreEqual(1, functionExecuteCount);
    Assert.IsNotNull(r1);
    Assert.AreEqual(r1, r2);
}

如果您想延迟第二个调用,您将无法使用Thread.Sleep,因为它会导致线程进入WaitSleepJoin状态:

线程被阻止。这可能是调用Thread.SleepThread.Join的结果,也可能是请求锁定的结果 - 例如通过调用Monitor.EnterMonitor.Wait - 或者等待线程同步对象,例如ManualResetEvent

并且我们将无法确定线程是正在睡眠还是在等待您的ManualResetEvent...

但是您可以轻松地用忙等待替换睡眠。注释掉lock,并将t2更改为:

var t2 = new Thread(() =>
{
    var sw = Stopwatch.StartNew();
    while (sw.ElapsedMilliseconds < 1000)
        Thread.Yield();

    r2 = cache.Get(creator);
});

现在测试将会失败。


1
嗨@Lucas,这个方法看起来很有前途,但是如果我注释掉“锁代码”,并在一秒后启动第二个线程,测试就可以通过:var t2 = Task.Factory.StartNew(() => { Thread.Sleep(1000); return cache.Get(creator); }); - Cyril Gandon
太棒了,这正是我在寻找的,线程操作和线程状态。谢谢。 - Cyril Gandon

1
除了pid的答案之外:
这段代码实际上并没有创建很多线程。
// Launch a very big number of thread to be sure
Parallel.ForEach(Enumerable.Range(0, 100), _ =>
{
    cache.Get(creator);
});

它将启动~Environment.ProcessorCount个线程。更多细节。

如果你想要获得大量的线程,你应该明确地这样做。

var threads = Enumerable.Range(0, 100)
    .Select(_ => new Thread(() => cache.Get(creator))).ToList();
threads.ForEach(thread => thread.Start());
threads.ForEach(thread => thread.Join());

如果您足够多的线程并强制它们切换,那么您将获得并发竞争。
如果您关心CI服务器只有一个空闲核心的情况,您可以通过更改Process.ProcessorAffinity属性在测试中包含此约束。更多细节。

1
我认为没有一种真正确定的方法,但是您可以提高概率,使其非常难以不引起并发竞争:
Interlocked.Increment(ref getFunctionExecuteCount);
Thread.Yield();
Thread.Sleep(1);
Thread.Yield();
return "Hello World!";

通过提高 Sleep() 参数(到10?),越来越不可能发生并发竞争。

Yield 在这里有什么作用,睡眠不足以吗?处理器与此有什么关系? - Emond
这个想法是卸载正在运行的线程,以便其他并发线程可以运行并请求锁。Yield 允许中止当前时间片,以便轮询可以立即交换另一个时间片。 - pid
睡眠功能可以在多个处理器上执行相同的操作。 - Emond
啊,好的。知道了!谢谢。 - pid

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接