创建一个线程安全的原子计数器。

4

我在我的一个项目中有一个特定的需求,那就是保持某些操作的“计数”,并定期(例如24小时)“读取”+“重置”这些计数器。

操作将是:

  • 工作线程 -> 增加计数器(随机)
  • 计时器线程(例如24小时) -> 读取计数器 -> 做某事 -> 重置计数器

我感兴趣的平台是Windows,但如果这可以跨平台更好。 我正在使用Visual Studio,并且目标Windows架构仅为 x64

我不确定结果是否“正确”,以及我的实现是否正确。坦白地说,我从未使用过许多std包装器,而且我的c++知识相当有限。

结果为:

12272 Current: 2
12272 After: 0
12272 Current: 18
12272 After: 0
12272 Current: 20
12272 After: 0
12272 Current: 20
12272 After: 0
12272 Current: 20
12272 After: 0

以下是一个完全可复制/粘贴的可重现示例:
#include <iostream>
#include <chrono>
#include <thread>
#include <Windows.h>

class ThreadSafeCounter final 
{
private:
    std::atomic_uint m_Counter1;
    std::atomic_uint m_Counter2;
    std::atomic_uint m_Counter3;
public:
    ThreadSafeCounter(const ThreadSafeCounter&) = delete;
    ThreadSafeCounter(ThreadSafeCounter&&) = delete;
    ThreadSafeCounter& operator = (const ThreadSafeCounter&) = delete;
    ThreadSafeCounter& operator = (ThreadSafeCounter&&) = delete;

    ThreadSafeCounter() : m_Counter1(0), m_Counter2(0), m_Counter3(0) {}
    ~ThreadSafeCounter() = default;

    std::uint32_t IncCounter1() noexcept 
    {
        m_Counter1.fetch_add(1, std::memory_order_relaxed) + 1;
        return m_Counter1;
    }

    std::uint32_t DecCounter1() noexcept
    {
        m_Counter1.fetch_sub(1, std::memory_order_relaxed) - 1;
        return m_Counter1;
    }

    VOID ClearCounter1() noexcept
    {
        m_Counter1.exchange(0);
    }
};

int main()
{
    static ThreadSafeCounter Threads;

    auto Thread1 = []() {
        while (true)
        {
            auto test = Threads.IncCounter1();
            std::cout << std::this_thread::get_id() << " Threads.IncCounter1() -> " << test << std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    };

    auto Thread2 = []() {
        while (true)
        {
            auto test = Threads.DecCounter1();
            std::cout << std::this_thread::get_id() << " Threads.DecCounter1() -> " << test << std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    };

    auto Thread3 = []() {
        while (true)
        {
            Threads.ClearCounter1();
            std::cout << std::this_thread::get_id() << " Threads.ClearCounter1()" << std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    };

    std::thread th1(Thread1);
    std::thread th2(Thread2);
    std::thread th3(Thread3);

    th1.join();
    th2.join();
    th3.join();
}

我应该提到,在我的实际项目中并未使用std::thread包装器,线程是使用WinApi函数(例如CreateThread)创建的。上面的代码只是为了模拟/测试。

请指出上面的代码有什么错误,可以改进什么,是否完全正确。

谢谢!


1
你的问题首先说程序“工作”,但随后立即表示存在“问题”,然后给出一些输出,但没有解释输出问题的原因。 - paddy
那我不明白问题所在。如果你想要压力测试你的计数器以确保它实际上是原子性的,那么就同时生成数百个线程来增加或减少计数器数百万次,并设计测试参数以便你可以知道最终计数应该是多少。然后你只需要运行程序,等待线程完成并检查值是否正确即可。当然,一旦添加了“重置”,任何形式的“净零”测试都是虚假的。 - paddy
1
你假设像 IncCounter1() 这样的函数是原子的,但它们并不是。你看到的行为可能的一个解释是一个线程正在调用 IncCounter1(),在调用 m_Counter1.fetch_add() 之后但在到达 return 语句之前被其他线程调用 DecCounter1()ClearCounter1() 打断了。将 IncCounter1() 更改为类似于 return m_Counter1.fetch_add(1, std::memory_order_relaxed) + 1 的内容。其他操作也同理。 - Peter
1
为什么要使用锁来保护原子计数器?原子计数器旨在处理并发访问。 - János Benjamin Antal
1
@JánosBenjaminAntal 当然,我不会反驳这一点。如果你有更好的例子/解决方案,请随意发布 :) - Mecanik
显示剩余14条评论
3个回答

11

你为什么要编写一个ThreadSafeCounter类呢?

std::atomic<size_t>本身就是一个线程安全的计数器,这也是使用std::atomic的主要原因。所以,你应该使用它而不是另一个类。大多数原子类型都有operator++/operator--的重载,所以你的主循环可以轻松地改写成这样:

    static std::atomic_int ThreadCounter1(0);

    auto Thread1 = []() {
        while (true)
        {
            auto test = ++ThreadCounter1;  // or ThreadCounter1++, whatever you need
            std::cout << std::this_thread::get_id() << " Threads.IncCounter1() -> " << test << std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    };

    auto Thread2 = []() {
        while (true)
        {
            auto test = --ThreadCounter1;
            std::cout << std::this_thread::get_id() << " Threads.DecCounter1() -> " << test << std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    };

    auto Thread3 = []() {
        while (true)
        {
/* Note: You could simply "ThreadCounter1 = 0" assign it here.
But exchange not only assigns a new value, it returns the previous value.
*/
            auto ValueAtReset=ThreadCounter1.exchange(0);
            std::cout << std::this_thread::get_id() << " Threads.ClearCounter1() called at value" << ValueAtReset << std::endl;

            std::this_thread::sleep_for(std::chrono::seconds(2));
        }
    };

我忘记提到你的 DecCounter 操作存在一个问题。你正在使用 atomic_uint,它无法处理负数。但是不能保证 Thread1 先执行,这意味着在 Thread2 运行(也就是减少计数器)之前,Thread1 可能已经执行了。这意味着计数器将会被包装。

所以你可以/应该改用 std::atomic<int>。这将给出正确的 (calls_Thread1 - calls_Thread2) 数量。如果 Thead2 比 Thread1 减少更多值,该数量将为负数。


尽管我认为operator ++operator--的重要部分基于使用默认内存顺序std::memory_order_seq_cstfetch_addfetch_sub,因此我建议明确使用适当的内存顺序fetch_xxx。除此之外,如果额外的类不是必需的(我假设有其他功能),我认为你的方法是好的。 - János Benjamin Antal
3
当然,你关于内存顺序的想法是正确的。不过,除非性能真的很关键 - 并且已经证明增量/减量操作是罪魁祸首 - 我仍然建议使用operator++/--以提高代码清晰度。而且我不想在这里让问题变得复杂。内存顺序的细节很难理解,值得再发一个stackoverflow问题来探讨 :) - Hajo Kirchhoff

2

这看起来很可疑:

std::uint32_t IncCounter1() noexcept 
{
    m_Counter1.fetch_add(1, std::memory_order_relaxed) + 1;
    return m_Counter1;
}

这行代码末尾的+1实际上是一个无操作,因为代码没有将该表达式的结果分配给任何变量。此外,在第二行再次引用m_Counter1会创建竞争条件-因为在执行fetch_add并再次引用该值以返回结果之间,可能会轻松地发生上下文切换到另一个线程更改其值。

我认为你想要的是:

std::uint32_t IncCounter1() noexcept 
{
    return m_Counter1.fetch_add(1, std::memory_order_relaxed) + 1;
}

fetch_add函数会在增加前返回之前的值。所以返回值+1即为该计数器此时的当前值。

DecCounter1中也存在相同问题。将该函数的实现更改为以下内容:

std::uint32_t DecCounter1() noexcept
{
    return m_Counter1.fetch_sub(1, std::memory_order_relaxed) - 1;
}

谢谢,我已经完成了这个更改。坦白地说,我会将这些函数设置为VOID,但是现在为了测试,我需要返回值。 - Mecanik
抱歉,我已经更新了我的帖子并提供了我确切的需求。 - Mecanik

1
我认为这些计数器的使用方式可能存在问题。如果在同一时间调用了大量的DecCounter1和一个或多个ClearCounter1,那么可能会发生ClearCounter1将计数器设置为0,然后执行大量DefCounter1(在锁定防止它们这样做之前),计数器最终变为负数。当以下情况发生时,这可能是一个问题:
  1. 计数器用于任何类型的资源管理(根据计数器值释放内存/文件等)
  2. 计数器仅用于某些统计信息,但计数器的值与线程数不成比例。例如:如果您要计算的一天中发生了8个事件,但线程数超过8个,则在非常不幸的情况下(当事件发生在重置计数器的同时)可能会得到下一周期的错误起始值,从而使统计数据不准确。

如果上述任何情况为真,则情况会更加困难,我尚未考虑过这一点。对于简单的统计计数器,我认为代码可以改进:原子操作和锁定几乎相同,通过锁定可能会降低性能而不获得任何收益(所有上述问题都是针对使用锁定、原子操作或两者的情况而言)。因此,我会摆脱锁定并仅使用原子操作。以下是我的改进版本提案(如果没有应用上述任何问题)

#include <iostream>
#include <chrono>
#include <thread>


class ThreadSafeCounter final
{
private:
  std::atomic_uint32_t m_Counter1;
  std::atomic_uint32_t m_Counter2;
  std::atomic_uint32_t m_Counter3;
public:
  ThreadSafeCounter(const ThreadSafeCounter&) = delete;
  ThreadSafeCounter(ThreadSafeCounter&&) = delete;
  ThreadSafeCounter& operator = (const ThreadSafeCounter&) = delete;
  ThreadSafeCounter& operator = (ThreadSafeCounter&&) = delete;

  ThreadSafeCounter() : m_Counter1(0), m_Counter2(0), m_Counter3(0) {}
  ~ThreadSafeCounter() = default;

  void IncCounter1() noexcept
  {
    m_Counter1.fetch_add(1, std::memory_order_relaxed);
  }

  void DecCounter1() noexcept
  {
    m_Counter1.fetch_sub(1, std::memory_order_relaxed);
  }

  std::uint32_t GetTotalCounter1()
  {
    return m_Counter1.load();
  }

  std::uint32_t GetAndClearCounter1()
  {
    return m_Counter1.exchange(0);
  }
};

int main()
{
  static ThreadSafeCounter Threads;

  auto WorkerThread1 = []() {
    while (true)
    {
      Threads.IncCounter1();
      std::this_thread::sleep_for(std::chrono::milliseconds(10));
    }
  };

  auto WorkerThread2 = []() {
    while (true)
    {
      Threads.DecCounter1();
      std::this_thread::sleep_for(std::chrono::milliseconds(20));
    }
  };

  auto FinalThread = []() {
    while (true)
    {
      auto Current = Threads.GetTotalCounter1();
      std::cout << std::this_thread::get_id() << " Current: " << Current << std::endl;

      const auto Before = Threads.GetAndClearCounter1();
      std::cout << std::this_thread::get_id() << " Before: " << Before << std::endl;

      auto After = Threads.GetTotalCounter1();
      std::cout << std::this_thread::get_id() << " After: " << After << std::endl;

      std::this_thread::sleep_for(std::chrono::seconds(1));
    }
  };

  std::thread th1(WorkerThread1);
  std::thread th2(WorkerThread2);
  std::thread th3(FinalThread);

  th1.join();
  th2.join();
  th3.join();
}

我所做的更改:
  1. 我移除了锁,因为在我看来它并没有任何额外的作用。
  2. 我将 m_Counter1.fetch_sub(0) 替换为 m_Counter1.load(),因为它们的功能相同,但是 load 操作更为明显。
  3. 我将 atomic_uint 类型替换为 atomic_uint32_t,以匹配 GetTotalCounter1 的返回值类型。
  4. 添加了一个函数 GetAndClearCounter1 来确保在读取和清除值之间没有其他操作发生。
  5. 移除了 IncCounter1DecCounter1 中未使用的 + 1- 1
  6. VOID 返回类型更改为 void,因为在我看来,void 更符合惯用语法。
最终结论是,我认为阅读 std::atomic 的文档 可以真正帮助你。虽然这不是最简短的文档,但肯定足够详细,让你熟悉操作、内存顺序等方面。

如果你担心出现负数(或无符号溢出),那么你应该考虑使用compare_exchange。至少在递减时是这样的。 - Homer512
你可以使用比较交换的方式来获取值,并检查它是否大于0,然后使用compare_exchange检查该值是否已更改,如果没有则递减。但是,如果该值已更改会发生什么呢?如果您重试它,则很容易陷入无限循环中。如果我没有关于确切用例(何时可以减小等)的进一步知识,我就不会尝试解决这个问题,因为这将是一场大混乱。 - János Benjamin Antal

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接