Spark Scala: listening on a port to receive UDP data

3

The example in http://spark.apache.org/docs/latest/streaming-programming-guide.html lets me receive data over a TCP stream on port 9999:

import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3

// Create a local StreamingContext with two worker threads and a batch interval of 1 second.
// The master requires 2 cores to avoid a starvation scenario.

val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))


// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)

// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start()             // Start the computation
ssc.awaitTermination()  // Wait for the computation to terminate

I can send data over TCP by starting a data server on Linux with the following command:

$ nc -lk 9999

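Question
I need to receive a stream sent over UDP from an Android phone using Scala/Spark.
val lines = ssc.socketTextStream("localhost", 9999)
But this code only receives TCP streams.

Is there a similarly simple way to receive a UDP stream and create a Spark DStream from it?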

2 Answers

6

There is no built-in solution, but it isn't too hard to write one yourself. Here is a simple solution based on a custom UdpSocketInputDStream[T]:

import java.io._
import java.net.{ConnectException, DatagramPacket, DatagramSocket, InetAddress}

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver

import scala.reflect.ClassTag
import scala.util.control.NonFatal

class UdpSocketInputDStream[T: ClassTag](
                                          _ssc: StreamingContext,
                                          host: String,
                                          port: Int,
                                          bytesToObjects: InputStream => Iterator[T],
                                          storageLevel: StorageLevel
                                        ) extends ReceiverInputDStream[T](_ssc) {

  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}

class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {

  var udpSocket: DatagramSocket = _

  override def onStart(): Unit = {

    try {
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      // Binding a DatagramSocket fails with a SocketException (for example BindException)
      case e: java.net.SocketException =>
        restart(s"Error binding to $host:$port", e)
        return
    }

    // Start the thread that receives datagrams from the bound socket
    new Thread("Udp Socket Receiver") {
      setDaemon(true)

      override def run(): Unit = {
        receive()
      }
    }.start()
  }

  /** Receive a single datagram, store its records, then ask Spark to restart the receiver */
  def receive(): Unit = {
    try {
      val buffer = new Array[Byte](2048)

      // Create a packet to receive data into the buffer
      val packet = new DatagramPacket(buffer, buffer.length)

      udpSocket.receive(packet)

      val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
      // Store every record decoded from this datagram
      while (!isStopped() && iterator.hasNext) {
        store(iterator.next())
      }

      if (!isStopped()) {
        restart("Udp socket data stream had no more data")
      }
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }

  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}

To give StreamingContext a method of its own for this, we enrich it with an implicit class:
object Implicits {
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}

Here is how to call it:

import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets

import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}

import scala.reflect.ClassTag

object TestRunner {
  import Implicits._

  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))

    val stream = ssc.udpSocketStream("localhost", 
                                     3003, 
                                     bytesToLines, 
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()

    ssc.start()
    ssc.awaitTermination()
  }

  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }

      protected override def close(): Unit = {
        dataInputStream.close()
      }
    }
  }

  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false

    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }

    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }

    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        close()
      }
    }

    protected def getNext(): U
    protected def close(): Unit
  }
}

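Most of the code is taken from the SocketInputDStream[T] that ships with Spark; I simply reused it. I also took the code for NextIterator, which is used by bytesToLines. All bytesToLines does is read the lines from the packet and turn each one into a String. If you have more complex logic, you can supply your own implementation through the converter: InputStream => Iterator[T] parameter.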
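As an illustration of a custom converter, here is a minimal sketch. The Reading type, the ReadingConverter object, and the "name=value;name=value" payload format are all assumptions made up for this example; none of them come from the answer's code.

```scala
import java.io.{ByteArrayOutputStream, InputStream}
import java.nio.charset.StandardCharsets
import scala.util.Try

// Hypothetical record type, used only for this illustration
final case class Reading(sensor: String, value: Double)

object ReadingConverter {
  // Decodes one datagram payload of the assumed form "temp=21.5;hum=40"
  // into Reading records; malformed fields are skipped.
  def bytesToReadings(in: InputStream): Iterator[Reading] = {
    val text = new String(readAll(in), StandardCharsets.UTF_8)
    text.split(';').iterator.map(_.split('=')).collect {
      case Array(name, raw) if Try(raw.trim.toDouble).isSuccess =>
        Reading(name.trim, raw.trim.toDouble)
    }
  }

  // Reads the stream to its end and returns all bytes
  private def readAll(in: InputStream): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val chunk = new Array[Byte](1024)
    var n = in.read(chunk)
    while (n != -1) {
      out.write(chunk, 0, n)
      n = in.read(chunk)
    }
    out.toByteArray
  }
}
```

Assuming the implicit class above is in scope, it could presumably be passed in place of bytesToLines, for example ssc.udpSocketStream("localhost", 3003, ReadingConverter.bytesToReadings _, StorageLevel.MEMORY_AND_DISK_SER_2).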
Testing with a simple UDP packet:
echo -n "hello hello hello!" >/dev/udp/localhost/3003

Yields:

-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!
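The /dev/udp redirection above is a bash feature, so it may not be available in other shells. As an alternative, here is a minimal sketch of a sender in Scala. The UdpSend object and its send method are names invented for this example.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

object UdpSend {
  // Sends `message` as one UTF-8 datagram to host:port.
  // Returns the number of payload bytes handed to the socket.
  def send(host: String, port: Int, message: String): Int = {
    val payload = message.getBytes(StandardCharsets.UTF_8)
    val socket = new DatagramSocket() // bound to any free local port
    try {
      val packet = new DatagramPacket(
        payload, payload.length, InetAddress.getByName(host), port)
      socket.send(packet)
      payload.length
    } finally {
      socket.close()
    }
  }
}
```

Calling UdpSend.send("localhost", 3003, "hello hello hello!") while the streaming job is running should have the same effect as the echo command.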

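Of course, this needs further testing. There is also a hidden assumption here: the buffer backing each DatagramPacket is 2048 bytes, which is something you may want to change.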
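The buffer size matters because DatagramSocket.receive truncates any message longer than the packet's buffer. Here is a minimal sketch that shows this over the loopback interface. TruncationDemo and receivedLength are names invented for this example.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

object TruncationDemo {
  // Sends payloadSize bytes over loopback and returns how many bytes the
  // receiver gets when its buffer holds only bufferSize bytes.
  def receivedLength(payloadSize: Int, bufferSize: Int): Int = {
    val receiver = new DatagramSocket(0, InetAddress.getLoopbackAddress)
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(2000) // fail instead of blocking forever
      val payload = new Array[Byte](payloadSize)
      sender.send(new DatagramPacket(
        payload, payload.length, receiver.getLocalAddress, receiver.getLocalPort))
      val packet = new DatagramPacket(new Array[Byte](bufferSize), bufferSize)
      receiver.receive(packet)
      packet.getLength
    } finally {
      sender.close()
      receiver.close()
    }
  }
}
```

With a 2048-byte buffer, a 3000-byte datagram comes back as 2048 bytes and the rest is silently dropped, so the buffer should be at least as large as the biggest datagram the phone is expected to send.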

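Is there similar Python code available, for example using Spark Streaming or Spark Structured Streaming? - OSK
I need to brush up on my Python skills. I'll see if I can find time to do it soon. - Yuval Itzchakov
Thanks a lot. I posted a question there and received a comment: http://stackoverflow.com/questions/42458812/spark-streaming-custom-receiver-in-python-receive-udp-over-socket - OSK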

0
The problem with Yuval Itzchakov's solution is that the receiver takes a single message and then restarts. Instead of calling restart after each packet, keep receiving, as shown below.
def receive(): Unit = {
    try {
      // Larger than the 65,507-byte maximum UDP payload over IPv4, so no datagram is truncated
      val buffer = new Array[Byte](200000)

      // Keep receiving datagrams until the receiver is stopped. A loop is used here
      // rather than a recursive call to receive(), so the stack does not grow per packet.
      while (!isStopped()) {
        // Create a packet to receive data into the buffer
        val packet = new DatagramPacket(buffer, buffer.length)
        udpSocket.receive(packet)

        val iterator = bytesToObjects(new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
        // Store every record decoded from this datagram
        while (!isStopped() && iterator.hasNext) {
          store(iterator.next())
        }
      }
    } catch {
      case NonFatal(e) =>
        restart("Error receiving data", e)
    } finally {
      onStop()
    }
  }
