Flink: 在CoFlatMapFunction中共享状态

11

CoFlatMapFunction方面遇到了一些困难。如果我将它放在窗口之前的DataStream上,它似乎可以正常工作,但如果将其放在窗口的“apply”函数之后,就会出现错误。

我正在测试两个流,主要的“Features”在flatMap1上不断地输入数据,控制流“Model”在flatMap2上通过请求更改模型。

我能够在flatMap2中正确设置并看到b0/b1的值,但是flatMap1始终将b0和b1视为初始化时设置为0的值。

我是否遗漏了一些明显的东西?

public static class applyModel implements CoFlatMapFunction<Features, Model, EnrichedFeatures> {
    private static final long serialVersionUID = 1L;

    Double b0;
    Double b1;

    public applyModel(){
        b0=0.0;
        b1=0.0;
    }

    @Override
    public void flatMap1(Features value, Collector<EnrichedFeatures> out) {
        System.out.print("Main: " + this + "\n");
    }

    @Override
    public void flatMap2(Model value, Collector<EnrichedFeatures> out) {
        System.out.print("Old Model: " + this + "\n");
        b0 = value.getB0();
        b1 = value.getB1();
        System.out.print("New Model: " + this + "\n");
    }

    @Override
    public String toString(){
        return "CoFlatMapFunction: {b0: " + b0 + ", b1: " + b1 + "}";
    }
}

你在窗口的 apply 函数中做什么?也许你可以与我们分享相应的代码。 - Till Rohrmann
for (Raw value: values) { if(value.getTs() > end_ts) end_ts = value.getTs(); if (value.getTs() < start_ts) start_ts = value.getTs(); if(last_event_ts == 0L){ last_event_ts = value.getTs(); } else { dwell_time += value.getTs() - last_event_ts; last_event_ts = value.getTs(); } } out.collect(new Features(tuple.getField(0), tuple.getField(2), tuple.getField(1), start_ts, end_ts, size, dwell_time, Boolean.FALSE)); - Vladimir Stoyak
我会检查一下是否能够重现你的问题。 - Till Rohrmann
并行实例是我的问题。必须确保两个流以相同的方式进行键控,并将我的CoFlatMapFunction的并行度设置为1。感谢Stephan Ewen。更多信息 - Vladimir Stoyak
很高兴听到这个消息。你想在这里发布Stephan的答案,以便其他遇到同样问题的人可以轻松找到解决方案吗? - Till Rohrmann
1个回答

4

以下是邮件列表中的回答...

CoFlatMapFunction是否可以并行执行?

如果是,你需要一些方法来确定哪条记录会被分配到哪个并行实例。在某种程度上,CoFlatMapFunction在模型和会话窗口结果之间进行了并行(分区)连接,因此你需要一些形式的键来选择元素所属的分区。这样说有意义吗?

如果不是,请尝试将其明确设置为并行度1。

问候,Stephan


通过broadcast()可以实现所有人都可以只读访问的全局状态。

目前还没有提供全局状态可供所有人读取和更新。对该状态进行一致操作将非常昂贵,需要某种形式的分布式通信/共识。

相反,我建议你采用以下方法:

1)如果你可以对状态进行分区,请使用keyBy().mapWithState() - 这样可以本地化状态操作,使其非常快速。

2)如果你的状态没有按键组织,则你的状态可能非常小,你可能可以使用非并行操作。

3)如果某些操作更新状态并且另一个操作访问它,则通常可以使用迭代和CoFlatMapFunction实现(一侧是原始输入,另一侧是反馈输入)。

最终,所有方法都将状态访问和修改本地化,这是一个很好的模式,如果可能的话,请遵循它。

问候,Stephan


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接