也许根本问题在于我正在使用的node-kafka模块是如何实现的,但也可能不是,所以我们开始吧...
我在使用node-kafa库时,遇到了订阅
我的实际代码结构很大而复杂,因此这里是一个基本布局的伪代码示例,以突出我的问题。(注意:这段代码片段未经测试,因此可能会有错误,但这里不涉及语法问题)
我看到的是当我启动服务器时,Kafka会通过事件发射器给我大约10万个积压消息,然后我开始接收这些消息。如果要获取并记录所有消息,需要大约15秒钟。假设MySQL查询相对较快,则以下是我期望看到的输出结果:
我在 mysql 响应处理之前收到了每条消息。因此,我的问题是,为什么?为什么在完成所有消息事件之前我不能得到任何一个数据库结果?
另外,需要注意的是:我在 node-kafka 中设置了一个断点
非常感谢您提供的想法和知识 :)
我在使用node-kafa库时,遇到了订阅
consumer.on('message')
事件的问题。该库使用标准的events
模块,因此我认为这个问题可能是通用的。我的实际代码结构很大而复杂,因此这里是一个基本布局的伪代码示例,以突出我的问题。(注意:这段代码片段未经测试,因此可能会有错误,但这里不涉及语法问题)
var messageCount = 0;
var queryCount = 0;
// Getting messages via some event Emitter
consumer.on('message', function(message) {
message++;
console.log('Message #' + message);
// Making a database call for each message
mysql.query('SELECT "test" AS testQuery', function(err, rows, fields) {
queryCount++;
console.log('Query #' + queryCount);
});
})
我看到的是当我启动服务器时,Kafka会通过事件发射器给我大约10万个积压消息,然后我开始接收这些消息。如果要获取并记录所有消息,需要大约15秒钟。假设MySQL查询相对较快,则以下是我期望看到的输出结果:
Message #1
Message #2
Message #3
...
Message #500
Query #1
Message #501
Message #502
Query #2
... and so on in some intermingled fashion
我会期望如此,因为我的第一个MySQL结果应该非常快就准备好了,我期望结果(s)按照事件循环的顺序进行处理以获得响应。但实际上我得到的是:
Message #1
Message #2
...
Message #100000
Query #1
Query #2
...
Query #100000
我在 mysql 响应处理之前收到了每条消息。因此,我的问题是,为什么?为什么在完成所有消息事件之前我不能得到任何一个数据库结果?
另外,需要注意的是:我在 node-kafka 中设置了一个断点
.emit('message')
,在我的代码中设置了一个断点 mysql.query()
,然后按顺序命中它们。因此,似乎所有的 100,000 个发射都没有在进入我的事件订阅者之前堆积起来。所以这就是我对问题的第一个假设。非常感谢您提供的想法和知识 :)
fetchMaxBytes
值配置了node-kafka
,以便所有这100K条消息在一个请求中传输? EventEmitter是同步的,它不使用Node事件循环,因此如果100K个消息一次到达,它们可能会在异步代码有机会运行之前全部被触发。 - robertklepfetchMaxBytes: 1024*10
的默认覆盖示例。在其他默认覆盖中,他们的值等于默认值,甚至还注明了这一点,因此我认为这也适用于此属性。你的问题激发了我去研究他们的代码,并发现它的默认值实际上是fetchMaxBytes: 1024*1024
。所以是的,我实际上是在一次请求中取回了所有消息。而且我不知道EventEmitter是同步的 :) - Eric Olson