MySQL 异步操作?

6
我面临的问题是阻塞。我的服务器采用基于C++ Boost.ASIO编码,使用8个线程,因为服务器有8个逻辑核心。
我的问题是,一个线程可能在MySQL查询时面临0.2~1.5秒的阻塞,而我不知道如何解决这个问题,因为MySQL C++连接器不支持异步查询,我也不知道如何正确地设计服务器以使用多个线程进行查询。
这就是我寻求意见的地方。创建100个线程以异步查询SQL?能否请专家给出意见?

你预计有多少并发的SQL查询?你需要支持多少个查询/秒?如果没有具体要求,我会建议:将查询排队,完成后再进行回复。 - sehe
2个回答

5
好的,解决方案是扩展Asio并编写一个mysql_service实现来集成它。我几乎要立即找出如何做到这一点,但我想开始使用“仿真”。

想法是有:

  • 您的业务流程使用io_service(正如您已经在做的那样)
  • 一个数据库“facade”接口,将异步查询调度到一个不同的队列(io_service)中,并将完成处理程序发布回业务流程的io_service中

这里需要进行微调,您需要防止业务流程端的io_service在其作业队列为空时立即关闭,因为它可能仍在等待来自数据库层的响应。

因此,将其建模为快速演示:

namespace database
{
    // data types
    struct sql_statement { std::string dml; };
    struct sql_response { std::string echo_dml; }; // TODO cover response codes, resultset data etc.

我希望你能原谅我的过度简化:/
struct service
{
    service(unsigned max_concurrent_requests = 10)
        : work(io_service::work(service_)),
        latency(mt19937(), uniform_int<int>(200, 1500)) // random 0.2 ~ 1.5s
    {
        for (unsigned i = 0; i < max_concurrent_requests; ++i)
            svc_threads.create_thread(boost::bind(&io_service::run, &service_));
    }

    friend struct connection;

private:
    void async_query(io_service& external, sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        service_.post(bind(&service::do_async_query, this, ref(external), std::move(query), completion_handler));
    }

    void do_async_query(io_service& external, sql_statement q, boost::function<void(sql_response response)> completion_handler)
    {
        this_thread::sleep_for(chrono::milliseconds(latency())); // simulate the latency of a db-roundtrip

        external.post(bind(completion_handler, sql_response { q.dml }));
    }

    io_service service_;
    thread_group svc_threads; // note the order of declaration
    optional<io_service::work> work;

    // for random delay
    random::variate_generator<mt19937, uniform_int<int> > latency;
};

这项服务协调了最大数量的并发请求(在“数据库io_service”方面),并将完成情况通过另一个io_service(async_query/do_async_query组合)回传。这个存根实现以明显的方式模拟0.2~1.5秒的延迟 :)。现在是客户端“门面”。
struct connection
{
    connection(int connection_id, io_service& external, service& svc)
        : connection_id(connection_id),
          external_(external), 
          db_service_(svc)
    { }

    void async_query(sql_statement query, boost::function<void(sql_response response)> completion_handler)
    {
        db_service_.async_query(external_, std::move(query), completion_handler);
    }
  private:
    int connection_id;
    io_service& external_;
    service& db_service_;
};

connection实际上只是一种方便,使我们不必在调用站点上显式处理各种队列。

现在,让我们按照良好的Asio风格实现一个演示业务流程:

namespace domain
{
    struct business_process : id_generator
    {
        business_process(io_service& app_service, database::service& db_service_) 
            : id(generate_id()), phase(0), 
            in_progress(io_service::work(app_service)),
            db(id, app_service, db_service_)
        { 
            app_service.post([=] { start_select(); });
        }

    private:
        int id, phase;
        optional<io_service::work> in_progress;

        database::connection db;

        void start_select() {
            db.async_query({ "select * from tasks where completed = false" }, [=] (database::sql_response r) { handle_db_response(r); });
        }

        void handle_db_response(database::sql_response r) {
            if (phase++ < 4)
            {
                if ((id + phase) % 3 == 0) // vary the behaviour slightly
                {
                    db.async_query({ "insert into tasks (text, completed) values ('hello', false)" }, [=] (database::sql_response r) { handle_db_response(r); });
                } else
                {
                    db.async_query({ "update * tasks set text = 'update' where id = 123" }, [=] (database::sql_response r) { handle_db_response(r); });
                }
            } else
            {
                in_progress.reset();
                lock_guard<mutex> lk(console_mx);
                std::cout << "business_process " << id << " has completed its work\n";
            }
        }
    };

}

这个业务流程首先在应用服务上发布自己。然后连续进行多次数据库查询,最终退出(通过执行in_progress.reset(),应用服务会知道这一点)。
一个演示主程序,在单个线程上启动10个业务流程:
int main()
{
    io_service app;
    database::service db;

    ptr_vector<domain::business_process> bps;
    for (int i = 0; i < 10; ++i)
    {
        bps.push_back(new domain::business_process(app, db));
    }

    app.run();
}

在我的示例中,业务流程不会执行任何 CPU 密集型工作,因此没有必要在 CPU 之间安排它们的调度,但如果您想要这样做,只需将 app.run() 行替换为以下内容即可轻松实现:
thread_group g;
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
    g.create_thread(boost::bind(&io_service::run, &app));
g.join_all();

查看演示,运行在Coliru上实时运行


正如我之前所说,我对asio并不了解(尽管我曾与编写它的Chris Kohlhoff一起工作——一个非常聪明的人),因此我无法为您的解决方案添加太多内容。看起来您正在使用异步完成处理程序,因此线程不会等待数据库完成。我应该预料到这一点,因为asio中的“a”代表“异步”。我会归咎于老年 :-) - Michael J
澄清一下:我原本期望客户端和服务器之间的通信是异步链接,但我没有意识到服务器监听器和数据库之间也是异步的。 - Michael J
COMMs基本上是我们无法控制的,除非您考虑重写连接器。一些库使其可扩展。在这方面,LibCurl非常酷(请参见CURLOPT_SOCKOPTFUNCTION)。 - sehe

0

我不是MySQL大师,但以下是通用的多线程建议。

  • 当没有任何线程阻塞且您只是将负载分配到所有CPU上时,NumberOfThreads == NumberOfCores才是合适的。

  • 一个常见的模式是每个CPU有多个线程,所以一个正在执行,而另一个正在等待某些事情。

  • 在您的情况下,我倾向于设置NumberOfThreads = n * NumberOfCores,其中'n'从配置文件、注册表或其他用户可设置的值中读取。您可以使用不同的'n'值测试系统以找到最佳值。我建议首次猜测约为3。


我觉得你可能对Boost Asio(或类似的异步IO库,如libuv)不是很熟悉。对于IO密集型应用程序,最好的建议通常是在尽可能少的线程上多路复用IO - 通常是1个线程。为“补偿”阻塞而创建许多线程实际上只是增加了更多的瓶颈(线程消耗资源并创建日益增长的调度开销)。 - sehe
@sehe - 你说得对,我不熟悉boost::asio,但是我已经使用过许多多线程通信系统。如果asio/mysql有一个线程池并且进行适当的作业管理,以便等待作业不会占用线程,那么额外的线程是没有帮助的。如果它让线程睡眠,等待i/o(正如问题所示),那么为了保持CPU忙碌而增加额外的线程通常会提高性能。你知道asio/mysql如何管理作业吗? - Michael J
asio/mysql并不管理作业,这就是重点。Asio服务在等待时不会“独占”线程,但Asio尚未了解Mysql接口。请稍后查看我的答案。 - sehe
我刚刚根据我对Asio的当前理解发布了一个答案。我认为有一种更干净的方法,即编写一个适当的扩展服务。这应该不会更加复杂(它正在做同样的事情),但它将更好地集成(例如与[strand](http://www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/reference/io_service__strand.html)和[coroutines](http:// www.boost.org/doc/libs/1_55_0/doc/html/boost_asio/overview/core.html))。它确实展示了Asio的能力;请注意所有业务流程都在单个线程上,但可以自由多路复用的事实。 - sehe

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接