Interlocked.Increment 不安全的多线程操作吗?

4
我在代码中发现了一个编译器错误,只有一行代码:

int thisIndex = Interlocked.Increment(ref messagesIndex) & indexMask;

定义如下:

定义如下:

static int messagesIndex = -1;
public const int MaxMessages = 0x10000;
const int indexMask = MaxMessages-1;

messagesIndex没有被其他代码访问。

如果我在单个线程中运行该代码数十亿次,不会出现任何错误。

如果我在几个线程上运行上面的代码,我会得到相同的数字两次,每1x千次跳过另一个数字。

我已经在6个线程上运行了下面的代码数十亿次,从未出现过错误:

int thisIndex = Interlocked.Increment(ref messagesIndex);

结论和问题

看起来 Interlocked.Increment() 单独使用是按预期工作的,但是 Interlocked.Increment()indexMask 结合使用时却不行 :-(

有没有什么办法可以让它始终正常工作,而不仅仅是99.99%的时间?

我尝试将 Interlocked.Increment(ref messagesIndex) 分配给一个易失性整数变量,并在该变量上执行 "& indexMask" 操作:

[ThreadStatic]
volatile static int nextIncrement;

nextIncrement = Interlocked.Increment(ref mainIndexIncrementModTest);
indexes[testThreadIndex++] = nextIncrement & maskIncrementModTest;

当我将代码写成一行时,它会引起与将其写在多行中相同的问题。

反汇编

也许有人可以从反汇编中猜出编译器引入的问题:

indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementTest);
0000009a  mov         eax, dword ptr [ebp-48h] 
0000009d  mov         dword ptr [ebp-58h], eax 
000000a0  inc         dword ptr [ebp-48h] 
000000a3  mov         eax, dword ptr [ebp-44h] 
000000a6  mov         dword ptr [ebp-5Ch], eax 
000000a9  lea         ecx, ds:[00198F84h] 
000000af  call        6D758403 
000000b4  mov         dword ptr [ebp-60h], eax 
000000b7  mov         eax, dword ptr [ebp-58h] 

000000ba  mov         edx, dword ptr [ebp-5Ch] 
000000bd  cmp         eax, dword ptr [edx+4] 
000000c0  jb          000000C7 
000000c2  call        6D9C2804 
000000c7  mov         ecx, dword ptr [ebp-60h] 
000000ca  mov         dword ptr [edx+eax*4+8], ecx 

indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementModTest) & maskIncrementModTest;
0000009a  mov         eax, dword ptr [ebp-48h] 
0000009d  mov         dword ptr [ebp-58h], eax 
000000a0  inc         dword ptr [ebp-48h] 
000000a3  mov         eax, dword ptr [ebp-44h] 
000000a6  mov         dword ptr [ebp-5Ch], eax 
000000a9  lea         ecx,ds:[001D8F88h] 
000000af  call        6D947C8B 
000000b4  mov         dword ptr [ebp-60h], eax 
000000b7  mov         eax, dword ptr [ebp-60h] 
000000ba  and         eax, 0FFFh 
000000bf  mov         edx, dword ptr [ebp-58h] 
000000c2  mov         ecx, dword ptr [ebp-5Ch] 
000000c5  cmp         edx, dword ptr [ecx+4] 
000000c8  jb          000000CF 
000000ca  call        6DBB208C 
000000cf  mov         dword ptr [ecx+edx*4+8], eax 

漏洞检测

为了发现漏洞,我会无限运行问题行并在6个线程中每个线程将返回的整数写入巨大的整数数组中。一段时间后,我停止线程并搜索所有六个整数数组,以确保每个数字只被返回一次(当然,我允许 "& indexMask" 操作)。

using System;
using System.Text;
using System.Threading;


namespace RealTimeTracer 
{
    class Test 
    {
        #region Test Increment Multi Threads
        //      ----------------------------

        const int maxThreadIndexIncrementTest = 0x200000;
        static int mainIndexIncrementTest = -1; //the counter gets incremented before its use
        static int[][] threadIndexThraces;

        private static void testIncrementMultiThread() 
        {
            const int maxTestThreads = 6;

            Thread.CurrentThread.Name = "MainThread";

            //start writer test threads
            Console.WriteLine("start " + maxTestThreads + " test writer threads.");
            Thread[] testThreads = testThreads = new Thread[maxTestThreads];
            threadIndexThraces = new int[maxTestThreads][];
            int testcycle = 0;

            do 
            {
                testcycle++;
                Console.WriteLine("testcycle " + testcycle);
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++) 
                {
                    Thread testThread = new Thread(testIncrementThreadBody);
                    testThread.Name = "TestThread " + testThreadIndex;
                    testThreads[testThreadIndex] = testThread;
                    threadIndexThraces[testThreadIndex] = new int[maxThreadIndexIncrementTest+1]; //last int will be never used, but easier for programming
                }

                mainIndexIncrementTest = -1; //the counter gets incremented before its use
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++) 
                {
                    testThreads[testThreadIndex].Start(testThreadIndex);
                }

                //wait for writer test threads
                Console.WriteLine("wait for writer threads.");

                foreach (Thread testThread in testThreads)
                {
                    testThread.Join();
                }

                //verify that EVERY index is used exactly by one thread.
                Console.WriteLine("Verify");
                int[] threadIndexes = new int[maxTestThreads];

                for (int counter = 0; counter < mainIndexIncrementTest; counter++) 
                {
                    int threadIndex = 0;
                    for (; threadIndex < maxTestThreads; threadIndex++) 
                    {
                        if (threadIndexThraces[threadIndex][threadIndexes[threadIndex]]==counter) 
                        {
                            threadIndexes[threadIndex]++;
                            break;
                        }
                    }

                    if (threadIndex==maxTestThreads) 
                    {
                        throw new Exception("Could not find index: " + counter);
                    }
                }
            } while (!Console.KeyAvailable);
        }

        public static void testIncrementThreadBody(object threadNoObject)
        {
            int threadNo = (int)threadNoObject;
            int[] indexes = threadIndexThraces[threadNo];
            int testThreadIndex = 0;
            try
            {
                for (int counter = 0; counter < maxThreadIndexIncrementTest; counter++)      
                {
                    indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementTest);
                }
            } 
            catch (Exception ex) 
            {
                OneTimeTracer.Trace(Thread.CurrentThread.Name + ex.Message);
            }
        }
        #endregion


        #region Test Increment Mod Multi Threads
        //      --------------------------------

        const int maxThreadIndexIncrementModTest = 0x200000;
        static int mainIndexIncrementModTest = -1; //the counter gets incremented before its use
        const int maxIncrementModTest = 0x1000;
        const int maskIncrementModTest = maxIncrementModTest - 1;


        private static void testIncrementModMultiThread() 
        {
            const int maxTestThreads = 6;

            Thread.CurrentThread.Name = "MainThread";

            //start writer test threads 
            Console.WriteLine("start " + maxTestThreads + " test writer threads.");
            Thread[] testThreads = testThreads = new Thread[maxTestThreads];
            threadIndexThraces = new int[maxTestThreads][];
            int testcycle = 0;
            do 
            {
                testcycle++;
                Console.WriteLine("testcycle " + testcycle);
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++)
                {
                    Thread testThread = new Thread(testIncrementModThreadBody);
                    testThread.Name = "TestThread " + testThreadIndex;
                    testThreads[testThreadIndex] = testThread;
                    threadIndexThraces[testThreadIndex] = new int[maxThreadIndexIncrementModTest+1]; //last int will be never used, but easier for programming
                }

                mainIndexIncrementModTest = -1; //the counter gets incremented before its use
                for (int testThreadIndex = 0; testThreadIndex < maxTestThreads; testThreadIndex++) 
                {
                    testThreads[testThreadIndex].Start(testThreadIndex);
                }

                //wait for writer test threads
                Console.WriteLine("wait for writer threads.");
                foreach (Thread testThread in testThreads) 
                {
                    testThread.Join();
                }

                //verify that EVERY index is used exactly by one thread.
                Console.WriteLine("Verify");
                int[] threadIndexes = new int[maxTestThreads];
                int expectedIncrement = 0;
                for (int counter = 0; counter < mainIndexIncrementModTest; counter++) 
                {
                    int threadIndex = 0;
                    for (; threadIndex < maxTestThreads; threadIndex++) 
                    {
                        if (threadIndexes[threadIndex]<maxThreadIndexIncrementModTest     && 
threadIndexThraces[threadIndex][threadIndexes[threadIndex]]==expectedIncrement) 
                        {
                            threadIndexes[threadIndex]++;
                            expectedIncrement++;
                            if (expectedIncrement==maxIncrementModTest) 
                            {
                                expectedIncrement = 0;
                            }
                            break;
                        }
                    }

                    if (threadIndex==maxTestThreads) 
                    {
                        StringBuilder stringBuilder = new StringBuilder();
                        for (int threadErrorIndex = 0; threadErrorIndex < maxTestThreads; threadErrorIndex++)
                        {
                            int index = threadIndexes[threadErrorIndex];
                            if (index<0) 
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + " is empty");
                            }
                            else if (index==0)
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + "[0]=" +
                                threadIndexThraces[threadErrorIndex][0]);
                            }
                            else if (index>=maxThreadIndexIncrementModTest) 
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + "[" + (index-1) + "]=" +
                threadIndexThraces[threadErrorIndex][maxThreadIndexIncrementModTest-2] + ", " + 
                threadIndexThraces[threadErrorIndex][maxThreadIndexIncrementModTest-1]);
                            } 
                            else 
                            {
                                stringBuilder.AppendLine("Thread " + threadErrorIndex + "[" + (index-1) + "]=" +
                threadIndexThraces[threadErrorIndex][index-1] + ", " + 
                threadIndexThraces[threadErrorIndex][index]);
                            }
                        } 

                        string exceptionString = "Could not find index: " + expectedIncrement + " for counter " + counter + Environment.NewLine + stringBuilder.ToString();
                        Console.WriteLine(exceptionString);

                        return;
                        //throw new Exception(exceptionString);
                    }
                }
            } while (!Console.KeyAvailable);
        }


        public static void testIncrementModThreadBody(object threadNoObject)
        {
            int threadNo = (int)threadNoObject;
            int[] indexes = threadIndexThraces[threadNo];
            int testThreadIndex = 0;
            try
            {
                for (int counter = 0; counter < maxThreadIndexIncrementModTest; counter++) 
                {
                    // indexes[testThreadIndex++] = Interlocked.Increment(ref mainIndexIncrementModTest) & maskIncrementModTest;
                    int nextIncrement = Interlocked.Increment(ref mainIndexIncrementModTest);
                    indexes[testThreadIndex++] = nextIncrement & maskIncrementModTest;
                }
            } 
            catch (Exception ex) 
            {
                OneTimeTracer.Trace(Thread.CurrentThread.Name + ex.Message);
            }
        }
        #endregion
    }
}

这会产生以下错误:
6个int数组的内容(每个线程一个)
线程0 [30851] = 2637,2641
线程1 [31214] = 2639,2644
线程2 [48244] = 2638,2643
线程3 [26512] = 2635,2642
线程4 [0] = 2636,2775
线程5 [9173] = 2629,2636 解释:
线程4使用了2636
线程5也使用了2636 !!!! 这不应该发生
线程0使用了2637
线程2使用了2638
线程1使用了2639
2640没有被任何线程使用 !!! 这就是测试检测到的错误
线程0使用了2641
线程3使用了2642

13
自增和&是两种操作,它们组合后不一定是原子性的。使用锁来解决这种操作的组合。 - Candide
你的问题已经收到了一些关闭投票。以下是一些反馈:保持问题非常具体化。说清楚:a)你想做什么,b)你怎么做的,c)你期望的结果,d)实际发生了什么。如果你认为问题可能出在你的代码上,那么首先要关注你的代码。可以包括你的猜测,但应该放在最后,以免分散注意力。如果你认为CLR存在错误,那么就要重点关注它——不要包含任何用于测试_你的_代码的内容——而是包括一个测试_CLR代码和它生成的机器代码的方法。 - Cory Nelson
Candide的评论是错误的。只有“增量”需要支持多线程安全(原子性)。一旦“增量”返回正确的值,对该值的任何进一步操作都不会影响到“消息索引”,而是一个本地值,在其他线程中无法看到。 - Peter Huber
1
Cory:你的回答表明你没有理解我所描述的问题,也没有尝试我提供的测试代码。请不要利用你的完全误解来宣称该问题应该关闭。“我的代码”只包含一行:“int thisIndex = Interlocked.Increment(ref messagesIndex) & indexMask;”,然后我提供了测试代码,证明这个语句有时会产生相同的数字,这显然是不应该发生的。而且,当thisIndex返回到零时,并没有出现错误。 - Peter Huber
@Candide:虽然你得到了很多赞,但原子性在这里并不起作用。请看我的回答。 - Alois Kraus
1
不要修改这个问题,最好关闭它。似乎人们没有理解问题所在,这意味着我必须完全重写问题,这将使一些提供的答案失去上下文。我想自己关闭这个问题,但我不知道怎么做。我希望从头开始重新撰写并发布一个新问题不违反规定。 - Peter Huber
3个回答

12

问题不在于Interlocked。也不存在竞争条件。这甚至与多线程无关。

  • Interlocked.Increment(ref variable) & Mask

没有竞争条件,原子性也没有问题。 Interlocked在寄存器(eax)中返回读取的值。屏蔽操作发生在一个寄存器内部读取的值上,与内存模型或原子性无关。寄存器和局部变量不能被其他线程看到,因此不会产生干扰。

  • 测试

您正在验证是否所有值都已被看到,使用int[nThreads],检查每个线程是否在索引n处看到了该值,并假设下一个值必须在此线程或任何其他线程上可见。

Console.WriteLine("Verify");
int[] threadIndexes = new int[nThreads];

for (int counter = 0; counter < GlobalCounter; counter++) 
{
    int nThread = 0;
    for (; nThread < nThreads; nThread++) 
    {
        if (ThreadArrays[nThread][threadIndexes[nThread]]==counter) 
        {
            threadIndexes[nThread]++;
            break;
        }
    }

    if (nThread==nThreads) 
    {
        throw new Exception("Could not find index: " + counter);
    }
}

我已经重命名了变量以获得更清晰的命名。我已经改变了掩码测试,使位掩码不是在线程中完成而是在验证时间完成,这也进行了断言。这表明您的测试代码存在逻辑问题。线程函数仅存储增加的值,与您的未掩码测试相同。

//verify that EVERY index is used exactly by one thread.
Console.WriteLine("Verify");
int[] threadIndexes = new int[nThreads];
int expectedIncrement = 0;
for (int counter = 0; counter < GlobalCounter; counter++) 
{
    int threadIndex = 0;
    for (; threadIndex < nThreads; threadIndex++) 
    {
        if (threadIndexes[threadIndex]<LoopCount && (ThreadArrays[threadIndex][threadIndexes[threadIndex]] & MaxIncrementBitMask)==expectedIncrement) 
        {
            threadIndexes[threadIndex]++;
            expectedIncrement++;
            if (expectedIncrement == MaxIncrementBit)
            {
                expectedIncrement = 0;
            }
            break;
        }
    }

    if (threadIndex==nThreads) 
    {

当数值出现“环绕”时,您的检查就会出问题。例如,0是第一个值。在0x1000&0xFFF处,它再次变为0。现在可能会发生这样的情况: 您会把一些包装过的值归因于错误的线程,并且会破坏一个隐含假设:每个线程只有唯一的值。

在调试器中,我看到例如值8

threadIndexes [0] = 1
threadIndexes [1] = 4
threadIndexes [2] = 0
threadIndexes [3] = 1
threadIndexes [4] = 1
threadIndexes [5] = 1

虽然您必须将前8个值归为threadIndexes [1],这是第一个线程,从0开始计数到几千。

总之:Interlocked和掩码运算是有效的。您的测试有缺陷,也许您依赖于无效的假设编写了代码。


2
嗨,Alois。终于有人真正读了我写的东西。你可能有所发现,因为当我制作“const int maxIncrementModTest = 2*maxThreadIndexIncrementModTest;”时,它可以防止“%”掩盖任何东西,不会出现错误。谢谢你的提示。我需要更多时间来调查它。 - Peter Huber
1
你说得对,我的测试是错误的。在寻找下一个数字时,它总是从线程0开始查找,然后是1、2,直到找到期望的数字。当mainIndexIncrementTest重新从0开始时,必须出现正确的数字在更高的线程中,但线程0恰好有相同的数字的情况。因此,我编写了一个不同的测试,只计算每个mainIndexIncrementTest返回的数字。所有数字都出现了相同的次数,意味着没有跳过或重复使用。我接受了你的答案。 - Peter Huber
小澄清,Increment 不会返回读取的值,而是返回已更改(即增加)的值。 - astrowalker

10

放心,Interlocked.Increment 是线程安全的。这就是它的全部目的!

你正在测试每个线程是否恰好看到了每个索引一次。如果线程一个接一个执行,那么这将起效。假设每个线程计数为10,000:

A会得到0-9999,B会得到10000-19999,依此类推——当掩码时,每个线程会恰好看到0-9999一次。

但是您的线程正在并发执行。因此您的线程看到的索引是零散、不可预测的交错:

A获取0-4999,B获取5000-9999,A获取10000-14999,B获取15000-19999。

未经掩码处理,每个值将保持唯一。经过掩码处理,A最终将会看到所有0-4999两次,而B将会看到5000-9999两次。

您没有说明您的最终目标是什么,但更好的选择可能是TLS:

[ThreadStatic]
static int perThreadIndex = -1;

int myIndex = ++perThreadIndex;

通过使用ThreadStatic属性,每个线程将只看到它自己的perThreadIndex私有实例,因此您永远不必担心线程会看到重复的索引。


1
你的假设“你正在测试每个线程是否恰好看到了每个索引一次”是完全错误的!如果你看一下我发布的测试代码,测试验证了每个数字在所有线程中仅使用一次。该测试足够聪明,可以检测计数器是否返回0。 - Peter Huber

4

没有错误。

很容易看出增量是原子的,因为它在第二段代码中的偏移量a0处作为单个机器指令执行。同样,and操作不是原子操作,因为它在偏移量b7处作为一系列指令执行。

您可以使用原子库在C++中执行原子位操作。您还可以基于原子比较和交换实现更复杂的操作。如果您愿意在C++中编写代码并从C#中调用它,那将是一个解决方案(Interop非常有效)。

如果您需要仅限C#的解决方案,则仍然可以使用Interlocked.Exchange。基本策略是在循环中执行计算,直到Exchange返回的值与用于计算的值相同时,从而保证没有其他人更改它。

然后您可以使用锁。如果有合理的替代方案,我从不使用锁。


让我解释一下原子性。如果一个操作要么完全执行,要么根本不执行,但在中间没有任何操作,则该操作是原子的。单个机器指令是原子的,因为指令永远不会被中断(可见)。任何指令序列都是非原子的,因为序列可能会被中断。在多线程环境中,当发生中断时,内存内容可能会更改,从而使计算无效。

实际上,有几种策略可以使指令序列在实践中成为原子操作,即使在理论上不是这样。

因此,此行代码是原子的,因为它生成单个机器指令。

nextIncrement = Interlocked.Increment(ref mainIndexIncrementModTest);

000000a0  inc         dword ptr [ebp-48h]

这行代码不是原子性的,因为它会生成很多指令,而每个指令都会单独执行。
indexes[testThreadIndex++] = nextIncrement & maskIncrementModTest;

000000b7  mov         eax, dword ptr [ebp-60h] ; <=== load
000000ba  and         eax, 0FFFh               ; <=== &
000000bf  mov         edx, dword ptr [ebp-58h] 
000000c2  mov         ecx, dword ptr [ebp-5Ch] 
000000c5  cmp         edx, dword ptr [ecx+4] 
000000c8  jb          000000CF 
000000ca  call        6DBB208C 
000000cf  mov         dword ptr [ecx+edx*4+8], eax ; <=== store

问题出现在内存的读写上。对AX寄存器的操作没有问题,但是从内存中读取和写入的过程依赖于这些内存值的稳定性,在多线程环境下可能并不稳定。


如果我将代码写成两行,那么很明显只有第一行必须是原子的。nextIncrement是一个局部变量,其他线程无法看到它。但它会生成相同的错误。int nextIncrement = Interlocked.Increment(ref mainIndexIncrementModTest);indexes[testThreadIndex++] = nextIncrement & maskIncrementModTest; - Peter Huber
3
如果一个操作生成一条机器指令,则该操作是原子的。你编写了多少行代码并不重要。 - david.pfx
正是我的观点。没有理由让“&”操作对其他线程造成问题,因为它作用于Increment()的返回值,而这个返回值是一个本地副本。请注意,返回值是一个整数,而不是引用类型。 - Peter Huber
1
加载和存储是非原子性的。请参见编辑。 - david.pfx
"000000af call 6D947C8B" 是对Interlocked.Increment()的调用。编译器不会将Interlocked.Increment()转换为"000000a0 inc dword ptr [ebp-48h]",因为Interlocked.Increment()内部执行的不仅仅是增加操作。增加操作在处理器的缓存中进行。如果涉及到2个处理器,Interlocked.Increment()必须确保从RAM(而不是缓存)读取最新值,增加后写回RAM。它还需要返回增加后的值。INC指令无法完成所有这些操作。 - Peter Huber

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接