测试 Storm 的 Bolt 和 Spout

48

这是一个关于Java编写的Storm拓扑中如何对Bolts和Spouts进行单元测试的一般性问题。

在单元测试(JUnit?)中,对于测试 BoltsSpouts,有哪些推荐的实践和指南?

例如,我可以为一个Bolt编写一个JUnit测试,但如果没有完全理解框架(比如Bolt的生命周期)和序列化的影响,就会很容易犯错,比如基于构造函数创建非可序列化成员变量。在JUnit中,这个测试会通过,但在拓扑中它将无法正常工作。我完全可以想象,需要考虑许多测试点(例如,这个示例中的序列化和生命周期)。

因此,是否建议在使用基于JUnit的单元测试时,运行一个小的模拟拓扑(LocalMode?),并测试该Topoology下的Bolt(或Spout)的暗示合同?或者,是否可以使用JUnit,但前提是我们必须仔细模拟Bolt的生命周期(创建它、调用prepare()、mock一个Config等)?在这种情况下,对于要测试的类(Bolt/Spout),有哪些通用的测试点需要考虑?

其他开发者在创建适当的单元测试方面做了什么?

我注意到有一个Topology测试API(请参见:https://github.com/xumingming/storm-lib/blob/master/src/jvm/storm/TestingApiDemo.java)。使用这个API,针对每个独立的BoltSpout建立“测试拓扑”,并验证Bolt必须提供的隐式合同(例如,它的Declared outputs)是否更好?

谢谢!


你最终决定采用哪种方法了吗? - Chris Gerken
好的,我已经阅读了下面的答案。似乎有一些通用的指导方针,但没有什么是铁板钉钉的。我会再等一会儿看看是否还有其他人有想法,然后关闭它。我喜欢使用测试API(TestingApiDemo.java)以及您对模拟依赖项的回答,@ChrisGerken。 - Jack
4个回答

14

6
确实是一个不错的API,不过再多一点文档资料会大大有助于理解它。 - Barry NL
您IP地址为143.198.54.68,由于运营成本限制,当前对于免费用户的使用频率限制为每个IP每72小时10次对话,如需解除限制,请点击左下角设置图标按钮(手机用户先点击左上角菜单按钮)。 - asmaier
那不是一个单元测试。那是一个集成测试,因为它会启动一个内存集群来执行Bolt。单元测试是测试最小可能的“单元”的测试,这将是Bolt的各个方法。 - user167019

11
我们的方法是将可序列化工厂通过构造函数注入到 Spout/Bolt 中。然后,在其 open/prepare 方法中,Spout/Bolt 会咨询该工厂。该工厂的唯一职责是以可序列化的方式封装获取 Spout/Bolt 依赖项的过程。这种设计允许我们的单元测试注入伪造/测试/模拟工厂,并在咨询时返回模拟服务。这样,我们就可以使用 Mockito 等模拟库来进行狭义的单位测试。
下面是一个泛型 Bolt 示例及其测试。我省略了工厂 UserNotificationFactory 的实现,因为它取决于您的应用程序。您可能会使用服务定位器来获取服务、序列化配置、HDFS 可访问配置,或者任何其他适合您的方式来获取正确的服务,只要工厂在序列化/反序列化周期内都能做到即可。您应该涵盖该类的序列化。
public class NotifyUserBolt extends BaseBasicBolt {
  public static final String NAME = "NotifyUser";
  private static final String USER_ID_FIELD_NAME = "userId";

  private final UserNotifierFactory factory;
  transient private UserNotifier notifier;

  public NotifyUserBolt(UserNotifierFactory factory) {
    checkNotNull(factory);

    this.factory = factory;
  }

  @Override
  public void prepare(Map stormConf, TopologyContext context) {
    notifier = factory.createUserNotifier();
  }

  @Override
  public void execute(Tuple input, BasicOutputCollector collector) {
    // This check ensures that the time-dependency imposed by Storm has been observed
    checkState(notifier != null, "Unable to execute because user notifier is unavailable.  Was this bolt successfully prepared?");

    long userId = input.getLongByField(PreviousBolt.USER_ID_FIELD_NAME);

    notifier.notifyUser(userId);

    collector.emit(new Values(userId));
  }

  @Override
  public void declareOutputFields(OutputFieldsDeclarer declarer) {
    declarer.declare(new Fields(USER_ID_FIELD_NAME));
  }
}

测试

public class NotifyUserBoltTest {

  private NotifyUserBolt bolt;

  @Mock
  private TopologyContext topologyContext;

  @Mock
  private UserNotifier notifier;

  // This test implementation allows us to get the mock to the unit-under-test.
  private class TestFactory implements UserNotifierFactory {

    private final UserNotifier notifier;

    private TestFactory(UserNotifier notifier) {
      this.notifier = notifier;
    }

    @Override
    public UserNotifier createUserNotifier() {
      return notifier;
    }
  }

  @Before
  public void before() {
    MockitoAnnotations.initMocks(this);

    // The factory will return our mock `notifier`
    bolt = new NotifyUserBolt(new TestFactory(notifier));
    // Now the bolt is holding on to our mock and is under our control!
    bolt.prepare(new Config(), topologyContext);
  }

  @Test
  public void testExecute() {
    long userId = 24;
    Tuple tuple = mock(Tuple.class);
    when(tuple.getLongByField(PreviousBolt.USER_ID_FIELD_NAME)).thenReturn(userId);
    BasicOutputCollector collector = mock(BasicOutputCollector.class);

    bolt.execute(tuple, collector);

    // Here we just verify a call on `notifier`, but we could have stubbed out behavior befor
    //  the call to execute, too.
    verify(notifier).notifyUser(userId);
    verify(collector).emit(new Values(userId));
  }
}

我知道这是一个旧帖子,如果你已经离开这个项目,请原谅 :) 我想知道 verify(notifier).notifyUser(userId) 测试是否通过。我发现Storm在工厂上执行的序列化和反序列化会导致新的模拟通知者被实例化。因此,原始的模拟通知者没有接收到任何交互。这在你那里也是这种情况吗? - ilana917
@ilana917 Storm和mocks不应该互相交互。这种模式的目的是以一种让你能够在Storm运行时之外单独测试代码的方式编写代码。在测试中,不应该发生任何序列化。在new NotifyUserBoltbolt.prepare之间的测试中没有序列化。在Storm运行时,Storm会对Bolt进行序列化。 - Carl G
感谢 @carl-g,那正是我要走的方向。我现在正在进行大规模重构,最初计划是使用外部连接的模拟来运行 storm 端到端作为一种集成测试,但对我们的逻辑进行单元测试更有意义。 - ilana917

11

我们采用的一种方法是将大部分应用程序逻辑从bolt和spout移出,并移到对象中,通过最小化的接口实例化和使用它们来完成重活。之后,我们对这些对象进行单元测试和集成测试,尽管这样做留下了一个空缺。


2
虽然你所说的是正确的,也是一个好主意,但它并没有涉及到 OP 在生产部署之前寻找诸如序列化问题之类的事情的兴趣。 - Steven Magana-Zook
谢谢您的回复!我很好奇您所说的“leave a gap”是什么意思? - taylorcressy
间隙在Storm和我们的对象之间的接口处。那部分没有经过彻底测试,因为它只出现在集成测试中,这是昂贵的,所以有一些连接代码没有很好地覆盖。 - Gordon Seidoh Worley

1
事实证明,模拟类似OutputDeclarer、Tuple和OutputFieldsDeclarer的storm对象相当容易。其中,只有OutputDeclarer会看到任何副作用,因此需要编写OutputDeclarer模拟类以能够回答任何已发出的元组和锚点等。例如,您的测试类可以使用这些模拟类的实例轻松配置一个bolt/spout实例,调用它并验证预期的副作用。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接