无锁更新成员变量的环形分配器?

7

我是一个帮助翻译的助手。以下是需要翻译的内容:

我有一个类,用于存储某些实时数据的最新值(大约每秒150亿个事件)。

假设它看起来像这样:

class DataState 
{
    Event latest_event;

  public:
  //pushes event atomically
  void push_event(const Event __restrict__* e);
  //pulls event atomically
  Event pull_event();
};

我需要能够原子地推送事件并以严格的顺序保证进行拉取操作。现在,我知道我可以使用自旋锁,但考虑到巨大的事件速率(超过每秒100万次)和高度并发性,我更愿意使用无锁操作。
问题在于,Event的大小为64字节。 当前X86 CPU上没有任何CMPXCHG64B指令(截至'16年8月)。 因此,如果我使用std :: atomic ,我必须链接到libatomic,后者在内部使用互斥锁(太慢了)。
所以我的解决方案是代替原子交换值的指针。 问题是,动态内存分配会成为这些事件速率的瓶颈。 因此……我定义了一个我称之为“环形分配器”的东西:
/// @brief Lockfree Static short-lived allocator used for a ringbuffer
/// Elements are guaranteed to persist only for "size" calls to get_next()
template<typename T> class RingAllocator {
  T *arena;
  std::atomic_size_t arena_idx;
  const std::size_t arena_size;
 public:
  /// @brief Creates a new RingAllocator
  /// @param size The number of elements in the underlying arena. Make this large enough to avoid overwriting fresh data
  RingAllocator<T>(std::size_t size) : arena_size(size)
  {
  //allocate pool
  arena = new T[size];
  //zero out pool
  std::memset(arena, 0, sizeof(T) * size);
  arena_idx = 0;
  }

  ~RingAllocator()
  {
  delete[] arena;
  }

  /// @brief Return next element's pointer. Thread-safe
  /// @return pointer to next available element
  T *get_next()
  {
      return &arena[arena_idx.exchange(arena_idx++ % arena_size)];
  }
};

那么我的DataState类可能看起来像这样:

class DataState 
{
    std::atomic<Event*> latest_event;
    RingAllocator<Event> event_allocator;
  public:
  //pushes event atomically
  void push_event(const Event __restrict__* e)
  {
      //store event
      Event *new_ptr = event_allocator.get_next()
      *new_ptr = *e;
      //swap event pointers
      latest_event.store(new_ptr, std::memory_order_release);
  }
  //pulls event atomically
  Event pull_event()
  {
      return *(latest_event.load(std::memory_order_acquire));
  }
};

只要我将我的环形分配器大小设置为可能同时调用函数的最大线程数,就不会有覆盖pull_event可能返回的数据的风险。此外,所有内容都非常本地化,因此间接引用不会导致较差的缓存性能。这种方法可能存在什么潜在问题吗?

1
如果这段代码能够正常运行,而你想要寻求可能的改进或风险,最好在SE Code Review上发布你的问题。 - πάντα ῥεῖ
MM 代表什么?我知道 M 代表“兆”(百万),如果你是指千亿,为什么不用“T”(太赫兹)来表示呢? - Martin Bonner supports Monica
1
@MartinBonner 编辑后说是百万。在金融行业中,使用“MM”后缀来表示百万很常见。实际上我也不确定为什么。 - alfalfasprout
你确定这些都有帮助吗?直觉上,一个由互斥锁保护的固定结构会更快。 - David Schwartz
1
如果您可以在具有非错误/禁用TSX(Skylake和可能的某些后期Broadwell)的CPU上运行此代码,事务性内存可以将多个操作组合成单个原子操作,其中包括超过16B的内存(其中cmpxchg16B是x86-64上当前最大的原子RMW大小)。请参见http://www.realworldtech.com/haswell-tm/。您必须看看它的性能表现,与指针交换或锁定相比如何。 - Peter Cordes
显示剩余10条评论
3个回答

2

DataState类:

我以为它会是一个堆栈或队列,但它不是,所以push / pull似乎不是很好的方法名。(否则实现就完全错误了)。

它只是一个让你读取任何线程存储的最后一个事件的门闩。

没有任何东西可以阻止连续两次写入来覆盖从未读取过的元素。也没有任何东西可以阻止您两次读取相同的元素。

如果您只需要复制小块数据的地方,环形缓冲区似乎是一个不错的方法。但如果您不想丢失事件,我认为您不能以这种方式使用它。相反,只需获取环形缓冲区条目,然后将其复制并在那里使用即可。因此,唯一的原子操作应该是增加环形缓冲区位置索引。


环形缓冲区

您可以使get_next()更加高效。这行代码执行原子后增量(fetch_add)和原子交换:

return &arena[arena_idx.exchange(arena_idx++ % arena_size)];

我甚至不确定是否安全,因为 xchg 可能会干扰来自另一个线程的 fetch_add。无论如何,即使是安全的,也不是理想的。

你不需要那个。确保 arena_size 总是 2 的幂次方,然后你就不需要对共享计数器进行取模了。你可以让它继续增长,并且每个线程都按照自己的需求进行取模。它最终会回到起点,但它是一个二进制整数,因此它将在你的 arena size 的倍数处回到起点。

我建议存储一个 AND 掩码而不是大小,这样即使它不是编译时常量,% 也不会编译成任何其他指令,这确保我们避免了 64 位整数的 div 指令。

template<typename T> class RingAllocator {
  T *arena;
  std::atomic_size_t arena_idx;
  const std::size_t size_mask;   // maybe even make this a template parameter?
 public:
  RingAllocator<T>(std::size_t size) 
    : arena_idx(0),  size_mask(size-1)
  {
     // verify that size is actually a power of two, so the mask is all-ones in the low bits, and all-zeros in the high bits.
     // so that i % size == i & size_mask for all i
   ...
  }

  ...
  T *get_next() {
      size_t idx = arena_idx.fetch_add(1, std::memory_order_relaxed);  // still atomic, but we don't care which order different threads take blocks in
      idx &= size_mask;   // modulo our local copy of the idx
      return &arena[idx];
  }
};

如果您使用calloc而不是new+memset,分配竞技场将更有效率。操作系统在将页面提供给用户空间进程之前已经将它们清零(以防止信息泄漏),因此将它们全部写入只是浪费的工作。

  arena = new T[size];
  std::memset(arena, 0, sizeof(T) * size);

  // vs.

  arena = (T*)calloc(size, sizeof(T));

编写页面时,它们被插入其中,因此它们都连接到真实的物理页面,而不仅仅是系统范围内共享的物理零页面的写时复制映射(例如在new/malloc/calloc之后)。在NUMA系统上,选择的物理页面可能取决于实际触摸页面的线程,而不是执行分配的线程。但由于您正在重用池,因此编写页面的第一个核心可能不是最终使用它的核心。也许可以从微基准测试/性能计数器中寻找一些线索。

1
谢谢。这个建议和使用唯一序列标识符有助于解决我的问题。问题:当y是2的幂时,x%y ==(x&(y-1)),所以掩码不应该是size - 1吗? - alfalfasprout
是的。我想象中的实际代码是 size_mask(size_to_mask(size)),其中 size_to_mask 检查它是否确实是 2 的幂。但你可以在常量成员初始化之后,在构造函数中进行该检查。 - Peter Cordes

1
只要我将我的环形分配器大小设置为可能同时调用函数的最大线程数,就不会有覆盖pull_event返回数据的风险。......这种方法可能存在任何潜在问题吗?
问题是,如果我只有2个线程和10个元素在环形缓冲区中,第一个线程可能调用一次pull_event并处于“中间拉取”状态,然后第二个线程可能调用10次push,覆盖第一个线程正在拉取的内容。
再次强调,假设我正确理解了您的代码。
另外,如上所述,
return &arena[arena_idx.exchange(arena_idx++ % arena_size)];

在同一变量的交换中,arena_idx++看起来不对,实际上也是错误的。两个线程可以对其进行递增-ThreadA将其递增到8,而ThreadB将其递增到9,然后ThreadB将其交换到9,然后ThreadA将其交换到8。糟糕。

atomic(op1) @ atomic(op2) != atomic(op1 @ op2)

我担心代码中未显示的其他错误。我并不是说这是一种侮辱-无锁定并不容易。


是的,我现在使用fetch_add操作代替了原来的两个操作(原子增量和交换),并且线程本身计算模运算(正如Peter Cordes所建议的)。同时将其改为单线程以解决调度问题。 - alfalfasprout
@alfalfasprout:如果将其设置为单线程,则不需要使arena_idx成为原子。但是,哪些线程将使用数据?如果线程休眠10ms,则合理大小的环形缓冲区可能已经回绕并覆盖正在读取一半的事件。除非你指的是完全放弃多线程? - Peter Cordes

0

你有没有看过任何可用的C++ Disruptor(Java)端口?

disruptor--

disruptor

虽然它们不是完整的端口,但它们可能提供你所需要的一切。我目前正在开发一个更全面的端口,但它还没有准备好。


实际上,我以前使用过 disruptor。不幸的是,它并不适合这种用例(每个线程应该获得一个唯一的事件,例如:FIFO 队列而不是 PUB/SUB)。 - alfalfasprout
原始的Disruptor模式支持发布-订阅(pub sub),即1PMC,以及MPMC。前者是我们在C++端口中处理网络I/O时使用的方式。 - AKB
确实如此,但我不希望在线程之间出现重复事件。发布-订阅模式是“面向所有人的一切”(有时在客户端进行过滤)。我希望将事件分派到各个线程而不是重复。 - alfalfasprout
抱歉我的解释不够清晰。Disruptor 绝对支持 FIFO。也就是说,传入的事件将由下一个可用的消费者进行处理。也就是说,每个事件只会被一个消费者处理一次。 - AKB

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接