Vert.x 处理器是如何工作的?

3
在过去的一周中,我阅读了有关Vertx的文档。但是我不理解Vertx处理程序的工作方式。例如:
public class Client extends AbstractVerticle{

    @Override
    public void start() throws Exception {
       final HttpClient httpClient = this.vertx.createHttpClient();
       this.vertx.setPeriodic(1000, handler->{
           httpClient.getNow(8080, "localhost", "/", responseHandler -> {
                System.out.println("response");
            });
       });
    }

}

服务器是:

public class JdbcVertx extends AbstractVerticle{

    @Override
    public void start() throws Exception {
        JDBCClient client = JDBCClient.createNonShared(this.vertx, new JsonObject()
                                .put("url", "jdbc:postgresql://localhost:5432/test")
                                .put("user", "user")
                                .put("password", "password")
                                .put("driver_class", "org.postgresql.Driver")
                                .put("max_pool_size", 30));
        this.vertx.createHttpServer()
                .requestHandler(r -> {
                    client.getConnection(handler -> {                     
                        final SQLConnection connection = handler.result();
                        connection.execute(execute(), hndlr -> {
                                connection.close(closehndlr -> {                                 
                                        r.response().putHeader("content-type", "text/html").end("Response");
                                });                                   
                        });
                    });
                }).listen(8080);
    }

    private String execute(){
            return "insert into rubish (name) values ('test')";      
    }
}

(P.S 我知道我应该先检查处理程序是否成功,然后再执行某些操作,但我删除了此检查以简化代码,并且从官方文档中得知如果在30秒内没有任何响应,则处理程序会出现异常)

从上面的代码可以看出,客户端每秒发送一次请求并不等待响应,但它有一个处理程序,当响应到达时将被执行。

'JdbcVertx' 监听8080端口,接收请求,进行插入到数据库中,例如睡眠3秒钟(我将1_000_000行放入数据库并创建索引以减慢插入时间),然后发送响应,因此每个请求都是非阻塞的。

据我所知,vertx只有一个名为EventLoop的线程,jdbcVertx从中获取请求但不立即返回响应,而是放置一个处理程序,该处理程序在数据库插入成功时将被执行。事件循环如何知道IO操作何时完成?我认为它使用类似于以下内容的东西

if(Thread.currentThread().getState != 'blocked' && sec != 30){
    this.object.getHandler().execute();
} else if(sec == 30){
 Thread.currentThread.inerrupt();
  } else{
    sec++;
  }

但我们只有一个线程,当我们有阻塞调用时,它没有线程,只有处理程序。

问题是,事件循环如何知道阻塞操作何时结束,以及执行处理程序的时间。


1
老实说,我没能理解你的问题是什么... - injecteer
1
问题是,事件循环如何知道阻塞操作何时结束并且该执行处理程序了。 - Almas Abdrazak
2个回答

4
但是我们只有一个线程,当我们有阻塞调用时,它没有线程,只有处理程序。它是如何工作的,为什么我们需要使用Worker Verticle,如果我们可以使用处理程序呢?
处理程序只是在收到事件总线消息或http调用时触发的操作。它们不是为了帮助你处理可扩展性。如果您仅使用处理程序,并且如果您的操作开始花费很长时间或者如果您的请求数量增加,那么您将会阻塞您的Verticle的事件循环,并且会有很多“Thread xxxx has been blocked”警告。
要回答处理程序如何工作以及为什么事件循环不等待处理程序结束就开始另一个处理程序的问题,请参考以下内容:https://vertx.io/docs/vertx-core/java/#_reactor_and_multi_reactor “每个Vertx实例维护多个事件循环,而不是单一的事件循环。默认情况下,我们根据机器上可用的核数选择数量,但是这可以被覆盖。”
这意味着单个Vertx进程可以跨越服务器进行扩展,而不像Node.js。
我们将此模式称为“多反应堆模式”,以区别于单线程反应堆模式。
但是,这还不足以为您处理所有可扩展性和线程阻塞问题,您应该阅读以下内容:https://vertx.io/docs/vertx-core/java/#golden_rule 有许多设计Verticles的方法,但您必须尽可能保持非阻塞。在我看来,使用传统的阻塞方法(例如阻塞restful端点)与vert.x无关。
个人而言,我会按照以下方式设计我的Verticles:
1. Verticle A:公开一个restful端点,并接受回调URL(无论是GET / POST / PUT / PATCH / DELETE操作)。Verticle立即响应202 Accepted而不返回结果,并向Verticle B发送事件总线消息。
2. Verticle B:获取消息,执行操作(可以异步地使用事件总线调用其他Verticles并等待回复),然后通过调用回调URL进行响应。
我会避免使用Worker Verticle或executeBlocking方法,甚至创建线程池。我会优先考虑将我的Verticle B实例(在单独的pids中)增加到相同的事件总线集群中(并且可能是带有http反向代理的Verticle A)。我们甚至可以根据实时请求的数量想象具有可变数量的Verticle B实例(在单独的pids中)。

附注:有时,我会使用比原生事件总线更强大的消息代理工具,例如Apache Kafka(当我需要遵守某种消息排序规则或重放一些消息时)。


谢谢,但这与我的有关处理程序的问题有什么关系? - Almas Abdrazak
@AlmasAbdrazak 我认为仅使用http处理程序是不足以处理可扩展性和线程阻塞问题的,就是这样。我回答了你关于“为什么我们需要工作 verticle”的问题。我已经更新了我的答案,以便更加清晰明了 ;) - Idriss Neumann
这只是一个例子,我想展示事件循环不会等待每个请求完成,而是使用处理程序,它们是如何工作的,这才是问题,而不是如何使用Vertx设计我的应用程序。 - Almas Abdrazak
此外,使用事件总线的send方法,您可以为回复操作初始化处理程序。 - Almas Abdrazak
@AlmasAbdrazak 我更新了我的回答,更详细地解释了vert.x多反应器模式(你的问题不是很清楚,你似乎在问为什么要做其他事情,比如使用vert.x worker而不仅仅使用http处理程序,我已经回答了这个问题)。总之,我真的认为你不能只依靠处理程序来处理可扩展性和线程阻塞问题,就像你在第一条消息中所说的那样。这就是我的回答的目的!我认为这很相关 ;) - Idriss Neumann

0
回答问题:
事件循环如何知道阻塞操作何时结束并执行处理程序?
根据非阻塞模型,事件循环在调用时
connection.execute( execute(), hndlr )

创建一个新的线程,执行您的阻塞代码,并在其完成后(类似于Thread.join())在事件循环线程中调用hndlr回调。因此,即使可以执行阻塞代码,主循环也不会被阻塞。


谢谢,根据您的回答“生成一个新线程”,这意味着所有阻塞操作都会生成一个新线程,该线程将等待操作完成,然后事件循环线程在每次迭代后检查新生成的线程的状态,我理解得对吗? - Almas Abdrazak
大致是这样的,事件循环并不关心它所生成的线程,它只会接收来自线程的“完成信号”以执行处理程序。 - injecteer
非常感谢,但我在官方文档中没有看到类似的内容。我想知道事件循环可以创建多少个线程,这些线程是否来自池,我在哪里可以找到这样的信息? - Almas Abdrazak
@AlmasAbdrazak,你可以阅读官方文档中的多反应器部分(就像我之前说的那样);) https://vertx.io/docs/vertx-core/java/#_reactor_and_multi_reactor - Idriss Neumann
@injecteer 我认为在介绍如何规避黄金法则之前,先介绍黄金法则会更好(我指的是executeBlocking方法,在良好的非阻塞设计下是无用的)。 - Idriss Neumann

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接