Spark Streaming提供了创建自定义接收器的功能,详见此处。要将接收器接收到的数据存储到Spark中,需要使用store(data)
方法。
我正在将数据存储到Spark中,这些数据具有与之相关联的某些属性。由自定义接收器扩展的Spark Receiver类提供了几种形式为store(data, metadata)
的存储方法,这意味着可以将元数据/属性与数据一起存储。下面的代码片段显示了我如何使用此方法来存储数据及其元数据/属性。
public class CustomReceiver extends Receiver<String> {
public CustomReceiver() {
super(StorageLevel.MEMORY_AND_DISK_2());
}
@Override
public void onStart() {
new Thread() {
@Override
public void run() {
try {
receive();
} catch (IOException e) {
restart("Error connecting: ", e);
}
}
}.start();
}
@Override
public void onStop() {
// Not needed as receive() method closes resources when stopped
}
private void receive() throws IOException {
String str = getData();
Map<String, String> metadata = getMetadata();
Iterator<String> it = Arrays.asList(str.split("\n\r")).iterator();
store(it, metadata);
if (isStopped()) {
closeConnections();
}
}
}
这些存储的数据可以从另一个类中访问,如下代码示例所示:
private void testCustomReceiver() {
JavaDStream<String> custom = ssc.receiverStream(new CustomReceiver());
JavaDStream<String> processedInput = custom.flatMap(row -> {
return Arrays.asList(row.split("\\r?\\n"));
});
processedInput.print();
}
现在让我们来谈谈我的问题:如何从上面展示的
testCustomReceiver()
方法中访问存储在自定义接收器中数据的元数据/属性?我已经尝试查找文档并在调试器中探索JavaDStream
对象以搜索元数据,但都没有成功。如果有任何帮助或建议,将不胜感激,谢谢。
processedInput.print();
访问了“元数据”吗? - Prashant Sharma