将Spark Streaming输出写入Socket

4

我有一个DStream“Crowd”,我想将“Crowd”中的每个元素写入套接字。当我尝试从该套接字读取时,它不会打印任何内容。我使用以下代码行:

val server = new ServerSocket(4000,200);
val conn = server.accept()
val out = new PrintStream(conn.getOutputStream());
crowd.foreachRDD(rdd => {rdd.foreach(record=>{out.println(record)})})

但是如果使用这种方法(虽然这不是我想要的):
crowd.foreachRDD(rdd => out.println(rdd)) 

它会向套接字中写入一些内容。

我怀疑使用rdd.foreach()存在问题,虽然它应该能够正常工作。但我不确定我错过了什么。

3个回答

4
在DStream闭包之外的代码在Driver上执行,而rdd.foreach(...)将在RDD的每个分布式分区上执行。 因此,在Driver的机器上创建一个套接字并尝试在另一台机器上写入它是不起作用的,这是显而易见的原因。 DStream.foreachRDD在Driver上执行,因此在这种情况下,套接字和计算在同一主机上执行,因此可以正常工作。
考虑到RDD计算的分布式特性,使用此服务器套接字方法会很难使其正常工作,因为动态服务发现会成为一个挑战,即“我的服务器套接字打开在哪里?”请考虑使用某些系统,这些系统将允许您对分布式数据进行集中访问。Kafka是这种流处理的良好替代方案。

谢谢。这很有帮助,我明白了。为了克服这个服务发现的挑战,它将语句修改为:crowd.foreachRDD(rdd => {rdd.collect.foreach(record=>{out.println(record)})}). 这将起作用(已经起作用),因为它将从工作节点收集rdd分区并将其发送到驱动程序,后者将写入套接字。我希望这是正确的方式。你对这个collect.foreach怎么看? - vick

0

这里是官方文档,你可以找到答案!

你需要在 foreachRDD 函数内创建连接,如果想要更优化的效果,需创建一组连接池,在 foreachPartition 函数内选择所需的连接,并通过调用 foreach 函数来将元素发送至该连接。以下是最佳实践示例代码:

dstream.foreachRDD { rdd =>
  rdd.foreachPartition { partitionOfRecords =>
    // ConnectionPool is a static, lazily initialized pool of connections
    val connection = ConnectionPool.getConnection()
    partitionOfRecords.foreach(record => connection.send(record))
    ConnectionPool.returnConnection(connection)  // return to the pool for future reuse
  }
}

无论如何,请检查其他评论,因为它们提供了关于问题背景的良好知识。

0
crowd.foreachRDD(rdd => {rdd.collect.foreach(record=>{out.println(record)})})

你在评论中提供的代码建议可以正常工作,但在这种情况下,您必须在驱动程序中收集RDD的所有记录。如果记录数量很少,那么这将是可以接受的,但如果记录数量大于驱动程序的内存,则会成为瓶颈。您的第一次尝试应始终在客户端处理数据。请记住,RDD分布在工作机器上,这意味着首先需要将RDD中的所有记录带到驱动程序中,从而增加了通信量,这在分布式计算中是致命的。因此,如上所述,只有当RDD中的记录有限时,您的代码才能正常工作。

我正在解决类似的问题,一直在寻找如何汇集连接并将其序列化到客户端机器的方法。如果有人有任何答案,那将是很好的。


网页内容由stack overflow 提供, 点击上面的
可以查看英文原文,
原文链接