如何在Scala类中模拟使用的KafkaProducer

5

我希望为一个Scala类编写单元测试。这个类的目的是收集指标并将它们发布到Kafka主题上。我正在尝试在单元测试中模拟生产者,以确保代码的健全性。以下是该类的简化版本:

class MyEmitter(sparkConf: SparkConf) {
    <snip> -- member variables
    private val kafkaProducer = createProducer()

def createProducer(): Producer[String, MyMetricClass] = {
    val props = new Properties()
    ...
    Code to initialize properties
    ...

    new KafkaProducer[String, MyMetricClass](props)
}

def initEmitter(metricName: String): SomeClass = {
    // Some implementation
}

def collect(key: String, value: String): Unit = {
    // Some implementation
}

def emit(): Unit = {
    val record = new ProducerRecord("<topic name>", "<key>", "<value>")
    kafkaProducer.send(record)
}

我在单元测试中想要模拟生产者并检查是否调用了send()方法,若有,则检查生产者记录是否符合预期。但我自己尝试解决问题未成功,Google搜索也没有结果。如果有人知道如何解决这个问题,我将不胜感激。

https://kafka.apache.org/10/javadoc/org/apache/kafka/clients/producer/MockProducer.html - Shankar Shastri
1个回答

4
'new'通常是测试的敌人,因此您应该提取该对象的创建,以便可以传递真正的KafkaProducer或模拟对象。一种不改变接口的方法可能是:
def createProducer(
        producer: Properties => KafkaProducer = props => new KafkaProducer[String, MyMetricClass](props)
        ): Producer[String, MyMetricClass] = {

   val props = new Properties()
   producer(props)
}

那么在实际代码中,您需要不断调用:
myEmmiter.createProducer()

但在测试中,您需要做的是

val producerMock = mock[KafkaProducer]    
myEmmiter.createProducer(_ => producerMock)

另一个好处是,您还可以存根函数本身,以便验证方法创建的 props 是否符合预期。
希望这有所帮助。

感谢您提供的解决方案。它让我能够看到已传递给模拟生产者的内容。话虽如此,这是因为已经存在一个模拟Kafka生产者。如果代码中有不同的服务/类,则意味着我需要创建所述类的模拟版本,对吗? - nads
是的,每当您想要验证与第三方的交互时,您需要以一种允许用模拟替换该第三方的方式构建您的代码。 - ultrasecr.eth

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接