Spark结构化流Kafka偏移管理

5

我正在研究将kafka偏移量存储在kafka中,以用于Spark Structured Streaming,就像DStreams一样工作stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)。 我正在寻找相同的东西,但是针对结构化流。

它是否支持结构化流? 如果是,我该如何实现?

我知道可以使用.option("checkpointLocation", checkpointLocation)进行hdfs检查点,但我只对内置偏移管理感兴趣。

我希望kafka仅在内部存储偏移量,而不需要spark hdfs检查点。

2个回答

0

我正在使用从某处找到的这段代码。

public class OffsetManager {

    private String storagePrefix;

    public OffsetManager(String storagePrefix) {
        this.storagePrefix = storagePrefix;
    }

    /**
     * Overwrite the offset for the topic in an external storage.
     *
     * @param topic     - Topic name.
     * @param partition - Partition of the topic.
     * @param offset    - offset to be stored.
     */
    void saveOffsetInExternalStore(String topic, int partition, long offset) {

        try {

            FileWriter writer = new FileWriter(storageName(topic, partition), false);

            BufferedWriter bufferedWriter = new BufferedWriter(writer);
            bufferedWriter.write(offset + "");
            bufferedWriter.flush();
            bufferedWriter.close();

        } catch (Exception e) {
            e.printStackTrace();
            throw new RuntimeException(e);
        }
    }

    /**
     * @return he last offset + 1 for the provided topic and partition.
     */
    long readOffsetFromExternalStore(String topic, int partition) {

        try {

            Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)));

            return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;

        } catch (Exception e) {
            e.printStackTrace();
        }

        return 0;
    }

    private String storageName(String topic, int partition) {
        return "Offsets\\" + storagePrefix + "-" + topic + "-" + partition;
    }

}

SaveOffset...在记录处理成功后被调用,否则不会存储偏移量。我正在使用Kafka主题作为源,因此我将检索到的偏移量指定为从ReadOffsets中获取的起始偏移量...


0
“它是否支持结构化流?”
不支持在结构化流中将偏移量提交回Kafka,类似于使用Spark Streaming(DStreams)所能做的。Spark结构化流+Kafka集成指南(Kafka特定配置)非常明确地说明了这一点:
“Kafka源不会提交任何偏移量。”
我在{{link2:如何在Spark结构化流中手动设置groupId和提交Kafka偏移量}}中写了一个更全面的答案。

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接