如何访问由Spark Streaming自定义接收器存储的元数据?

6

Spark Streaming提供了创建自定义接收器的功能,详见此处。要将接收器接收到的数据存储到Spark中,需要使用store(data)方法。

我正在将数据存储到Spark中,这些数据具有与之相关联的某些属性。由自定义接收器扩展的Spark Receiver类提供了几种形式为store(data, metadata)的存储方法,这意味着可以将元数据/属性与数据一起存储。下面的代码片段显示了我如何使用此方法来存储数据及其元数据/属性。

public class CustomReceiver extends Receiver<String> {

    public CustomReceiver() {
        super(StorageLevel.MEMORY_AND_DISK_2());
    }

    @Override
    public void onStart() {
        new Thread() {
            @Override
            public void run() {
                try {
                    receive();
                } catch (IOException e) {
                    restart("Error connecting: ", e);
                }
            }
        }.start();
    }

    @Override
    public void onStop() {
        // Not needed as receive() method closes resources when stopped
    }

    private void receive() throws IOException {
        String str = getData();
        Map<String, String> metadata = getMetadata();
        Iterator<String> it = Arrays.asList(str.split("\n\r")).iterator();

        store(it, metadata);

        if (isStopped()) {
            closeConnections();
        }
    }
}

这些存储的数据可以从另一个类中访问,如下代码示例所示:
private void testCustomReceiver() {
    JavaDStream<String> custom = ssc.receiverStream(new CustomReceiver());

    JavaDStream<String> processedInput = custom.flatMap(row -> {
        return Arrays.asList(row.split("\\r?\\n"));
    });

    processedInput.print();
}

现在让我们来谈谈我的问题:如何从上面展示的testCustomReceiver()方法中访问存储在自定义接收器中数据的元数据/属性?我已经尝试查找文档并在调试器中探索JavaDStream对象以搜索元数据,但都没有成功。如果有任何帮助或建议,将不胜感激,谢谢。

你不是已经通过processedInput.print();访问了“元数据”吗? - Prashant Sharma
1
@ab853 你找到解决方案了吗?我也遇到了同样的问题。 - yishaiz
@yishaiz,可能不是。这是一段时间以前的事情了,但如果我找到了解决方案,我会在这里添加它。 - ab853
1个回答

1

我一直在研究Spark代码,我认为你永远无法再次访问它。事实上,我不认为它被使用过。

您的接收器的监管者将metadataOption放入ReceivedBlockInfo中(这是org.apache.spark.streaming的私有部分)。从那里开始...没有任何引用到ReceivedBlockInfo.metadataOption在流式处理代码库中。理论上可能是ReceivedBlockInfo被序列化然后反序列化成另一个类,或者某些花哨的反射检索元数据,但这两种情况都是如此反模式,我不会指望它发生。

为什么它在那里?我认为意图是将其作为元数据检查点系统的一部分,但它可能从未连接,或者接收器元数据与流检查点之间的连接已经断开。

无论哪种方式,一旦块被丢入流中,块元数据就消失了。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接