Vert.x事件循环 - 它是如何实现异步的?

18

我正在尝试使用Vert.x,并且对基于事件循环而不是线程/连接模型的服务器还很陌生。

public void start(Future<Void> fut) {
    vertx
        .createHttpServer()
        .requestHandler(r -> {
            LocalDateTime start = LocalDateTime.now();
            System.out.println("Request received - "+start.format(DateTimeFormatter.ISO_DATE_TIME));
            final MyModel model = new MyModel();
            try {

                for(int i=0;i<10000000;i++){
                    //some simple operation
                }

                model.data = start.format(DateTimeFormatter.ISO_DATE_TIME) +" - "+LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME);

            } catch (Exception e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }

          r.response().end(
                  new Gson().toJson(model)
                 );
        })
        .listen(4568, result -> {
          if (result.succeeded()) {
            fut.complete();
          } else {
            fut.fail(result.cause());
          }
        });
    System.out.println("Server started ..");
  }
  • 我只是试图模拟一个长时间运行的请求处理程序来理解这个模型是如何工作的。
  • 我观察到所谓的事件循环在我的第一个请求完成之前被阻塞。无论需要多少时间,直到前一个请求完成后,后续请求才会得到响应。
  • 很明显,我在这里缺少一件事情,这就是我想问的问题。
  1. 接受所有请求难道不被认为是异步吗?如果只有在前一个连接被清除后才能接受新连接,那么它如何是异步的呢?
    • 假设一个典型的请求需要100毫秒到1秒的任意时间(根据请求的种类和性质)。这意味着,即使在一秒钟内结束,事件循环也不能接受新的连接直到上一个请求完成。如果我作为一个程序员不得不考虑所有这些,并将这样的请求处理程序推送到工作线程中,那么它与线程/连接模型有什么不同呢?
    • 我只是试图了解这个模型如何比传统的线程/连接服务器模型更好?假设没有I/O操作或者所有的I/O操作都是异步处理的,那么它如何解决c10k问题,当它不能启动所有并发请求并行处理,必须等待前一个请求终止时才能处理下一个请求?
  2. 即使我决定将所有这些操作推送到一个工作线程中(池化的),那么我不是回到了同样的问题吗?在线程之间进行上下文切换? 根据答案编辑并置顶此问题以获取悬赏
    • 我完全不明白这个模型被称为异步的原理。
    • Vert.x有一个异步JDBC客户端(异步是关键词),我试图将其与RXJava适配。
    • 以下是一个代码示例(相关部分)

    server.requestStream().toObservable().subscribe(req -> {

        LocalDateTime start = LocalDateTime.now();
        System.out.println("Request for " + req.absoluteURI() +" received - " +start.format(DateTimeFormatter.ISO_DATE_TIME));
        jdbc.getConnectionObservable().subscribe(
                conn -> {

                    // Now chain some statements using flatmap composition
                    Observable<ResultSet> resa = conn.queryObservable("SELECT * FROM CALL_OPTION WHERE UNDERLYING='NIFTY'");
                    // Subscribe to the final result
                    resa.subscribe(resultSet -> {

                        req.response().end(resultSet.getRows().toString());
                        System.out.println("Request for " + req.absoluteURI() +" Ended - " +LocalDateTime.now().format(DateTimeFormatter.ISO_DATE_TIME));
                    }, err -> {
                        System.out.println("Database problem");
                        err.printStackTrace();
                    });
                },

                // Could not connect
                err -> {
                    err.printStackTrace();
                }
                );

});
server.listen(4568);
  • 查询需要大约3秒钟才能返回完整的表格转储。
  • 当我同时发送请求(仅尝试了2个),我发现第二个请求完全等待第一个请求完成。
  • 如果JDBC选择是异步的,那么有没有一个合理的期望是在等待选择查询返回任何内容时,框架处理第二个连接?

2
简而言之:对于长时间运行的阻塞任务,策略是切换到后台线程池(经典多线程),该线程池不使用与事件循环相同的线程,以避免阻塞。 - Christophe Roussy
3个回答

34

Vert.x事件循环实际上是许多平台上存在的经典事件循环。当然,大多数说明和文档都可以在Node.js中找到,因为它是基于这种架构模式最流行的框架。看一下一个比较好的 Node.js事件循环机制的解释。Vert.x教程也对“别打电话给我们,我们会打电话给你”和“Verticles”之间有很好的解释。

针对您的更新:

首先,在使用事件循环时,主线程应快速处理所有请求。您不应在此循环中执行任何长时间的任务。当然,您也不应等待对数据库的调用响应。 - 异步地安排一个调用 - 为结果分配回调(处理程序) - 回调将在工作线程而不是事件循环线程中执行。例如,此回调将返回套接字的响应。 因此,您在事件循环中的操作应该只是安排所有具有回调的异步操作,并在不等待任何结果的情况下转到下一个请求。

假设一个典型的请求需要花费100毫秒到1秒钟的时间(基于请求的种类和性质)。在这种情况下,如果您的请求有一些计算密集型部分或需要访问IO,那么事件循环中的代码不应等待这些操作的结果。
我只是想了解这种模型与传统的线程/连接服务器模型相比有何优势?假设没有I/O操作或所有I/O操作都以异步方式处理?
当您有过多并发请求和传统的编程模型时,您将为每个请求创建一个线程。这些线程会做什么?它们大多数时间都在等待IO操作(例如来自数据库的结果)。这是一种浪费资源的方式。在我们的事件循环模型中,您有一个主线程来调度操作,并为长任务预分配了一定数量的工作线程。+ 这些工作线程实际上都不等待响应,它们只能在等待IO结果时执行另一段代码(可以实现为回调或周期性检查当前正在进行的IO作业的状态)。我建议您阅读Java NIO和Java NIO 2,以了解如何在框架内实际实现异步IO。Green threads也是一个非常相关的概念,值得了解。绿色线程和协程是一种类型的阴影事件循环,试图实现相同的目标-减少线程,因为我们可以在绿色线程等待某些内容时重用系统线程。
“当它不能启动所有并发请求并且必须等待前一个终止时,它如何解决c10k问题?”
当然,我们不会在主线程中等待前一个请求的响应。获取请求,安排长时间/IO任务执行,下一个请求。
即使我决定将所有这些操作都推到工作线程(池化),那么我不是又回到了同样的问题吗?在线程之间进行上下文切换?
如果你做得对 - 不会。更重要的是,你会获得良好的数据局部性和执行流程预测。一个CPU核心将执行您的短事件循环并安排异步工作,无需上下文切换或其他任何操作。其他核心调用数据库并返回响应,仅此而已。在回调之间切换或检查不同的IO状态通道实际上不需要任何系统线程的上下文切换 - 它实际上在一个工作线程中工作。因此,我们每个核心有一个工作线程,这个系统线程等待/检查来自多个连接到数据库的结果可用性。请重新访问Java NIO概念,以了解它如何以这种方式工作。(NIO的经典示例是代理服务器,可以接受许多并行连接(数千个),将请求代理到某些其他远程服务器,监听响应并将响应发送回客户端,所有这些都使用一个或两个线程)
关于您的代码,我为您制作了一个样例项目,以展示一切都按预期运行:
public class MyFirstVerticle extends AbstractVerticle {

    @Override
    public void start(Future<Void> fut) {
        JDBCClient client = JDBCClient.createShared(vertx, new JsonObject()
                .put("url", "jdbc:hsqldb:mem:test?shutdown=true")
                .put("driver_class", "org.hsqldb.jdbcDriver")
                .put("max_pool_size", 30));


        client.getConnection(conn -> {
            if (conn.failed()) {throw new RuntimeException(conn.cause());}
            final SQLConnection connection = conn.result();

            // create a table
            connection.execute("create table test(id int primary key, name varchar(255))", create -> {
                if (create.failed()) {throw new RuntimeException(create.cause());}
            });
        });

        vertx
            .createHttpServer()
            .requestHandler(r -> {
                int requestId = new Random().nextInt();
                System.out.println("Request " + requestId + " received");
                    client.getConnection(conn -> {
                         if (conn.failed()) {throw new RuntimeException(conn.cause());}

                         final SQLConnection connection = conn.result();

                         connection.execute("insert into test values ('" + requestId + "', 'World')", insert -> {
                             // query some data with arguments
                             connection
                                 .queryWithParams("select * from test where id = ?", new JsonArray().add(requestId), rs -> {
                                     connection.close(done -> {if (done.failed()) {throw new RuntimeException(done.cause());}});
                                     System.out.println("Result " + requestId + " returned");
                                     r.response().end("Hello");
                                 });
                         });
                     });
            })
            .listen(8080, result -> {
                if (result.succeeded()) {
                    fut.complete();
                } else {
                    fut.fail(result.cause());
                }
            });
    }
}

@RunWith(VertxUnitRunner.class)
public class MyFirstVerticleTest {

  private Vertx vertx;

  @Before
  public void setUp(TestContext context) {
    vertx = Vertx.vertx();
    vertx.deployVerticle(MyFirstVerticle.class.getName(),
        context.asyncAssertSuccess());
  }

  @After
  public void tearDown(TestContext context) {
    vertx.close(context.asyncAssertSuccess());
  }

  @Test
  public void testMyApplication(TestContext context) {
      for (int i = 0; i < 10; i++) {
          final Async async = context.async();
          vertx.createHttpClient().getNow(8080, "localhost", "/",
                            response -> response.handler(body -> {
                                context.assertTrue(body.toString().contains("Hello"));
                                async.complete();
                            })
        );
    }
  }
}

输出:

Request 1412761034 received
Request -1781489277 received
Request 1008255692 received
Request -853002509 received
Request -919489429 received
Request 1902219940 received
Request -2141153291 received
Request 1144684415 received
Request -1409053630 received
Request -546435082 received
Result 1412761034 returned
Result -1781489277 returned
Result 1008255692 returned
Result -853002509 returned
Result -919489429 returned
Result 1902219940 returned
Result -2141153291 returned
Result 1144684415 returned
Result -1409053630 returned
Result -546435082 returned

所以,我们接受一个请求 - 将请求安排到数据库中,转到下一个请求,我们消耗所有请求,并仅在完成数据库的所有操作后为每个请求发送响应。
关于您的代码示例,我看到两个可能的问题 - 首先,您似乎没有关闭连接,这很重要,以便将其返回到池中。其次,您的池是如何配置的?如果只有一个空闲连接 - 这些请求将串行等待此连接。
我建议您添加一些时间戳打印,以查找序列化的位置。您有某些使事件循环调用阻塞的东西。或...检查您在测试中是否并行发送请求。不是在上一个请求获得响应后立即进行下一个请求。

池大小为30,我已经放置了日志以发出时间戳。一个日志用于在收到请求时输出开始时间,另一个日志用于请求完成时输出。我看到每个请求大约需要4秒钟才能完成,并且每个请求都等待前一个请求结束。根据Vert.x文档,他们的JDBC包装器是异步的,不会等待SQL结果返回,但我看到的是同步行为。它会等待结果返回后再返回到事件循环中。 - user378101
@user378101 请查看示例项目。您可以在那里开始测试并查看输出。一切都按预期工作,JDBC客户端实际上是异步的。已将详细信息添加到答案中。 - Dmitry Spikhalskiy
当然我是并行发送请求的。我不是那么蠢的 :- 我会尝试你的示例代码,稍后回复。 - user378101
是的,现在我明白了,JDBC调用确实是异步的。非常感谢。 - user378101
在语句“回调将在工作线程、非事件循环线程中执行”中,当我在回调中打印当前线程名称时,它打印的是“vert.x-eventloop-thread-2”,这不意味着它正在事件循环中执行吗? - spakai

7

这是异步的吗?答案就在你的问题里

我观察到所谓的事件循环在我的第一个请求完成之前被阻塞。无论它需要多少时间,后续请求在前一个请求完成之前都不会被处理。

这个想法是使用同一个线程来服务每个HTTP请求,而不是为每个请求创建一个新的线程,而你已经通过长时间运行的任务阻塞了该线程。

事件循环的目标是节省从一个线程切换到另一个线程所需的时间,并利用当任务正在使用IO/网络活动时的理想CPU时间。如果在处理您的请求时,需要进行其他IO/网络操作(例如:从远程MongoDB实例获取数据),则在此期间,您的线程将不会被阻塞,而是由相同的线程服务另一个请求,这是事件循环模型的理想用例(假设您的服务器有并发请求)。

如果您有长时间运行的任务,其中不涉及网络/IO操作,则应考虑使用线程池;如果阻塞主事件循环线程本身,则其他请求将被延迟。即对于长时间运行的任务,您可以承担上下文切换的代价,以使服务器响应灵敏。

编辑:服务器处理请求的方式可能会有所不同:

1) 对于每个传入的请求,产生一个新的线程(在这种模式下,上下文切换会很高,并且每次产生一个新的线程会有额外的成本)

2) 使用线程池来处理请求(同样的一组线程将用于服务请求,并且额外的请求将排队等待)

3) 使用事件循环(单个线程处理所有请求。几乎没有上下文切换。因为会有一些线程运行,例如:排队传入的请求)

首先,上下文切换并不是坏事,它是必要的,以保持应用程序服务器的响应性,但是,如果并发请求的数量过多(大约超过10k),太多的上下文切换可能会成为问题。如果您想了解更多详细信息,我建议您阅读C10K文章

假设典型请求需要100毫秒到1秒钟不等(根据请求的类型和性质)。因此,这意味着,在前一个请求完成之前,事件循环无法接受新连接(即使它在一秒钟内结束)。

如果您需要响应大量并发请求(超过10k),我会认为超过500毫秒是一个较长的运行操作。其次,正如我所说,涉及一些线程/上下文切换,例如:排队等待传入请求,但是线程之间的上下文切换将大大减少,因为每次只有很少的线程。第三,如果解决第一个请求涉及网络/IO操作,则在解决第一个请求之前,第二个请求将有机会得到解决,这就是这个模型的优势所在。
“如果作为程序员我必须考虑所有这些内容,并将这样的请求处理程序推送到工作线程中,那么它与线程/连接模型有何不同?”
Vertx试图为您提供最佳的线程和事件循环,因此,作为程序员,您可以根据两种情况下的应用程序效率进行调用,即具有网络/IO操作的长时间运行操作和没有网络/IO操作的长时间运行操作。
“我只是想了解这种模型如何比传统的线程/连接服务器模型更好?假设没有I/O操作或者所有I/O操作都是异步处理的?当它不能同时启动所有并发请求并且必须等待之前的请求终止时,它如何解决c10k问题?”
上面的解释应该可以回答这个问题。
即使我决定将所有这些操作推送到工作线程(池化),那么我不是又回到了同样的问题吗?在线程之间进行上下文切换?
就像我说的,两者都有优缺点,vertx为您提供了这两种模型,根据您的用例,您必须选择对于您的情况最理想的。

我已基于反馈修改了问题。我知道长时间运行的操作必须推送到工作线程。那么,如何定义长时间运行的操作呢?如果典型请求需要一秒钟才能完成,对于我的应用程序来说它不是长时间运行的,但同时事件循环也无法处理同时到达的10个并发请求。这是一个更好的模型吗? - user378101

3
在这种处理引擎中,您需要将长时间运行的任务转换为异步执行的操作,并有一种方法来实现此目的,以便关键线程尽可能快地完成并返回执行另一个任务。即任何IO操作都会传递给框架,在IO完成时回调您。
该框架是异步的,因为它支持您生成和运行这些异步任务,但它不会改变您的代码从同步变为异步。

良好的异步I/O意味着回调函数是异步执行的。但是,如果只有一个线程来执行您的任何代码,那就不同于许多线程。我不确定在使用标签方面的技术差异是什么。一个是通过多个线程实现异步,另一个是通过延迟执行实现异步。 - Alexander Mills

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接