我希望一个消费者角色能够订阅Kafka主题并流式传输数据以便在使用Spark Streaming进行进一步处理,为什么要使用Actor? 因为我读过它的监督策略是处理Kafka故障的好方法(例如,在失败时重新启动)。
我发现了两个选项:
我发现了两个选项:
- Java
KafkaConsumer
类:其poll()
方法返回一个Map[String, Object]
。 我想要像KafkaUtils.createDirectStream
一样返回一个DStream
,但不知道如何在角色外部获取该流。 - 扩展
ActorHelper
特质并使用actorStream()
,如此示例所示。 后一种选项没有显示连接到主题,而是连接到套接字。