将Spark-kafka InputDStream转换为字节数组数组

3

我正在使用Scala,并使用以下Spark Streaming方法从Kafka消费数据:

val lines = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics).map(_._2)

上述变量通过以下代码返回InputDStream,我能够以原始/二进制格式查看数据:

println(line)

但是我需要将avro格式(已提供模式)应用于原始/二进制格式,以便以期望的JSON格式查看数据。为了应用avro格式,我需要将上述InputDStream转换为Array [Bytes],该数组由avro使用。

请问有人可以告诉我如何将InputDStream转换为Array [Bytes]吗?

或者

如果您知道在InputDStream(Spark Streaming)上应用avro模式的更好方法,请分享。

1个回答

2

你需要做两件事。第一件是使用Kafka的DefaultDecoder,它可以为值类型提供一个Array[Byte]

val lines: DStream[(String, Array[Byte])] = 
  KafkaUtils
   .createDirectStream[String, Array[Byte], StringDecoder, DefaultDecoder](ssc, kafkaParams, topics)

然后,您需要通过额外的 map 应用您的Avro反序列化逻辑:

lines.map { case (_, bytes) => avroDeserializer.deserialize(bytes) }

这里的avroDeserializer是您自己的任意类,它知道如何从Avro字节创建您的类型。

我个人使用avro4s通过宏获得案例类反序列化。


非常感谢!我只需要将DStream的值作为Array [Byte]提取出来,所以我使用以下代码获取它:val lines:DStream [(Array [Byte])] = KafkaUtils.createDirectStream [ String,Array [Byte],StringDecoder,DefaultDecoder] (ssc,kafkaParams,topics)。map(_._2) - k_b

网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接