虽然没有内置的解决方案,但自己实现并不太难。这里提供一种基于自定义 UdpSocketInputDStream[T] 的简单解决方案:
import java.io._
import java.net.{ConnectException, DatagramPacket, DatagramSocket, InetAddress}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import scala.reflect.ClassTag
import scala.util.control.NonFatal
/**
 * Receiver-based input stream whose records come from a [[UdpSocketReceiver]].
 *
 * All constructor arguments except `_ssc` are forwarded unchanged to the receiver.
 *
 * @param _ssc           streaming context this stream is registered with
 * @param host           host name or address passed to the receiver
 * @param port           UDP port passed to the receiver
 * @param bytesToObjects turns the bytes of a datagram into an iterator of records
 * @param storageLevel   storage level passed to the receiver
 * @tparam T type of the records produced by `bytesToObjects`
 */
class UdpSocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {

  /** Creates a new [[UdpSocketReceiver]] configured with this stream's settings. */
  def getReceiver(): Receiver[T] =
    new UdpSocketReceiver[T](host, port, bytesToObjects, storageLevel)
}
/**
 * Receiver that binds a UDP socket to `host`:`port` and stores every record that
 * `bytesToObjects` decodes from the datagrams it receives.
 *
 * Unlike a TCP stream, a UDP socket has no end-of-stream, so the receiving thread keeps
 * reading datagrams until the receiver is stopped instead of restarting after each packet.
 *
 * @param host           local host name or address the socket is bound to
 * @param port           local UDP port the socket is bound to
 * @param bytesToObjects turns the payload of one datagram into an iterator of records
 * @param storageLevel   storage level handed to the base [[Receiver]]
 * @param bufferSize     size in bytes of the receive buffer; datagrams longer than this are
 *                       truncated by `DatagramSocket.receive` (defaults to 2048)
 * @tparam T type of the stored records
 */
class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel,
                                     bufferSize: Int = 2048) extends Receiver[T](storageLevel) {

  require(bufferSize > 0, s"bufferSize must be positive, got $bufferSize")

  /** Currently bound socket, or null when the receiver is not running. */
  var udpSocket: DatagramSocket = _

  /**
   * Binds the socket and starts the daemon thread that runs [[receive]].
   * Any non-fatal failure while resolving the host or binding the port requests a restart.
   */
  override def onStart(): Unit = {
    // Binding fails with SocketException/BindException (and host lookup with
    // UnknownHostException), none of which is a ConnectException, so catch NonFatal.
    val bound: Option[DatagramSocket] =
      try {
        Some(new DatagramSocket(port, InetAddress.getByName(host)))
      } catch {
        case NonFatal(e) =>
          restart(s"Error connecting to $port", e)
          None
      }

    bound.foreach { socket =>
      synchronized {
        udpSocket = socket
      }
      new Thread("Udp Socket Receiver") {
        setDaemon(true)
        override def run(): Unit = receive()
      }.start()
    }
  }

  /**
   * Reads datagrams from the bound socket until the receiver is stopped or the socket is
   * closed, storing every record decoded from each datagram. A failure that was not caused
   * by a deliberate shutdown requests a restart.
   */
  def receive(): Unit = {
    // Work on a local reference so a concurrent onStop()/onStart() cannot swap the socket
    // underneath this thread.
    val socket = synchronized(udpSocket)
    if (socket != null) {
      try {
        // The buffer is reused: the records of a datagram are fully drained before the
        // next receive() overwrites it.
        val buffer = new Array[Byte](bufferSize)
        while (!isStopped() && !socket.isClosed) {
          val packet = new DatagramPacket(buffer, buffer.length)
          socket.receive(packet)
          val records = bytesToObjects(
            new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
          while (!isStopped() && records.hasNext) {
            store(records.next())
          }
        }
      } catch {
        case NonFatal(e) =>
          // onStop() closes the socket to unblock receive(); that is a normal shutdown,
          // not an error worth restarting for.
          if (!isStopped() && !socket.isClosed) {
            restart("Error receiving data", e)
          }
      }
    }
  }

  /** Closes the socket, if any, which also unblocks a pending `receive` call. */
  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}
为了能够直接在 StreamingContext 上调用这个方法,我们使用隐式类对其进行增强:
/** Syntax extensions for [[StreamingContext]]; bring them into scope with `import Implicits._`. */
object Implicits {

  /** Adds `udpSocketStream` to a [[StreamingContext]]. */
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {

    /**
     * Creates an input stream fed by a UDP socket receiver.
     *
     * The result is declared as `ReceiverInputDStream[T]`, the type already imported
     * alongside [[UdpSocketInputDStream]]; it is a subtype of `InputDStream[T]`, so callers
     * that expect the wider type keep compiling.
     *
     * @param host         host name or address forwarded to the stream
     * @param port         UDP port forwarded to the stream
     * @param converter    turns the bytes of a datagram into an iterator of records
     * @param storageLevel storage level forwarded to the stream
     * @tparam T type of the records produced by `converter`
     * @return a new [[UdpSocketInputDStream]] attached to `ssc`
     */
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): ReceiverInputDStream[T] =
      new UdpSocketInputDStream[T](ssc, host, port, converter, storageLevel)
  }
}
下面演示如何调用它:
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.reflect.ClassTag
/** Example driver: prints every line received as a UDP datagram on localhost:3003. */
object TestRunner {

  import Implicits._

  /** Starts a local streaming job with 4-second batches and blocks until it terminates. */
  def main(args: Array[String]): Unit = {
    val ssc = new StreamingContext(new SparkContext("local[*]", "udpTest"), Seconds(4))
    val lines = ssc.udpSocketStream(
      "localhost",
      3003,
      bytesToLines,
      StorageLevel.MEMORY_AND_DISK_SER_2)

    lines.print()
    ssc.start()
    ssc.awaitTermination()
  }

  /**
   * Decodes `inputStream` as UTF-8 and exposes its content as an iterator of lines.
   * The underlying reader is closed once the last line has been consumed.
   */
  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val reader = new BufferedReader(new InputStreamReader(inputStream, StandardCharsets.UTF_8))

    new NextIterator[String] {
      protected override def getNext(): String = {
        val line = reader.readLine()
        // readLine() returns null at end of input; flag it so hasNext reports false.
        if (line == null) finished = true
        line
      }

      protected override def close(): Unit = reader.close()
    }
  }

  /**
   * Iterator skeleton for sources that only discover the end of data by trying to read.
   *
   * Subclasses implement `getNext()` and set `finished = true` when nothing is left
   * (the value returned in that call is ignored), and `close()` to release resources.
   */
  abstract class NextIterator[U] extends Iterator[U] {
    /** Set by `getNext()` when the source is exhausted. */
    protected var finished = false
    // True while `pending` holds an element that next() has not handed out yet.
    private var fetched = false
    private var pending: U = _
    private var alreadyClosed = false

    override def next(): U =
      if (hasNext) {
        fetched = false
        pending
      } else {
        throw new NoSuchElementException("End of stream")
      }

    override def hasNext: Boolean = {
      if (!finished && !fetched) {
        pending = getNext()
        if (finished) closeIfNeeded()
        fetched = true
      }
      !finished
    }

    /** Calls `close()` the first time it is invoked and does nothing afterwards. */
    def closeIfNeeded(): Unit =
      if (!alreadyClosed) {
        alreadyClosed = true
        close()
      }

    /** Returns the next element, or sets `finished` when there is none. */
    protected def getNext(): U

    /** Releases the underlying resources. */
    protected def close(): Unit
  }
}
大部分代码取自 Spark 提供的 SocketInputDStream[T],我只是对其进行了复用。我还借用了 NextIterator 的代码,bytesToLines 会用到它:它只是逐行读取数据包的内容,并将每一行转换为 String。如果您有更复杂的逻辑,可以通过传入 converter: InputStream => Iterator[T] 来提供自己的实现。
使用简单的UDP数据包进行测试:
echo -n "hello hello hello!" >/dev/udp/localhost/3003
产生:
-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!
当然,这还需要进一步测试。代码中还有一个隐含的假设:为每个 DatagramPacket 分配的 buffer 固定为 2048 字节,超过该长度的数据报会被截断,您可能需要根据实际情况调整这个大小。