C++ 回调定时器实现

3
我在我的C++应用程序中找到了以下回调定时器的实现方式。但是,这种实现方式要求我从“start”调用者中“join”线程,这实际上会阻塞start函数的调用者。
我真正想做的是:
1. 某人可以多次调用foo(data)并将它们存储在数据库中。 2. 每当调用foo(data)时,它都会启动一个几秒钟的定时器。 3. 当计时器在倒数时,foo(data)可以被多次调用并存储多个项,但直到计时器结束才调用erase。 4. 计时器到期后,一次性调用“remove”函数以从数据库中删除所有记录。
基本上,我想能够执行任务,等待几秒钟后批量执行单个批量任务B。
class CallBackTimer {

public:

    /**
     * Constructor of the CallBackTimer
     */
    CallBackTimer() :_execute(false) { }

    /**
     * Destructor
     */
    ~CallBackTimer() {
        if (_execute.load(std::memory_order_acquire)) {
            stop();
        };
    }

    /**
     * Stops the timer
     */
    void stop() {
        _execute.store(false, std::memory_order_release);
        if (_thd.joinable()) {
            _thd.join();
        }
    }

    /**
     * Start the timer function
     * @param interval Repeating duration in milliseconds, 0 indicates the @func will run only once
     * @param delay Time in milliseconds to wait before the first callback
     * @param func Callback function
     */
    void start(int interval, int delay, std::function<void(void)> func) {
        if(_execute.load(std::memory_order_acquire)) {
            stop();
        };
        _execute.store(true, std::memory_order_release);


        _thd = std::thread([this, interval, delay, func]() {
            std::this_thread::sleep_for(std::chrono::milliseconds(delay));
            if (interval == 0) {
                func();
                stop();
            } else {
                while (_execute.load(std::memory_order_acquire)) {
                    func();
                    std::this_thread::sleep_for(std::chrono::milliseconds(interval));
                }
            }
        });

    }

    /**
     * Check if the timer is currently running
     * @return bool, true if timer is running, false otherwise.
     */
    bool is_running() const noexcept {
        return ( _execute.load(std::memory_order_acquire) && _thd.joinable() );
    }


private:
    std::atomic<bool> _execute;
    std::thread _thd;

};

我尝试使用thread.detach()修改上述代码。然而,我遇到了一个问题,即分离的线程无法从数据库中写入(擦除)。

非常感谢您的帮助和建议!


你能添加一个使用这个类的示例吗?你所要求的似乎是冒险和不必要的。 - user4581301
也许你可以在构造函数中构建线程,在析构函数中加入它?然后你只需要一个std::queue来通过start()添加新的项目(回调),并通过stop()刷新所有元素。一旦超出范围,析构函数将负责线程。然后线程中会有一个while循环,直到队列有任何项目为止。然后开始处理它。下一个while循环将检查定时器和项目数量。在内部,它将逐个处理项目1。然后让它重复直到停止。析构函数将进行清理。我猜间隔是等待项目之间的时间,而延迟只是其触发时间。 - huseyin tugrul buyukisik
1个回答

3
使用std::async而不是线程(thread)。以下类将在添加最后一个字符串4秒后,按顺序处理排队的字符串。一次只启动1个异步任务,并且std::async会为您处理所有线程操作。
如果在类被销毁时队列中有未处理的项,则异步任务会立即停止而不等待,这些项将不会被处理(但如果这不是您想要的行为,则很容易进行更改)。
#include <iostream>
#include <string>
#include <future>
#include <mutex>
#include <chrono>
#include <queue>

class Batcher
{
public:
  Batcher()
    : taskDelay( 4 ),
      startTime( std::chrono::steady_clock::now() ) // only used for debugging
  {
  }

  void queue( const std::string& value )
  {
    std::unique_lock< std::mutex > lock( mutex );
    std::cout << "queuing '" << value << " at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms\n";
    work.push( value );
    // increase the time to process the queue to "now + 4 seconds"
    timeout = std::chrono::steady_clock::now() + taskDelay;
    if ( !running )
    {
      // launch a new asynchronous task which will process the queue
      task = std::async( std::launch::async, [this]{ processWork(); } );
      running = true;
    }
  }

  ~Batcher()
  {
    std::unique_lock< std::mutex > lock( mutex );
    // stop processing the queue
    closing = true;
    bool wasRunning = running;
    condition.notify_all();
    lock.unlock();
    if ( wasRunning )
    {
      // wait for the async task to complete
      task.wait();
    }
  }

private:
  std::mutex mutex;
  std::condition_variable condition;
  std::chrono::seconds taskDelay;
  std::chrono::steady_clock::time_point timeout;
  std::queue< std::string > work;
  std::future< void > task;
  bool closing = false;
  bool running = false;
  std::chrono::steady_clock::time_point startTime;

  void processWork()
  {
    std::unique_lock< std::mutex > lock( mutex );
    // loop until std::chrono::steady_clock::now() > timeout
    auto wait = timeout - std::chrono::steady_clock::now();
    while ( !closing && wait > std::chrono::seconds( 0 ) )
    {
      condition.wait_for( lock, wait );
      wait = timeout - std::chrono::steady_clock::now();
    }
    if ( !closing )
    {
      std::cout << "processing queue at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms\n";
      while ( !work.empty() )
      {
        std::cout << work.front() << "\n";
        work.pop();
      }
      std::cout << std::flush;
    }
    else
    {
      std::cout << "aborting queue processing at " << std::chrono::duration_cast< std::chrono::milliseconds >( std::chrono::steady_clock::now() - startTime ).count() << "ms with " << work.size() << " remaining items\n";
    }
    running = false;
  }
};

int main()
{
  Batcher batcher;
  batcher.queue( "test 1" );
  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  batcher.queue( "test 2" );
  std::this_thread::sleep_for( std::chrono::seconds( 1 ) );
  batcher.queue( "test 3" );
  std::this_thread::sleep_for( std::chrono::seconds( 2 ) );
  batcher.queue( "test 4" );
  std::this_thread::sleep_for( std::chrono::seconds( 5 ) );
  batcher.queue( "test 5" );
}

谢谢你的回答,实际上我发现根本问题并不是由于CallbackTimer / Batcher引起的。由于某种原因,在回调函数中的sqlite语句(在你的情况下,auto function = work.front() && function())删除表中的行,但实际上没有删除该行。 - Dillon

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接