通过中断更新的 64 位变量的读取

5

我没有找到太多关于非原子操作的材料。

假设我有一个32位处理器,我想在一个64位变量中计算微秒数。每微秒都会有一个中断更新该变量。调度程序是非抢占式的。将会有一个函数用来清除变量,另一个函数用来读取它。由于这是一个32位处理器,因此访问将是非原子性的。是否有一种“标准”或惯用的方法来处理这个问题,以便读取函数不会得到半更新的值?


<stdatomic.h>有long long。 - pm100
3
我曾经使用类似的方法(使用8位处理器),先读取高位,再读取低位,然后比较高位。如果不同,则重复执行该过程。 - Weather Vane
从FPGA开发的角度来看如何跨时钟域边界:使用格雷码在ISR和读取器代码之间进行跨越。这是有效的,因为在增加格雷码计数器时,每次只会更改一个位。 - datenwolf
应该声明正在使用C99。 - MaryK
1
这是在微控制器上吗?使用RTOS还是裸机? - Gabriel Staples
@pm100 但是它是否为非锁定取决于架构,您必须检查ATOMIC_LLONG_LOCK_FREE来了解。如果它不是无锁定的,则会非常昂贵。 - phuclv
3个回答

5
有没有一种“标准”或成语化的方法来处理这个问题,以便读取函数不会得到一个半更新的值?
你需要做的是使用我所称之为“原子访问保护”或“中断保护”。这是我感兴趣的领域,我花了很多时间学习并在各种类型的微控制器中使用。
@chux - Reinstate Monica 是正确的,但我想进一步澄清一些事情:
对于从易失性变量中进行读取,通过快速复制变量来最小化关闭中断的时间,然后在计算中使用副本。
// ==========
// Do this:
// ==========

// global volatile variables for use in ISRs
volatile uint64_t u1;
volatile uint64_t u2;
volatile uint64_t u3;

int main()
{
    // main loop
    while (true)
    {
        uint64_t u1_copy;
        uint64_t u2_copy;
        uint64_t u3_copy;

        // use atomic access guards to copy out the volatile variables
        // 1. Save the current interrupt state
        const uint32_t INTERRUPT_STATE_BAK = INTERRUPT_STATE_REGISTER;
        // 2. Turn interrupts off
        interrupts_off();
        // copy your volatile variables out
        u1_copy = u1;
        u2_copy = u2;
        u3_copy = u3;
        // 3. Restore the interrupt state to what it was before disabling it.
        // This leaves interrupts disabled if they were previously disabled
        // (ex: inside an ISR where interrupts get disabled by default as it
        // enters--not all ISRs are this way, but many are, depending on your
        // device), and it re-enables interrupts if they were previously
        // enabled. Restoring interrupt state rather than enabling interrupts
        // is the right way to do it, and it enables this atomic access guard
        // style to be used both inside inside **and** outside ISRs.
        INTERRUPT_STATE_REGISTER = INTERRUPT_STATE_BAK;

        // Now use your copied variables in any calculations
    }
}

// ==========
// NOT this!
// ==========

volatile uint64_t u1;
volatile uint64_t u2;
volatile uint64_t u3;

int main()
{
    // main loop
    while (true)
    {
        // 1. Save the current interrupt state
        const uint32_t INTERRUPT_STATE_BAK = INTERRUPT_STATE_REGISTER;
        // 2. Turn interrupts off
        interrupts_off();

        // Now use your volatile variables in any long calculations
        // - This is not as good as using copies! This would leave interrupts
        //   off for an unnecessarily long time, introducing a ton of jitter
        //   into your measurements and code.

        // 3. Restore the interrupt state to what it was before disabling it.
        INTERRUPT_STATE_REGISTER = INTERRUPT_STATE_BAK;

    }
}

对于写入易失性变量,要快速写入:

在更新易失性变量时,尽量减少中断关闭的时间,只在更新过程中禁用它们。

// global volatile variables for use in ISRs
volatile uint64_t u1;
volatile uint64_t u2;
volatile uint64_t u3;

int main()
{
    // main loop
    while (true)
    {
        // Do calculations here, **outside** the atomic access interrupt guards

        const uint32_t INTERRUPT_STATE_BAK = INTERRUPT_STATE_REGISTER;
        interrupts_off();
        // quickly update your variables and exit the guards
        u1 = 1234;
        u2 = 2345;
        u3 = 3456;
        INTERRUPT_STATE_REGISTER = INTERRUPT_STATE_BAK;
    }
}

替代方案:无锁原子读取通过重复读取循环:doAtomicRead():确保在不关闭中断的情况下进行原子读取!

与上述示例中使用的原子访问保护相比,另一种选择是重复读取变量,直到它不再改变,这表明在你只读取了部分字节后,变量没有被更新。

以下是该方法。@Brendan和@chux-ReinstateMonica和我在@chux-ReinstateMonica's answer下讨论了一些想法。

#include <stdint.h>  // UINT64_MAX

#define MAX_NUM_ATOMIC_READ_ATTEMPTS 3

// errors
#define ATOMIC_READ_FAILED (UINT64_MAX)

/// @brief          Use a repeat-read loop to do atomic-access reads of a 
///     volatile variable, rather than using atomic access guards which
///     disable interrupts.
uint64_t doAtomicRead(const volatile uint64_t* val)
{
    uint64_t val_copy;
    uint64_t val_copy_atomic = ATOMIC_READ_FAILED;
    
    for (size_t i = 0; i < MAX_NUM_ATOMIC_READ_ATTEMPTS; i++)
    {
        val_copy = *val; 
        if (val_copy == *val)
        {
            val_copy_atomic = val_copy;
            break;
        }
    }

    return val_copy_atomic;
}

如果你想更深入地理解,这里是同样的doAtomicRead()函数,但这次附有详细的解释性注释。我还展示了一个被注释掉的微小变体,可能在某些情况下会有帮助,如注释中所解释的那样。
/// @brief          Use a repeat-read loop to do atomic-access reads of a 
///     volatile variable, rather than using atomic access guards which
///     disable interrupts.
///
/// @param[in]      val             Ptr to a volatile variable which is updated
///                                 by an ISR and needs to be read atomically.
/// @return         A copy of an atomic read of the passed-in variable, 
///     if successful, or sentinel value ATOMIC_READ_FAILED if the max number
///     of attempts to do the atomic read was exceeded.
uint64_t doAtomicRead(const volatile uint64_t* val)
{
    uint64_t val_copy;
    uint64_t val_copy_atomic = ATOMIC_READ_FAILED;
    
    // In case we get interrupted during this code block, and `val` gets updated
    // in that interrupt's ISR, try `MAX_NUM_ATOMIC_READ_ATTEMPTS` times to get
    // an atomic read of `val`.
    for (size_t i = 0; i < MAX_NUM_ATOMIC_READ_ATTEMPTS; i++)
    {
        val_copy = *val; 

        // An interrupt could have fired mid-read while doing the **non-atomic**
        // read above, updating the 64-bit value in the ISR and resulting in
        // 32-bits of the old value in the 64-bit variable being wrong now
        // (since the whole 64-bit value has just been updated with a new
        // value), so verify the read above with a new read again.
        // 
        // Caveat: 
        //
        // Note that this method is **not _always_** foolproof, as technically
        // the interrupt could fire off and run again during this 2nd read,
        // causing a very rare edge-case where the exact same incorrect value
        // gets read again, resulting in a false positive where it assigns an
        // erroneous value to `val_copy_atomic`! HOWEVER, that is for **you or
        // I** to design and decide as the architect. 
        //
        // Is it _possible_ for the ISR to really fire off again immediately
        // after returning? Or, would that never happen because we are
        // guaranteed some minimum time gap between interrupts? If the former,
        // you should read the variable again a 3rd or 4th time by uncommenting
        // the extra code block below in order to check for consistency and
        // minimize the chance of an erroneous `val_copy_atomic` value. If the
        // latter, however, and you know the ISR won't fire off again for at
        // least some minimum time value which is large enough for this 2nd
        // read to occur **first**, **before** the ISR gets run for the 2nd
        // time, then you can safely say that this 2nd read is sufficient, and
        // you are done.
        if (val_copy == *val)
        {
            val_copy_atomic = val_copy;
            break;
        }

        // Optionally delete the "if" statement just above and do this instead.
        // Refer to the long "caveat" note above to see if this might be
        // necessary. It is only necessary if your ISR might fire back-to-back
        // with essentially zero time delay between each interrupt.
        // for (size_t j = 0; j < 4; j++)
        // {
        //     if (val_copy == *val)
        //     {
        //         val_copy_atomic = val_copy;
        //         break;
        //     }
        // }
    }

    return val_copy_atomic;
}

上述代码可以进行优化,只需在每次迭代之前添加一次额外的读取操作,并在循环中仅读取一次*val,而不是两次。以下是优化后的代码:

[这是我最喜欢的版本:]

uint64_t doAtomicRead(const volatile uint64_t* val)
{
    uint64_t val_copy_new;
    uint64_t val_copy_old = *val;
    uint64_t val_copy_atomic = ATOMIC_READ_FAILED;
    
    for (size_t i = 0; i < MAX_NUM_ATOMIC_READ_ATTEMPTS; i++)
    {
        val_copy_new = *val; 
        if (val_copy_new == val_copy_old)
        {
            // no change in the new reading, so we can assume the read was not 
            // interrupted during the first reading
            val_copy_atomic = val_copy_new;
            break;
        }
        // update the old reading, to compare it with the new reading in the
        // next iteration
        val_copy_old = val_copy_new;  
    }

    return val_copy_atomic;
}

一般使用doAtomicRead()的示例:
// global volatile variable shared between ISRs and main code
volatile uint64_t u1;

// Inside your function: "atomically" read and copy the volatile variable
uint64_t u1_copy = doAtomicRead(&u1);
if (u1_copy == ATOMIC_READ_FAILED)
{
    printf("Failed to atomically read variable `u1`.\n");

    // Now do whatever is appropriate for error handling; examples: 
    goto done;
    // OR:
    return;
    // etc.
}

这要求写入者对任何读取者都是原子的,例如,在单个写入者写入此变量的情况下是正确的。例如,这个写入可能发生在中断服务程序中。我们只检测到由于读取者被中断而导致的不完整的读取,并进行重试。如果64位值在内存中已经处于不完整的写入状态时,该读取者可能会错误地将其视为有效。 SeqLock没有这种限制,因此在多核情况下非常有用。但是,如果您不需要这个(例如,您有一个单核微控制器),那么它可能效率较低,而doAtomicRead()技巧可以很好地工作。
对于一个单调递增的特殊边界情况(不适用于可以更新为任何值的变量,比如存储传感器读数的变量!),你只需要重新读取64位值的最高有效位,并检查它是否发生了变化。因此,为了(可能)稍微提高上述doAtomicRead()函数的效率,将其更新为执行这个操作。除非你错过了2^32次计数,唯一可能出现的撕裂是当低位卷绕时,高位会被递增。这就像检查整个值,但重试的频率会更低。

在这个关于原子访问保护、禁用中断等主题上进一步探讨

  1. 我的 c/containers_ring_buffer_FIFO_GREAT.c 示例来自我的 eRCaGuy_hello_world 存储库。这个代码示例的描述在文件顶部的评论中:
  2. 演示一个基本且高效的无锁 SPSC(单生产者单消费者)环形缓冲队列,在 C 中(也适用于 C++)。

    该队列仅在 SPSC 上下文中无锁工作,比如在需要从中断服务例程(ISR)向主循环发送数据的裸机微控制器上。

  3. [Peter Cordes 在 SeqLock(序列锁)模式上的回答] 使用 32 位原子计数器实现 64 位的方法
  4. [我的回答] C++ 中对单字节(volatile)数组元素的递减操作不是原子的!为什么?(还有:如何在 Atmel AVR 微控制器/Arduino 中强制实现原子性)
  5. 我关于哪些 Arduino 支持 ATOMIC_BLOCK?的长而详细的回答以及:
    1. 使用 gcc 编译器如何实现 ATOMIC_BLOCK 宏,我在哪里可以查看它们的源代码?
    2. 如何在 C++ 中实现 ATOMIC_BLOCK 功能(而不是使用 avrlibc 的 gcc C 版本)?
    3. 我详细解释了这个非常聪明的原子访问保护宏在 C 中如何工作以及如何轻松地在 C++ 中实现:
    4. ATOMIC_BLOCK(ATOMIC_RESTORESTATE)
      {
          my_var_copy = my_var;
      }
      
  6. [我的问答] 在 STM32 微控制器上,哪些变量类型/大小是原子的?
    1. 并非所有变量在简单读写时都需要原子访问保护(但在增加/减少操作时总是需要!- 参见列表中的第一个链接!),因为某些变量对于给定的架构具有自然的原子读写
      1. 对于 8 位 AVR 微控制器(如 Arduino Uno 上的 ATmega328):8 位变量具有自然的原子读写
      2. 对于 32 位 STM32 微控制器,所有非结构体(简单)类型的 32 位和更小的变量都具有自然的原子读写请参阅上面的回答获取详细信息、源代码文档和证明。
  7. 在 STM32 微控制器上禁用中断的技术:https://stm32f4-discovery.net/2015/06/how-to-properly-enabledisable-interrupts-in-arm-cortex-m/
  8. [我的回答] 在 Arduino 中 ISR 中全局 volatile 变量未更新:如何通过使用原子访问保护来识别和解决竞态条件?
  9. [我的回答] 在 STM32 微控制器中禁用和重新启用中断的各种方法,以实现原子访问保护?
  10. https://en.wikipedia.org/wiki/Readers%E2%80%93writer_lock

1
请注意,您的重新读取策略依赖于它在读者方面是原子写入的(因此,在单核机器上的ISR中是可以的)。SeqLock没有这个要求;对于其他内核上的读取器而言,它是SMP安全的。但它不是无锁的,如果有多个写入者,他们需要确保自己不会同时写入。 - Peter Cordes
@PeterCordes,你能否用代码演示这部分内容?上述内容可以在汇编中进行优化,每次迭代只需解引用*val一次,在第一次之前多加载一次。 - Gabriel Staples
@PeterCordes,我还是不明白。请随意更新包含“///todo///”的代码块,以便更清晰地表达您的意思。 - Gabriel Staples
1
好的,完成。https://godbolt.org/z/Me5oWKoPG 显示了使用MAX_NUM_ATOMIC_READ_ATTEMPTS=3时,GCC -Os仍然完全展开循环,因此read1=read2不需要任何指令成本。将值设置为300或其他可见编译器创建循环,在这种情况下,2个ldrdldrd + 2个mov少一些指令,但差别不大。 - Peter Cordes
啊,我想我在开始编辑时找到了 todo 注释,但在搞砸之后重新开始编辑,并且显然从Godbolt复制/粘贴了错误代码块的一部分,这个代码块只是之前看起来类似于我一直在查看的那个。不确定为什么您想保留具有2个读取的版本并将其显示为第一个。差别不大,所以我认为未来的读者可能会因为同一算法的多个实现而陷入困境;它们都易于阅读。当然,这是您的答案,因此由您决定,这只是我的意见/印象。 - Peter Cordes
显示剩余5条评论

3

在ISR内部,通常会防止后续中断的发生(除非有更高优先级,但这时计数通常不会受影响),因此只需简单地执行 count_of_microseconds ++;

在ISR之外,要访问(读取或写入) count_of_microseconds ,需要具有中断保护或原子访问的功能。

当无法使用 atomic 但可以使用解释控制:*1

volatile uint64_t count_of_microseconds;
...
saved_interrupt_state();
disable_interrupts();
uint64_t my_count64 = count_of_microseconds;
restore_interrupt_state();
// now use my_count64 

否则使用。
atomic_ullong count_of_microseconds;
...
unsigned long long my_count64 = count_of_microseconds;
// now use my_count64 

请参阅如何在C语言中使用原子变量?

自C89以来,使用volatilecount_of_microseconds


[更新]

无论使用哪种方法(包括此答案或其他答案)在非ISR代码中读写计数器,我建议将读写代码封装在一个辅助函数中,以隔离这一关键操作集。


*1 <stdatomic.h>自C11起可用,并且未定义__STDC_NO_ATOMICS__

#if __STDC_VERSION__ >= 201112
#ifndef __STDC_NO_ATOMICS__
#include <stdatomic.h>
#endif
#endif

2
有第三种选择 - 一个循环读取高半部分,读取低半部分,然后再次读取高半部分; 直到高半部分连续两次相同为止 - 例如,像这样 new_high = counter[1]; do { old_high = new_high; low = counter[0]; new_high = counter[1]; } while (old_high != new_high); - Brendan
3
@Brendan,这个想法的一个简单变化是读取项目两次。如果相同,我们就完成了。否则读取第三个。如果第二个和第三个相同,我们就完成了,否则程序出错。我对无限循环持谨慎态度。我宁愿避免访问争用,因此采用了禁用中断的想法。 - chux - Reinstate Monica
1
@GabrielStaples 我学到的一件事是不要做 disable(); read; enable(),因为中断可能已经被禁用,最好的方法是恢复状态而不是启用它。 - chux - Reinstate Monica
1
@GabrielStaples:对于一个计数器来说,唯一会出现问题的情况是如果循环的1次迭代花费的时间太长,以至于高半部分回绕以匹配旧的高半部分。对于微秒计数器来说,这意味着循环的1次迭代需要超过5000万年。 - Brendan
2
@Brendan 当对象缺少volatile时,无论是循环还是3次读取都可能失败。没有volatile,对象的重新读取可能会被优化掉。 - chux - Reinstate Monica
显示剩余14条评论

-1

很高兴听到“读两次”方法是可行的。我之前有些怀疑,不知道为什么。与此同时,我想出了这个:

struct
{
    uint64_t ticks;
    bool toggle;
} timeKeeper = {0};

void timeISR()
{
    ticks++;
    toggle = !toggle;
}

uint64_t getTicks()
{
    uint64_t temp = 0;
    bool startToggle = false;
    
    do
    {
        startToggle = timeKeeper.toggle;
        temp = timekeeper.ticks;
    } while (startToggle != timeKeeper.toggle);
        
    return temp;
}

1
当然,如果代码被某些东西延迟,计时器在循环的一次迭代中会出现两次点击的问题。因此,你需要一个计数器而不仅仅是一个切换...但等等,ticks已经是一个计数器了。所以这本质上就变成了“读两次”。 - Nate Eldredge
2
你几乎重新发明了SeqLock,但1位计数器是不够的;它太容易绕过了。此外,没有这些代码中有volatile,所以你没有强制编译器使汇编读取两次。(使用32位原子操作实现64位原子计数器)。 - Peter Cordes

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接