Node.js EventEmitter 事件无法共享事件循环

7
也许根本问题在于我正在使用的node-kafka模块是如何实现的,但也可能不是,所以我们开始吧...
我在使用node-kafa库时,遇到了订阅consumer.on('message')事件的问题。该库使用标准的events模块,因此我认为这个问题可能是通用的。
我的实际代码结构很大而复杂,因此这里是一个基本布局的伪代码示例,以突出我的问题。(注意:这段代码片段未经测试,因此可能会有错误,但这里不涉及语法问题)
var messageCount = 0;
var queryCount = 0;

// Getting messages via some event Emitter
consumer.on('message', function(message) {
    message++;
    console.log('Message #' + message);

    // Making a database call for each message
    mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
        queryCount++;
        console.log('Query   #' + queryCount);
    });
})

我看到的是当我启动服务器时,Kafka会通过事件发射器给我大约10万个积压消息,然后我开始接收这些消息。如果要获取并记录所有消息,需要大约15秒钟。假设MySQL查询相对较快,则以下是我期望看到的输出结果:
Message #1
Message #2
Message #3
...
Message #500
Query   #1
Message #501
Message #502
Query   #2
... and so on in some intermingled fashion

我会期望如此,因为我的第一个MySQL结果应该非常快就准备好了,我期望结果(s)按照事件循环的顺序进行处理以获得响应。但实际上我得到的是:

Message #1
Message #2
...
Message #100000
Query   #1
Query   #2
...
Query   #100000

我在 mysql 响应处理之前收到了每条消息。因此,我的问题是,为什么?为什么在完成所有消息事件之前我不能得到任何一个数据库结果?
另外,需要注意的是:我在 node-kafka 中设置了一个断点 .emit('message'),在我的代码中设置了一个断点 mysql.query(),然后按顺序命中它们。因此,似乎所有的 100,000 个发射都没有在进入我的事件订阅者之前堆积起来。所以这就是我对问题的第一个假设。
非常感谢您提供的想法和知识 :)

如果这个例子确实代表了你的代码,那么我也迷失了。你能用这个例子真正产生这个结果吗?我没有可用于测试的MySQL实例。 - Avery
2
你是否已经使用足够大的fetchMaxBytes值配置了node-kafka,以便所有这100K条消息在一个请求中传输? EventEmitter是同步的,它不使用Node事件循环,因此如果100K个消息一次到达,它们可能会在异步代码有机会运行之前全部被触发。 - robertklep
@Avery 我刚把上面的例子放到我的实际代码中并运行了一下。我得到了相同的结果。大约20秒后,消息日志完成,立即开始查询日志。 - Eric Olson
1
@robertklep 谢谢!在kafka-node的示例中,他们展示了带有 fetchMaxBytes: 1024*10 的默认覆盖示例。在其他默认覆盖中,他们的值等于默认值,甚至还注明了这一点,因此我认为这也适用于此属性。你的问题激发了我去研究他们的代码,并发现它的默认值实际上是 fetchMaxBytes: 1024*1024。所以是的,我实际上是在一次请求中取回了所有消息。而且我不知道EventEmitter是同步的 :) - Eric Olson
@robertklep,我将覆盖更改为文档中的1024 * 10,并获得了我一直期望的结果 :) - Eric Olson
显示剩余6条评论
1个回答

4
node-kafka驱动程序使用相当自由的缓冲区大小(1M),这意味着它将获得尽可能多的来自Kafka的消息,以适合缓冲区。如果服务器积压,并且根据消息大小,这可能意味着有成千上万条消息通过一个请求进入。

因为EventEmitter是同步的(它不使用Node事件循环),这意味着驱动程序将向其侦听器发出(成千上万的)事件,并且由于它是同步的,直到所有消息被传递之前它不会让出Node事件循环。

我认为你无法解决事件洪流的问题,但我不认为特别是事件传递是有问题的。更有可能的问题是为每个事件启动异步操作(在这种情况下是MySQL查询),这可能会使数据库充斥着查询。

一个可能的解决方法是使用队列,而不是直接从事件处理程序执行查询。例如,使用async.queue,您可以限制并发(异步)任务的数量。队列的“worker”部分将执行MySQL查询,而在事件处理程序中,您只需将消息推送到队列中即可。


感谢@robertklep。我将尝试使用async.queue。我正在处理自己的队列,以便只有一个mysql查询并将结果进行内存缓存,以供等待的请求使用,但我怀疑一个经过良好维护/测试的模块会更好 :) - Eric Olson

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接