如何实现基本的事件循环?

75

如果你有使用过图形用户界面工具包,就会知道在所有操作完成后需要执行一个事件循环/主循环,来保持应用程序的活性并对不同的事件做出响应。例如,在Qt中,你可以在main()函数中实现:

int main() {
    QApplication app(argc, argv);
    // init code
    return app.exec();
}

在这种情况下,app.exec() 是应用程序的主循环。

实现这种循环的明显方法是:

void exec() {
    while (1) {
        process_events(); // create a thread for each new event (possibly?)
    }
}

不过这会将CPU占用率限制在100%,实际上毫无用处。那么,我该如何实现这样一个事件循环,在不完全占用CPU的情况下保持响应性呢?

Python和/或C ++的答案将不胜感激。谢谢。

脚注:为了学习,我将自己实现信号/槽,并使用它们生成自定义事件(例如go_forward_event(steps))。但是如果您知道如何手动使用系统事件,我也想知道。


4
我相信你可以深入Qt的源代码并查看他们在exec()中正在做什么,这可能会给你一些很好的指示。 - JimDaniel
7个回答

85

我曾经对这个问题也很疑惑!

GUI主循环的伪代码如下:

void App::exec() {
    for(;;) {
        vector<Waitable> waitables;
        waitables.push_back(m_networkSocket);
        waitables.push_back(m_xConnection);
        waitables.push_back(m_globalTimer);
        Waitable* whatHappened = System::waitOnAll(waitables);
        switch(whatHappened) {
            case &m_networkSocket: readAndDispatchNetworkEvent(); break;
            case &m_xConnection: readAndDispatchGuiEvent(); break;
            case &m_globalTimer: readAndDispatchTimerEvent(); break;
        }
    }
}

什么是“Waitable”?这取决于系统。在UNIX上,它被称为“文件描述符”,而“waitOnAll”是::select系统调用。所谓的vector<Waitable>在UNIX上是一个::fd_set,“whatHappened”实际上是通过FD_ISSET查询的。实际的可等待句柄可以通过各种方式获取,例如m_xConnection可以从::XConnectionNumber()中获取。X11还为此提供了一个高级、可移植的API——::XNextEvent()——但如果您使用它,就无法同时等待多个事件源。

阻塞是如何工作的?“waitOnAll”是一个系统调用,告诉操作系统将您的进程放入“睡眠列表”中。这意味着在等待任何可等待对象发生事件之前,您的进程不会获得任何CPU时间。因此,您的进程处于空闲状态,消耗0%的CPU。当事件发生时,您的进程将短暂地对其进行反应,然后返回空闲状态。GUI应用程序几乎全部时间都在空闲状态。

当你睡觉时,所有的CPU周期会发生什么?这取决于情况。有时候其他进程会利用它们。如果没有,你的操作系统将会忙等待CPU,或将其置于临时低功耗模式等。

请进一步询问细节!


我该如何实现这样一个等待系统,以等待我的自定义信号,而不是系统信号? - fengshaun
正如我所说,你的代码只有在响应事件时才会运行。因此,如果你触发自己的事件,你将作为对某个系统事件的反应而这样做。然后就变得清楚,你实际上不需要一个事件系统来处理你的自定义事件。直接调用处理程序即可! - Iraimbilanja
例如,考虑一个信号 "Button::clicked"。它只会在响应系统事件(左鼠标释放)时触发。因此,您的代码变成了 "virtual void Button::handleLeftRelease(Point) { clicked.invoke(); }",无需线程、事件队列或任何其他东西。 - Iraimbilanja
1
多年来,我一直在想如何在低级别上实现事件驱动的进程。非常感谢! - Jonathan Dumaine

25

Python:

你可以查看Twisted反应器的实现,这可能是Python中最好的事件循环实现。在Twisted中,反应器是接口的实现,您可以指定要运行的反应器类型:select、epoll、kqueue(都基于使用这些系统调用的c api),还有基于QT和GTK工具包的反应器。

一种简单的实现方式是使用select:

#echo server that accepts multiple client connections without forking threads

import select
import socket
import sys

host = ''
port = 50000
backlog = 5
size = 1024
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind((host,port))
server.listen(backlog)
input = [server,sys.stdin]
running = 1

#the eventloop running
while running:
    inputready,outputready,exceptready = select.select(input,[],[])

    for s in inputready:

        if s == server:
            # handle the server socket
            client, address = server.accept()
            input.append(client)

        elif s == sys.stdin:
            # handle standard input
            junk = sys.stdin.readline()
            running = 0

        else:
            # handle all other sockets
            data = s.recv(size)
            if data:
                s.send(data)
            else:
                s.close()
                input.remove(s)
server.close() 

14

通常我会使用某种形式的计数信号量来完成以下操作:

  1. 信号量从零开始。
  2. 事件循环等待信号量。
  3. 事件发生,信号量递增。
  4. 事件处理程序解除阻塞并减少信号量,然后处理事件。
  5. 当所有事件都被处理完毕时,信号量为零,事件循环再次阻塞。

如果你不想那么麻烦,你可以在你的while循环中添加一个sleep()调用,休眠时间非常短。这将使你的消息处理线程将CPU时间让出给其他线程。CPU利用率就不会一直保持在100%了,但仍然存在浪费的情况。


这听起来很诱人,我得学习更多关于线程的知识。谢谢。 - fengshaun
1
@FallingFromBed - 不是忙等待,而是在信号量上进行阻塞等待。这种区别很重要,因为阻塞等待不会消耗 CPU 时间。 - Eric Petroelje

13

我会使用一个简单、轻量级的消息传递库,叫做ZeroMQ(http://www.zeromq.org/)。它是一个开源库(LGPL)。这是一个非常小的库;在我的服务器上,整个项目编译大约需要60秒。

ZeroMQ将极大地简化您的事件驱动代码,并且在性能方面也是最高效的解决方案。使用ZeroMQ在线程之间通信比使用信号量或本地UNIX套接字要快得多(速度方面)。ZeroMQ还可以成为100%可移植的解决方案,而所有其他解决方案都会将您的代码限制在特定的操作系统上。


7

这是一个 C++ 事件循环。在创建 EventLoop 对象时,它会创建一个线程,该线程会不断运行任何给定的任务。如果没有可用的任务,则主线程会休眠,直到添加了某个任务。

首先,我们需要一个线程安全队列,允许多个生产者和至少一个消费者(即 EventLoop 线程)。EventLoop 对象控制消费者和生产者。稍加修改后,可以添加多个消费者(执行线程),而不仅仅是一个线程。

#include <stdio.h>
#include <thread>
#include <mutex>
#include <condition_variable>
#include <iostream>
#include <set>
#include <functional>

#if defined( WIN32 )
    #include <windows.h>
#endif

class EventLoopNoElements : public std::runtime_error
{
public:
    EventLoopNoElements(const char* error)
        : std::runtime_error(error)
    {
    }
};

template <typename Type>
struct EventLoopCompare {
    typedef std::tuple<std::chrono::time_point<std::chrono::system_clock>, Type> TimePoint;

    bool operator()(const typename EventLoopCompare<Type>::TimePoint left, const typename EventLoopCompare<Type>::TimePoint right) {
        return std::get<0>(left) < std::get<0>(right);
    }
};

/**
 * You can enqueue any thing with this event loop. Just use lambda functions, future and promises!
 * With lambda `event.enqueue( 1000, [myvar, myfoo](){ myvar.something(myfoo); } )`
 * With futures we can get values from the event loop:
 * ```
 * std::promise<int> accumulate_promise;
 * event.enqueue( 2000, [&accumulate_promise](){ accumulate_promise.set_value(10); } );
 * std::future<int> accumulate_future = accumulate_promise.get_future();
 * accumulate_future.wait(); // It is not necessary to call wait, except for syncing the output.
 * std::cout << "result=" << std::flush << accumulate_future.get() << std::endl;
 * ```
 * It is just not a nice ideia to add something which hang the whole event loop queue.
 */
template <class Type>
struct EventLoop {
    typedef std::multiset<
        typename EventLoopCompare<Type>::TimePoint,
        EventLoopCompare<Type>
    > EventLoopQueue;

    bool _shutdown;
    bool _free_shutdown;

    std::mutex _mutex;
    std::condition_variable _condition_variable;
    EventLoopQueue _queue;
    std::thread _runner;

    // free_shutdown - if true, run all events on the queue before exiting
    EventLoop(bool free_shutdown)
        : _shutdown(false),
        _free_shutdown(free_shutdown),
        _runner( &EventLoop<Type>::_event_loop, this )
    {
    }

    virtual ~EventLoop() {
        std::unique_lock<std::mutex> dequeuelock(_mutex);
        _shutdown = true;
        _condition_variable.notify_all();
        dequeuelock.unlock();

        if (_runner.joinable()) {
            _runner.join();
        }
    }

    // Mutex and condition variables are not movable and there is no need for smart pointers yet
    EventLoop(const EventLoop&) = delete;
    EventLoop& operator =(const EventLoop&) = delete;
    EventLoop(const EventLoop&&) = delete;
    EventLoop& operator =(const EventLoop&&) = delete;

    // To allow multiple threads to consume data, just add a mutex here and create multiple threads on the constructor
    void _event_loop() {
        while ( true ) {
            try {
                Type call = dequeue();
                call();
            }
            catch (EventLoopNoElements&) {
                return;
            }
            catch (std::exception& error) {
                std::cerr << "Unexpected exception on EventLoop dequeue running: '" << error.what() << "'" << std::endl;
            }
            catch (...) {
                std::cerr << "Unexpected exception on EventLoop dequeue running." << std::endl;
            }
        }
        std::cerr << "The main EventLoop dequeue stopped running unexpectedly!" << std::endl;
    }

    // Add an element to the queue
    void enqueue(int timeout, Type element) {
        std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
        std::chrono::time_point<std::chrono::system_clock> newtime = timenow + std::chrono::milliseconds(timeout);

        std::unique_lock<std::mutex> dequeuelock(_mutex);
        _queue.insert(std::make_tuple(newtime, element));
        _condition_variable.notify_one();
    }

    // Blocks until getting the first-element or throw EventLoopNoElements if it is shutting down
    // Throws EventLoopNoElements when it is shutting down and there are not more elements
    Type dequeue() {
        typename EventLoopQueue::iterator queuebegin;
        typename EventLoopQueue::iterator queueend;
        std::chrono::time_point<std::chrono::system_clock> sleeptime;

        // _mutex prevents multiple consumers from getting the same item or from missing the wake up
        std::unique_lock<std::mutex> dequeuelock(_mutex);
        do {
            queuebegin = _queue.begin();
            queueend = _queue.end();

            if ( queuebegin == queueend ) {
                if ( _shutdown ) {
                    throw EventLoopNoElements( "There are no more elements on the queue because it already shutdown." );
                }
                _condition_variable.wait( dequeuelock );
            }
            else {
                if ( _shutdown ) {
                    if (_free_shutdown) {
                        break;
                    }
                    else {
                        throw EventLoopNoElements( "The queue is shutting down." );
                    }
                }
                std::chrono::time_point<std::chrono::system_clock> timenow = std::chrono::system_clock::now();
                sleeptime = std::get<0>( *queuebegin );
                if ( sleeptime <= timenow ) {
                    break;
                }
                _condition_variable.wait_until( dequeuelock, sleeptime );
            }
        } while ( true );

        Type firstelement = std::get<1>( *queuebegin );
        _queue.erase( queuebegin );
        dequeuelock.unlock();
        return firstelement;
    }
};

打印当前时间戳的实用工具:

std::string getTime() {
    char buffer[20];
#if defined( WIN32 )
    SYSTEMTIME wlocaltime;
    GetLocalTime(&wlocaltime);
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03d ", wlocaltime.wHour, wlocaltime.wMinute, wlocaltime.wSecond, wlocaltime.wMilliseconds);
#else
    std::chrono::time_point< std::chrono::system_clock > now = std::chrono::system_clock::now();
    auto duration = now.time_since_epoch();
    auto hours = std::chrono::duration_cast< std::chrono::hours >( duration );
    duration -= hours;
    auto minutes = std::chrono::duration_cast< std::chrono::minutes >( duration );
    duration -= minutes;
    auto seconds = std::chrono::duration_cast< std::chrono::seconds >( duration );
    duration -= seconds;
    auto milliseconds = std::chrono::duration_cast< std::chrono::milliseconds >( duration );
    duration -= milliseconds;
    time_t theTime = time( NULL );
    struct tm* aTime = localtime( &theTime );
    ::snprintf(buffer, sizeof buffer, "%02d:%02d:%02d.%03ld ", aTime->tm_hour, aTime->tm_min, aTime->tm_sec, milliseconds.count());
#endif
    return buffer;
}

使用这些的示例程序:

// g++ -o test -Wall -Wextra -ggdb -g3 -pthread test.cpp && gdb --args ./test
// valgrind --leak-check=full --show-leak-kinds=all --track-origins=yes --verbose ./test
// procdump -accepteula -ma -e -f "" -x c:\ myexe.exe
int main(int argc, char* argv[]) {
    std::cerr << getTime().c_str() << "Creating EventLoop" << std::endl;
    EventLoop<std::function<void()>>* eventloop = new EventLoop<std::function<void()>>(true);

    std::cerr << getTime().c_str() << "Adding event element" << std::endl;
    eventloop->enqueue( 3000, []{ std::cerr << getTime().c_str() << "Running task 3" << std::endl; } );
    eventloop->enqueue( 1000, []{ std::cerr << getTime().c_str() << "Running task 1" << std::endl; } );
    eventloop->enqueue( 2000, []{ std::cerr << getTime().c_str() << "Running task 2" << std::endl; } );

    std::this_thread::sleep_for( std::chrono::milliseconds(5000) );
    delete eventloop;
    std::cerr << getTime().c_str() << "Exiting after 10 seconds..." << std::endl;
    return 0;
}

输出测试示例:

02:08:28.960 Creating EventLoop
02:08:28.960 Adding event element
02:08:29.960 Running task 1
02:08:30.961 Running task 2
02:08:31.961 Running task 3
02:08:33.961 Exiting after 10 seconds...

更新

最终,所呈现的事件循环类似于时间管理器。更好的时间管理器接口不应强制用户使用线程,以下是一个示例:

class TimerManager    
{
public:
    std::chrono::steady_clock clock_type;
    // setup given function to be executed at given timeout
    // @return unique identifier
    uint64_t start( std::chrono::milliseconds timeout, const std::function< void( void ) >& func );
    // cancel given unique identifier
    void cancel( uint64_t id );
    // handle all expired entries
    // @return next expiration or zero when queue is empty
    std::chrono::milliseconds run( );
}

-1:讨厌者会讨厌;+1 爱好者会喜欢。 - user
2
很好的实现,我给你点赞。 - asitdhal
1
这是一个非常优美的实现。感谢您的分享。 - sleepystar96

1

这个答案适用于类Unix系统,如Linux或Mac OS X。我不知道在Windows上如何实现。

使用select()或pselect()。Linux还有poll()。

请查阅man页面以获取详细信息。这些系统调用需要一个文件描述符列表、超时和/或信号掩码。这些系统调用让程序等待事件。如果列表中的任何一个文件描述符已准备好读取或写入(取决于设置,请参见man页面),超时到期或信号到达,则这些系统调用将返回。然后程序可以读/写文件描述符、处理信号或执行其他操作。之后它再次调用(p)select/poll并等待下一个事件。

套接字应该作为非阻塞方式打开,以便在没有数据/缓冲区满时读/写函数返回。对于常见的显示服务器X11,GUI通过套接字处理并具有文件描述符。因此,可以以相同的方式处理。


0
在Python中创建事件循环的基本应用程序之前,让我们先了解一下什么是事件循环。
事件循环是任何异步I/O框架的核心组件,它允许您在不阻塞程序执行的情况下并发执行I/O操作。事件循环在单个线程中运行,并负责接收和分派I/O事件(例如读取/写入文件或键盘中断)随着它们的发生。
import asyncio

async def coroutine():
    print('Start')
    await asyncio.sleep(1)
    print('End')

loop = asyncio.get_event_loop()
loop.run_until_complete(coroutine())

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接