Kafka 流测试:java.util.NoSuchElementException: 未初始化的主题:"output_topic_name"

10

我已根据https://kafka.apache.org/24/documentation/streams/developer-guide/testing.html编写了一个kafka流应用程序的测试类,代码如下:

import com.EventSerde;
import org.apache.kafka.common.serialization.Serde;
import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.*;

import org.junit.After;
import org.junit.Before;
import org.junit.Test;

import java.util.Properties;

public class KafkaStreamsConfigTest {
    
private TopologyTestDriver testDriver;
private TestInputTopic<String, Object> inputTopic;
private TestOutputTopic<String, Object> outputTopic;

private Serde<String> stringSerde = new Serdes.StringSerde();
private EventSerde eventSerde= new EventSerde();

private String key="test";
private Object value = "some value";
private Object expected_value = "real value";

String kafkaEventSourceTopic = "raw_events";
String kafkaEventSinkTopic = "processed_events";
String kafkaCacheSinkTopic = "cache_objects";

String applicationId = "my-app";
String test_dummy = "dummy:1234";

@Before
public void setup() {
    Topology topology = new Topology();
      
    topology.addSource(kafkaEventSourceTopic, kafkaEventSourceTopic);

    topology.addProcessor(ProcessRouter.class.getSimpleName(), ProcessRouter::new, kafkaEventSourceTopic);

    topology.addProcessor(WorkforceVisit.class.getSimpleName(), WorkforceVisit::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DefaultProcessor.class.getSimpleName(), DefaultProcessor::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(CacheWorkforceShift.class.getSimpleName(), CacheWorkforceShift::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(DigitalcareShiftassisstantTracking.class.getSimpleName(), DigitalcareShiftassisstantTracking::new
            , ProcessRouter.class.getSimpleName());

    topology.addProcessor(WorkforceLocationUpdate.class.getSimpleName(), WorkforceLocationUpdate::new
            , ProcessRouter.class.getSimpleName());

    topology.addSink(kafkaEventSinkTopic, kafkaEventSinkTopic
            , WorkforceVisit.class.getSimpleName(), DefaultProcessor.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    topology.addSink(kafkaCacheSinkTopic, kafkaCacheSinkTopic
            , WorkforceVisit.class.getSimpleName()
            , CacheWorkforceShift.class.getSimpleName(), DigitalcareShiftassisstantTracking.class.getSimpleName()
            , WorkforceLocationUpdate.class.getSimpleName());

    Properties properties = new Properties();
    properties.put(StreamsConfig.APPLICATION_ID_CONFIG, applicationId);       
    properties.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, test_dummy);
    properties.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass().getName());
    properties.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, EventSerde.class.getName());
        
    testDriver = new TopologyTestDriver(topology, properties);

    //setup test topics
    inputTopic = testDriver.createInputTopic(kafkaEventSourceTopic, stringSerde.serializer(), eventSerde.serializer());
    outputTopic = testDriver.createOutputTopic(kafkaEventSinkTopic, stringSerde.deserializer(), eventSerde.deserializer());

}

@After
public void tearDown() {
    testDriver.close();
}

@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    Object b =  outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);       
}

我使用了 EventSerde 类对值进行序列化和反序列化。

当我运行这段代码时,它会给出以下堆栈跟踪:java.util.NoSuchElementException: Uninitialized topic: processed_events

java.util.NoSuchElementException: Uninitialized topic: processed_events

at org.apache.kafka.streams.TopologyTestDriver.readRecord(TopologyTestDriver.java:715)
at org.apache.kafka.streams.TestOutputTopic.readRecord(TestOutputTopic.java:100)
at org.apache.kafka.streams.TestOutputTopic.readValue(TestOutputTopic.java:80)
at com.uhx.platform.eventprocessor.config.KafkaStreamsConfigTest.outputEqualsTrue(KafkaStreamsConfigTest.java:111)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.base/java.lang.reflect.Method.invoke(Method.java:566)
at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:59)
at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:12)
at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:56)
at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:17)
at org.junit.internal.runners.statements.RunBefores.evaluate(RunBefores.java:26)
at org.junit.internal.runners.statements.RunAfters.evaluate(RunAfters.java:27)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.BlockJUnit4ClassRunner$1.evaluate(BlockJUnit4ClassRunner.java:100)
at org.junit.runners.ParentRunner.runLeaf(ParentRunner.java:366)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:103)
at org.junit.runners.BlockJUnit4ClassRunner.runChild(BlockJUnit4ClassRunner.java:63)
at org.junit.runners.ParentRunner$4.run(ParentRunner.java:331)
at org.junit.runners.ParentRunner$1.schedule(ParentRunner.java:79)
at org.junit.runners.ParentRunner.runChildren(ParentRunner.java:329)
at org.junit.runners.ParentRunner.access$100(ParentRunner.java:66)
at org.junit.runners.ParentRunner$2.evaluate(ParentRunner.java:293)
at org.junit.runners.ParentRunner$3.evaluate(ParentRunner.java:306)
at org.junit.runners.ParentRunner.run(ParentRunner.java:413)
at org.junit.runner.JUnitCore.run(JUnitCore.java:137)
at com.intellij.junit4.JUnit4IdeaTestRunner.startRunnerWithArgs(JUnit4IdeaTestRunner.java:68)
at com.intellij.rt.junit.IdeaTestRunner$Repeater.startRunnerWithArgs(IdeaTestRunner.java:33)
at com.intellij.rt.junit.JUnitStarter.prepareStreamsAndStart(JUnitStarter.java:230)
at com.intellij.rt.junit.JUnitStarter.main(JUnitStarter.java:58)

您可以看到,我已初始化了输入和输出主题。 我还调试了代码,错误发生在从输出主题读取值时。

outputTopic.readValue();

我不明白我还需要做什么来初始化输出主题。有人能帮我解决这个问题吗?
我正在使用apache kafka-streams-test-utils 2.4.0和kafka-streams 2.4.0。
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams</artifactId>
        <version>2.4.0</version>
    </dependency>
    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.4.0</version>
    </dependency>

    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-streams-test-utils</artifactId>
        <version>2.4.0</version>
        <scope>test</scope>
    </dependency>

6
错误提示信息有些误导人,意思是没有向输出主题写入任何输出记录。因此,我认为问题出在您的拓扑结构上。 - Matthias J. Sax
1
马蒂亚斯是正确的 - 这个异常完全是误导性的。 - forevergenin
问题是什么?我的意思是,问题是如何解决的? - Pim van der Heijden
1个回答

12
异常是误导性的。它只意味着没有将任何输出记录写入输出主题。因此,问题出在定义的拓扑结构上。
为了避免这种误导性的异常,您可以在尝试从输出主题读取之前检查它是否为空。这样你就能得到一个更清晰的图片:
@Test
public void outputEqualsTrue()
{
    inputTopic.pipeInput(key, value);
    assert(outputTopic.isEmpty(), false);
    Object b = outputTopic.readValue();
    System.out.println(b.toString());
    assertEquals(b,expected_value);
}

当您尝试从outputTopic读取值,即outputTopic.readValue()时,也会发生异常。有什么解决方法吗?! - sam.ban
在读取之前,您需要通过调用outputTopic.isEmpty()来检查输出主题中是否有数据可用。或者,您可以使用readAsList*方法从outputTopic中读取值。 - forevergenin
1
谢谢。它起作用了。实际上,问题只存在于拓扑内部。 - sam.ban
@sam.ban 是什么东西? - Pim van der Heijden

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接