如何使用Apache Storm元组

10
我刚开始接触Apache Storm。我阅读了教程并查看了examples。我的问题是所有的例子都使用非常简单的元组(通常是一个带有字符串的字段)。这些元组是内联创建的(使用new Values(...))。在我的情况下,我有许多字段(5..100)的元组。所以我的问题是如何为每个字段实现具有名称和类型(所有基本类型)的元组?
是否有任何示例?(我认为直接实现“Tuple”不是一个好主意)
谢谢
2个回答

11

将所有字段作为值创建元组的替代方法是只需创建一个 bean 并将其传递到元组中。

给定以下类:

public class DataBean implements Serializable {
    private static final long serialVersionUID = 1L;

    // add more properties as necessary
    int id;
    String word;

    public DataBean(int id, String word) {
        setId(id);
        setWord(word);
    }
    public int getId() {
        return id;
    }
    public void setId(int id) {
        this.id = id;
    }
    public String getWord() {
        return word;
    }
    public void setWord(String word) {
        this.word = word;
    }
}

在一个bolt中创建并发射DataBean:

collector.emit(new Values(bean));

在目标 bolt 中获取 DataBean:

@Override
public void execute(Tuple tuple, BasicOutputCollector collector) {
    try {
        DataBean bean = (DataBean)tuple.getValue(0);
        // do your bolt processing with the bean
    } catch (Exception e) {
        LOG.error("WordCountBolt error", e);
        collector.reportError(e);
    }       
}

不要忘记在设置拓扑时使你的bean可序列化并进行注册:

Config stormConfig = new Config();
stormConfig.registerSerialization(DataBean.class);
// more stuff
StormSubmitter.submitTopology("MyTopologyName", stormConfig, builder.createTopology());

免责声明:对于shuffle分组,豆子将正常工作。如果您需要进行fieldsGrouping,仍应使用原始值。例如,在单词计数场景中,您需要按单词分组,因此可以发出以下内容:

collector.emit(new Values(word, bean));

这似乎是一个很好的替代方案,但我想知道何时或如何使用元组(它是Storm的关键概念之一)?! - dermoritz
你仍在使用元组..只需传递一个对象而不是原始类型。如果每次要传递相同的100个字段,则我会像上面展示的那样使用bean。如果每次发送的字段变化很大,那么它可能不太有用。 - Kit Menke
我完全明白,但我很好奇如何使用“真正”的元组(具有多个字段的元组)。但似乎没有人在使用它... - dermoritz
1
似乎有一些误解。您从未直接创建元组对象。值对象在内部转换为元组类型。因此,如果输出值具有多个字段,则元组也具有多个字段。此外,Storm不支持类型检查。最后但并非最不重要的是,bean思想的缺点是您无法使用字段分组连接模式访问各个属性。 - Matthias J. Sax
@MatthiasJ.Sax 关于fieldsGrouping的观点非常好,我在底部添加了免责声明。 - Kit Menke
你的解决方案存在数据重复的缺点(而且在我看来,它有些不自然)。我现在添加了自己的答案...我已经实现了这个模式。有关详细信息,请参见此处:https://github.com/mjsax/aeolus/tree/master/queries/lrb/src/main/java/de/hub/cs/dbis/lrb/types - Matthias J. Sax

8
我会实现一个自定义的元组/值类型,具体操作如下:不使用成员变量来存储数据,而是将每个属性映射到继承Values类型的对象列表中的固定索引。这种方法避免了一个普通Bean所面临的“字段分组”问题。
  1. 无需添加附加属性以进行字段分组(这是非常不自然的)
  2. 避免数据重复(减少了运输的字节数)
  3. 它保留了Bean设计模式的优点

以单词计数为例,代码示例如下:

public class WordCountTuple extends Values {
    private final static long serialVersionUID = -4386109322233754497L;

    // attribute indexes
    /** The index of the word attribute. */
    public final static int WRD_IDX = 0;
    /** The index of the count attribute. */
    public final static int CNT_IDX = 1;

    // attribute names
    /** The name of the word attribute. */
    public final static String WRD_ATT = "word";
    /** The name of the count attribute. */
    public final static String CNT_ATT = "count";

    // required for serialization
    public WordCountTuple() {}

    public WordCountTuple(String word, int count) {
        super.add(WRD_IDX, word);
        super.add(CNT_IDX, count);
    }

    public String getWord() {
        return (String)super.get(WRD_IDX);
    }

    public void setWort(String word) {
        super.set(WRD_IDX, word);
    }

    public int getCount() {
        return (Integer)super.get(CNT_IDX);
    }

    public void setCount(int count) {
        super.set(CNT_IDX, count);
    }

    public static Fields getSchema() {
        return new Fields(WRD_ATT, CNT_ATT);
    }
}

为了避免不一致性,使用“word”和“count”属性的final static变量。此外,方法getSchema()返回实现的模式,用于在Spout/Bolt方法.declareOutputFields(...)中声明输出流。
对于输出元组,可以直接使用这种类型:
public MyOutBolt implements IRichBolt {

    @Override
    public void execute(Tuple tuple) {
        // some more processing
        String word = ...
        int cnt = ...
        collector.emit(new WordCountTuple(word, cnt));
    }

    @Override
    public void declareOutputFields(OutputFieldsDeclarer declarer) {
        declarer.declare(WordCountTuple.getSchema());
    }

    // other methods omitted
}

对于输入元组,我建议采用以下模式:

public MyInBolt implements IRichBolt {
    // use a single instance for avoid GC trashing
    private final WordCountTuple input = new WordCountTuple();

    @Override
    public void execute(Tuple tuple) {
        this.input.clear();
        this.input.addAll(tuple.getValues());

        String word = input.getWord();
        int count = input.getCount();

        // do further processing
    }

    // other methods omitted
}

MyOutBoltMyInBolt可以如下连接:

TopologyBuilder b = ...
b.setBolt("out", new MyOutBolt());
b.setBolt("in", new MyInBolt()).fieldsGrouping("out", WordCountTuple.WRD_ATT);

使用字段分组是很简单的,因为WordCountTuple允许分别访问每个属性。

谢谢你,目前我使用“被接受的”方法,但是我使用EnumMap而不是字段。枚举指定了字段和字段类型。目前我正在努力创建一个抽象元组,但是枚举不能是抽象的,所以我必须使用普通Map或者容忍一些冗余——如果我有解决方案,我会在这里发布的。 - dermoritz
+1 我非常喜欢这个。不过有一个问题...由于OP的问题是关于管理许多字段,通过构造函数传递所有字段可能不太可行。另外,您将对象添加到Values数组中的顺序很重要,因此也许您的无参数构造函数应该使用null初始化所有内容? - Kit Menke
通过构造函数添加所有字段是一种口味问题。我猜null初始化可能是一个有效的改进。但对于设置器来说,这无关紧要,因为它们使用索引访问。当然,必须小心使用常规的数组列表方法(Values继承自那个类)。否则,你可能会把一切搞砸! - Matthias J. Sax
有趣且富有洞见的回答。你有任何想法如何为自定义对象列表实现这个功能吗? - Heisenberg
“List of custom objects”是什么意思? - Matthias J. Sax

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接