如何在C语言中一次原子地读取多个变量?

3
我正在尝试同时原子地读取三个变量 a, b, c。模式类似于以下代码。
_Atomic uint32_t a, b, c;

void thread_high_priority(void)
{
  atomic_fetch_sub_explicit(&a, 1, memory_order_relaxed);
  atomic_fetch_add_explicit(&b, 1, memory_order_relaxed);
  atomic_fetch_sub_explicit(&c, 1, memory_order_relaxed);
}

void thread_low_priority(void)
{
  uint32_t _a = a;
  uint32_t _b = b;
  uint32_t _c = c;
}
thread_high_priority 是以高优先级运行的线程,thread_low_priority 是以低优先级运行的线程。 thread_high_priority 可以打断 thread_low_priority 的执行,但反之不成立。也就是说,thread_high_priority 将始终不受干扰地运行。
约束条件是 thread_high_priority 非常关键。因此,我不想使用互斥锁阻塞,因为这会耗费时间,甚至导致死锁。有没有一种方法可以确保同时读取这三个变量而不被打断?
编辑:平台是 ARMv7M 架构,在裸机环境下运行。

可移植地,你无法以无锁的方式完成这个任务。你正在为哪个平台编程?它是否具有原子16字节加载功能,例如带有AVX的英特尔CPU或某些具有原子对齐“ldp”的AArch64 CPU?如果是这样,那么一个选项是将变量放入一个16字节对齐的union { struct foo { _Atomic uint32_t a,b,c; } together; _Atomic struct { uint32_t a,b,c;} separate;}或类似的结构中。类似于如何使用c++11 CAS实现ABA计数器?,以便一次只能有效访问一个。 - Peter Cordes
@PeterCordes 我的平台是ARMv7-M架构。我读了参考资料,它说4字节读取可以是原子性的,但对于大于4字节的数据类型则不行。 - Taiyou Kuo
我猜你可以使用“_Atomic struct{...};”并在RMWs中使用CAS更新值,但这会阻止你高效地访问单个成员。手动填充到16字节可能有助于某些实现决定使其无锁化。根据ISA,读取一个成员可能效率要低得多。 - Peter Cordes
@PeterCordes 感谢您的评论,我会研究CAS并看看如何在我的代码中使用它。 - Taiyou Kuo
1
在ARMv7-M上,您没有无锁16字节的atomic_compare_exchange_weak。只有4字节的ldrex/strex,显然甚至没有2寄存器的ldrexd/strexd。那个评论是对我第一个评论的跟进,当时你刚好发表了评论,而不是回复你的评论。 - Peter Cordes
显示剩余6条评论
2个回答

2

你可以通过一级间接寻址来解决这个问题。

只要有一个写入者,你可以这样做:

  • 将数据项集放在一个结构体中
  • 分配多个这样的结构体
  • 非原子地写入读者不使用的结构体成员
  • 原子性地更改指向读者应使用的结构体的指针

读者应该先读取指针,然后访问相应结构体中的数据。

如果在主上下文仍在读取时可能发生另一个中断,则需要保留指向读者正在使用的结构体的指针,写者可以在填写结构体之前检查这一点。如果只有一个读者,则更容易以原子方式访问此第二个指针。

为了平滑处理,您可以分配三个或更多结构体,并将它们视为环形缓冲区。


谢谢你的建议,我会尝试在我的代码中实现它。 - Taiyou Kuo
不客气!您能否请在左侧打勾接受答案? - Tom V
"对于读者不使用的结构体成员进行非原子写入:请注意,这将需要一些机制来让读者指示何时完成对结构体的使用(包括适当的内存屏障)。否则,您永远无法确定某些非常缓慢的读者是否仍在使用您想要重用或释放的结构体。" - Nate Eldredge
@NateEldredge,是的,我认为我已经解决了这个问题:“如果在主上下文仍在读取时可能发生另一个中断,则需要保留指向读取器正在使用的结构的指针,写入器可以在填充结构之前检查这一点”。 - Tom V
每个桶中的序列号可能是有用的。 可以用于SeqLock风格的撕裂检测,或像一个可以检测到满时的适当队列。(在循环缓冲队列中实现无锁进度保证展示了一个带有桶中序列号的队列。) - Peter Cordes

-1

我还想到了另一种基于SeqLock的解决方案。在知道我尝试实现的本质上是撕裂检测之后,我使用SeqLock模板进行了重写。我仍然将我的三个变量a,b,c定义为_Atomic uint32_t,因为我还想在thread_low_priority中使用atomic_fetch_*对它们进行修改。

在ARMv7-M架构中,RMW原子操作是使用ldrex/strex实现的。编译器会发出循环来检查strex是否成功。在我的情况下,当使用RMW操作时可能会出现问题,因为thread_high_priority需要快速运行且不被打断。我目前不知道是否存在这样一种情况,在thread_high_priority上下文中strex总是失败,从而导致死锁。

_Atomic uint32_t a, b, c;
atomic_uint seqcount = 0;

void thread_high_priority(void)
{
  uint32_t _a, _b, _c;
  
  uint orig_cnt = atomic_load_explicit(&seqcount, memory_order_relaxed);

  atomic_store_explicit(&seqcount, orig_cnt + 1, memory_order_relaxed);
  atomic_thread_fence(memory_order_release);

  _a = atomic_load_explicit(&a, memory_order_relaxed);
  _b = atomic_load_explicit(&b, memory_order_relaxed);
  _c = atomic_load_explicit(&c, memory_order_relaxed);
  atomic_store_explicit(&a, _a - 1, memory_order_relaxed);
  atomic_store_explicit(&b, _b + 1, memory_order_relaxed);
  atomic_store_explicit(&c, _c - 1, memory_order_relaxed);

  atomic_store_explicit(&seqcount, orig_cnt + 2, memory_order_release);
}

void thread_low_priority(void)
{
  uint32_t _a, _b, _c;
  
  uint c0, c1;
  do {
    c0 = atomic_load_explicit(&seqcount, memory_order_acquire);

    _a = atomic_load_explicit(&a, memory_order_relaxed);
    _b = atomic_load_explicit(&b, memory_order_relaxed);
    _c = atomic_load_explicit(&c, memory_order_relaxed);

    c1 = atomic_load_explicit(&seqcount, memory_order_acquire);
  } while (c0 & 1 || c0 != c1);
}

编辑:再次检查编译器的输出后,我稍微修改了thread_high_priority中的代码。使用ARM gcc 10.3.1(2021.10 none)进行编译,并使用编译标志-O1 -mcpu=cortex-m3 -std=gnu18 -mthumb

在我的原始代码中,如下所示,在存储之前发出dmb ish

atomic_store_explicit(&seqcount, orig_cnt + 1, memory_order_release);
--->
        adds    r1, r2, #1
        dmb     ish
        str     r1, [r3]

在我将内存屏障与存储分离后,dmb ish会在存储之后发出,以便在更新a、b、c之前可见seqcount的更新。

atomic_store_explicit(&seqcount, orig_cnt + 1, memory_order_relaxed);
atomic_thread_fence(memory_order_release);
-->
        adds    r1, r2, #1
        str     r1, [r3]
        dmb     ish

1
这看起来类似于SeqLock但不安全。读者可以在写者完成写作之前开始阅读。(有关C++ seqlock,请参见使用32位原子实现64位原子计数器,可轻松转换为C语言。) - Peter Cordes
可能我的逻辑有误,我想要检查 aflag 是否仍然为 true。这意味着在读取 a, b, c 后,aflag 不会改变,而不是用于检查新值的到来。我可以理解对于 thread_low_priority 来说它可能不是无锁的,因为它可能会永远阻塞。 - Taiyou Kuo
3
这种无锁写入者并带有撕裂检测和重试的读取方式,恰好是SeqLock所做的。与其尝试重新发明一个有缺陷的版本,不如直接使用它。写入"_Atomic uint32_t"的成本不比"atomic_flag"更高,而且写入者可以是只写模式,读取者则为只读模式。如果您说出了相对于正常SeqLock需要达到的目标,我或许可以给出一些建议,看看SeqLock是否已充分解决了您的问题。 - Peter Cordes
是的,你的第二个版本看起来像一个SeqLock,除了你在seqcount的存储和abc的存储之间缺少了atomic_thread_fence(memory_order_release)。 (或者将它们全部设为"release",但这在ARM32上可能会更昂贵)。你需要确保在第一个序列更新之前不可见任何有效载荷更新。 - Peter Cordes
1
顺便说一下,LDREX / STREX连续失败无法执行“atomic_fetch_add”不会导致死锁; 这将是一个活锁 - 每次执行尝试都有成功的可能性,并且必须由另一个线程积极干扰才能失败。 与死锁不同,例如等待永远不会解锁的锁。 但是,如果您不需要原子RMW,则最好通过先执行所有加载然后执行所有存储来允许一些负载/存储流水线处理。 - Peter Cordes
显示剩余14条评论

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接