好的,解决方案是扩展Asio并编写一个mysql_service实现来集成它。我几乎要立即
找出如何做到这一点,但我想开始使用“仿真”。
想法是有:
- 您的业务流程使用io_service(正如您已经在做的那样)
- 一个数据库“facade”接口,将异步查询调度到一个不同的队列(io_service)中,并将完成处理程序发布回业务流程的io_service中
这里需要进行微调,您需要防止业务流程端的io_service在其作业队列为空时立即关闭,因为它可能仍在等待来自数据库层的响应。
因此,将其建模为快速演示:
namespace database
{
// data types
struct sql_statement { std::string dml; };
struct sql_response { std::string echo_dml; }; // TODO cover response codes, resultset data etc.
我希望你能原谅我的过度简化:/
struct service
{
service(unsigned max_concurrent_requests = 10)
: work(io_service::work(service_)),
latency(mt19937(), uniform_int<int>(200, 1500))
{
for (unsigned i = 0; i < max_concurrent_requests; ++i)
svc_threads.create_thread(boost::bind(&io_service::run, &service_));
}
friend struct connection;
private:
void async_query(io_service& external, sql_statement query, boost::function<void(sql_response response)> completion_handler)
{
service_.post(bind(&service::do_async_query, this, ref(external), std::move(query), completion_handler));
}
void do_async_query(io_service& external, sql_statement q, boost::function<void(sql_response response)> completion_handler)
{
this_thread::sleep_for(chrono::milliseconds(latency()));
external.post(bind(completion_handler, sql_response { q.dml }));
}
io_service service_;
thread_group svc_threads;
optional<io_service::work> work;
random::variate_generator<mt19937, uniform_int<int> > latency;
};
这项服务协调了最大数量的并发请求(在“数据库io_service”方面),并将完成情况通过另一个io_service(async_query/do_async_query组合)回传。这个存根实现以明显的方式模拟0.2~1.5秒的延迟 :)。现在是客户端“门面”。
struct connection
{
connection(int connection_id, io_service& external, service& svc)
: connection_id(connection_id),
external_(external),
db_service_(svc)
{ }
void async_query(sql_statement query, boost::function<void(sql_response response)> completion_handler)
{
db_service_.async_query(external_, std::move(query), completion_handler);
}
private:
int connection_id;
io_service& external_;
service& db_service_;
};
connection
实际上只是一种方便,使我们不必在调用站点上显式处理各种队列。
现在,让我们按照良好的Asio风格实现一个演示业务流程:
namespace domain
{
struct business_process : id_generator
{
business_process(io_service& app_service, database::service& db_service_)
: id(generate_id()), phase(0),
in_progress(io_service::work(app_service)),
db(id, app_service, db_service_)
{
app_service.post([=] { start_select(); });
}
private:
int id, phase;
optional<io_service::work> in_progress;
database::connection db;
void start_select() {
db.async_query({ "select * from tasks where completed = false" }, [=] (database::sql_response r) { handle_db_response(r); });
}
void handle_db_response(database::sql_response r) {
if (phase++ < 4)
{
if ((id + phase) % 3 == 0)
{
db.async_query({ "insert into tasks (text, completed) values ('hello', false)" }, [=] (database::sql_response r) { handle_db_response(r); });
} else
{
db.async_query({ "update * tasks set text = 'update' where id = 123" }, [=] (database::sql_response r) { handle_db_response(r); });
}
} else
{
in_progress.reset();
lock_guard<mutex> lk(console_mx);
std::cout << "business_process " << id << " has completed its work\n";
}
}
};
}
这个业务流程首先在应用服务上发布自己。然后连续进行多次数据库查询,最终退出(通过执行
in_progress.reset()
,应用服务会知道这一点)。
一个演示主程序,在单个线程上启动10个业务流程:
int main()
{
io_service app;
database::service db;
ptr_vector<domain::business_process> bps;
for (int i = 0; i < 10; ++i)
{
bps.push_back(new domain::business_process(app, db));
}
app.run();
}
在我的示例中,业务流程不会执行任何 CPU 密集型工作,因此没有必要在 CPU 之间安排它们的调度,但如果您想要这样做,只需将
app.run()
行替换为以下内容即可轻松实现:
thread_group g;
for (unsigned i = 0; i < thread::hardware_concurrency(); ++i)
g.create_thread(boost::bind(&io_service::run, &app));
g.join_all();
查看演示,运行在Coliru上实时运行