The example in the Spark Streaming programming guide
http://spark.apache.org/docs/latest/streaming-programming-guide.html
lets me receive data from a TCP socket on port 9999:
import org.apache.spark._
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._ // not necessary since Spark 1.3
// Create a local StreamingContext with two worker threads and a batch interval of 1 second.
// The master requires 2 cores to prevent a starvation scenario.
val conf = new SparkConf().setMaster("local[2]").setAppName("NetworkWordCount")
val ssc = new StreamingContext(conf, Seconds(1))
// Create a DStream that will connect to hostname:port, like localhost:9999
val lines = ssc.socketTextStream("localhost", 9999)
// Split each line into words
val words = lines.flatMap(_.split(" "))
// Count each word in each batch
val pairs = words.map(word => (word, 1))
val wordCounts = pairs.reduceByKey(_ + _)
// Print the first ten elements of each RDD generated in this DStream to the console
wordCounts.print()
ssc.start() // Start the computation
ssc.awaitTermination() // Wait for the computation to terminate
I can send data over TCP by running a data server with netcat on my Linux system:
$ nc -lk 9999
Question
I need to receive a stream from an Android phone that sends its data over UDP, but in Scala/Spark
val lines = ssc.socketTextStream("localhost", 9999)
receives ONLY TCP streams.
How can I receive UDP streams in a similarly simple manner using Scala and Spark, and create a Spark DStream from them?
There isn't anything built in, but it's not too much work to do it yourself. Here is a simple solution I made, based on a custom UdpSocketInputDStream[T]:
import java.io._
import java.net.{ConnectException, DatagramPacket, DatagramSocket, InetAddress}
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.ReceiverInputDStream
import org.apache.spark.streaming.receiver.Receiver
import scala.reflect.ClassTag
import scala.util.control.NonFatal
class UdpSocketInputDStream[T: ClassTag](
    _ssc: StreamingContext,
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
) extends ReceiverInputDStream[T](_ssc) {
  def getReceiver(): Receiver[T] = {
    new UdpSocketReceiver(host, port, bytesToObjects, storageLevel)
  }
}
class UdpSocketReceiver[T: ClassTag](host: String,
                                     port: Int,
                                     bytesToObjects: InputStream => Iterator[T],
                                     storageLevel: StorageLevel) extends Receiver[T](storageLevel) {
  var udpSocket: DatagramSocket = _
  override def onStart(): Unit = {
    try {
      udpSocket = new DatagramSocket(port, InetAddress.getByName(host))
    } catch {
      // Binding fails with a SocketException (e.g. a BindException when the port is in use)
      case e: java.net.SocketException =>
        restart(s"Error binding to $host:$port", e)
        return
    }
    // Start the thread that receives datagrams on the bound socket
    new Thread("Udp Socket Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        receive()
      }
    }.start()
  }
  /** Receive datagrams and store their records until the receiver is stopped */
  def receive(): Unit = {
    try {
      val buffer = new Array[Byte](2048)
      // Keep receiving: each datagram is decoded and stored before waiting for the next one
      while (!isStopped()) {
        // Create a packet to receive data into the buffer
        val packet = new DatagramPacket(buffer, buffer.length)
        udpSocket.receive(packet)
        val iterator = bytesToObjects(
          new ByteArrayInputStream(packet.getData, packet.getOffset, packet.getLength))
        while (!isStopped() && iterator.hasNext) {
          store(iterator.next())
        }
      }
    } catch {
      case NonFatal(e) =>
        // Closing the socket in onStop() also ends up here, so only restart while still running
        if (!isStopped()) {
          restart("Error receiving data", e)
        }
    } finally {
      onStop()
    }
  }
  override def onStop(): Unit = {
    synchronized {
      if (udpSocket != null) {
        udpSocket.close()
        udpSocket = null
      }
    }
  }
}
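The shutdown path above depends on one piece of DatagramSocket behaviour: closing the socket makes a thread blocked in receive() throw a SocketException, which is what lets the receiver thread exit. The following is a minimal sketch of that behaviour in isolation, without Spark. The object and method names are hypothetical and used only for illustration.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress, SocketException}
import java.util.concurrent.{CountDownLatch, TimeUnit}
import java.util.concurrent.atomic.AtomicBoolean

// Hypothetical helper, not part of the answer's code
object CloseUnblocksReceiveSketch {
  /** Returns true if closing the socket made receive() fail with a SocketException. */
  def closeInterruptsReceive(): Boolean = {
    val socket = new DatagramSocket(0, InetAddress.getLoopbackAddress) // port 0 = any free port
    val interrupted = new AtomicBoolean(false)
    val done = new CountDownLatch(1)

    val receiver = new Thread("udp-receive-sketch") {
      setDaemon(true)
      override def run(): Unit = {
        try {
          val buffer = new Array[Byte](2048)
          // Blocks, because nothing is ever sent to this socket
          socket.receive(new DatagramPacket(buffer, buffer.length))
        } catch {
          case _: SocketException => interrupted.set(true)
        } finally {
          done.countDown()
        }
      }
    }
    receiver.start()

    Thread.sleep(200) // give the thread time to block in receive()
    socket.close()    // the same call that onStop() makes
    done.await(2, TimeUnit.SECONDS) && interrupted.get()
  }
}
```

Because a stop and a genuine network failure both surface as exceptions in the receiving thread, checking isStopped() before calling restart is one way to tell them apart.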
To add a method to StreamingContext itself, we enrich it with an implicit class:
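import java.io.InputStream
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.StreamingContext
import org.apache.spark.streaming.dstream.InputDStream
import scala.reflect.ClassTag
object Implicits {
  implicit class StreamingContextOps(val ssc: StreamingContext) extends AnyVal {
    def udpSocketStream[T: ClassTag](host: String,
                                     port: Int,
                                     converter: InputStream => Iterator[T],
                                     storageLevel: StorageLevel): InputDStream[T] = {
      new UdpSocketInputDStream(ssc, host, port, converter, storageLevel)
    }
  }
}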
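For readers unfamiliar with the pattern, here is a minimal sketch of the same enrichment technique on a plain class, with no Spark dependency. Context, ContextOps and describeUdpSource are hypothetical names invented for this illustration, with Context standing in for a class we cannot modify, such as StreamingContext.

```scala
object EnrichmentSketch {
  // Hypothetical stand-in for a class we cannot modify
  final class Context(val appName: String)

  // The implicit class wraps a Context, so its methods can be called as if
  // they were defined on Context itself. (The answer's version also extends
  // AnyVal to avoid allocating the wrapper; that is omitted here for brevity.)
  implicit class ContextOps(val ctx: Context) {
    def describeUdpSource(host: String, port: Int): String =
      s"${ctx.appName} <- udp://$host:$port"
  }
}
```

With `import EnrichmentSketch._` in scope, `new Context("udpTest").describeUdpSource("localhost", 3003)` compiles even though Context declares no such method.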
And here is how you call it all:
import java.io.{BufferedReader, InputStream, InputStreamReader}
import java.nio.charset.StandardCharsets
import org.apache.spark.SparkContext
import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.dstream.InputDStream
import org.apache.spark.streaming.{Seconds, StreamingContext}
import scala.reflect.ClassTag
object TestRunner {
  import Implicits._
  def main(args: Array[String]): Unit = {
    val sparkContext = new SparkContext("local[*]", "udpTest")
    val ssc = new StreamingContext(sparkContext, Seconds(4))
    val stream = ssc.udpSocketStream("localhost",
                                     3003,
                                     bytesToLines,
                                     StorageLevel.MEMORY_AND_DISK_SER_2)
    stream.print()
    ssc.start()
    ssc.awaitTermination()
  }
  def bytesToLines(inputStream: InputStream): Iterator[String] = {
    val dataInputStream = new BufferedReader(
      new InputStreamReader(inputStream, StandardCharsets.UTF_8))
    new NextIterator[String] {
      protected override def getNext(): String = {
        val nextValue = dataInputStream.readLine()
        if (nextValue == null) {
          finished = true
        }
        nextValue
      }
      protected override def close(): Unit = {
        dataInputStream.close()
      }
    }
  }
  abstract class NextIterator[U] extends Iterator[U] {
    protected var finished = false
    private var gotNext = false
    private var nextValue: U = _
    private var closed = false
    override def next(): U = {
      if (!hasNext) {
        throw new NoSuchElementException("End of stream")
      }
      gotNext = false
      nextValue
    }
    override def hasNext: Boolean = {
      if (!finished) {
        if (!gotNext) {
          nextValue = getNext()
          if (finished) {
            closeIfNeeded()
          }
          gotNext = true
        }
      }
      !finished
    }
    def closeIfNeeded(): Unit = {
      if (!closed) {
        closed = true
        close()
      }
    }
    protected def getNext(): U
    protected def close(): Unit
  }
}
Most of this code is taken from the SocketInputDStream[T] provided by Spark; I simply reused it. I also took the code for NextIterator, which is used by bytesToLines. All it does is consume each line from the packet and turn it into a String. If you have more complex logic, you can supply your own implementation of converter: InputStream => Iterator[T].
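As an illustration of a custom converter, here is a minimal sketch that parses each line of a datagram into a typed record instead of a raw String. The "name,value" wire format, the Reading case class and the bytesToReadings function are assumptions made up for this example; they are not something the phone or Spark prescribes.

```scala
import java.io.InputStream
import java.nio.charset.StandardCharsets
import scala.io.Source
import scala.util.Try

object ConverterSketch {
  // Hypothetical record type: one sensor sample per line, formatted as "name,value"
  final case class Reading(sensor: String, value: Double)

  /** Decodes every well-formed "name,value" line in the stream; malformed lines are skipped. */
  def bytesToReadings(in: InputStream): Iterator[Reading] =
    Source.fromInputStream(in, StandardCharsets.UTF_8.name()).getLines().flatMap { line =>
      line.split(",", -1) match {
        case Array(sensor, raw) =>
          // A value that does not parse as a Double drops the line instead of failing the receiver
          Try(raw.trim.toDouble).toOption.map(Reading(sensor.trim, _))
        case _ => None
      }
    }
}
```

Since the function has the shape InputStream => Iterator[Reading], it can be passed as the converter argument of udpSocketStream in place of bytesToLines. Silently skipping malformed lines is a design choice for this sketch; logging or counting them may suit a real pipeline better.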
Testing it with a simple UDP packet (the /dev/udp redirection is a bash feature):
echo -n "hello hello hello!" >/dev/udp/localhost/3003
Yields:
-------------------------------------------
Time: 1482676728000 ms
-------------------------------------------
hello hello hello!
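Where bash's /dev/udp is not available, for example when sending from JVM code, the same test packet can be sent with a few lines of Scala. This is a minimal sketch; UdpSendSketch and sendLine are hypothetical names, and the host and port simply mirror the values used in TestRunner.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}
import java.nio.charset.StandardCharsets

object UdpSendSketch {
  /** Sends `message` as a single UTF-8 datagram and returns the number of payload bytes. */
  def sendLine(message: String, host: String, port: Int): Int = {
    val payload = message.getBytes(StandardCharsets.UTF_8)
    val socket = new DatagramSocket() // binds to any free local port
    try {
      socket.send(new DatagramPacket(payload, payload.length, InetAddress.getByName(host), port))
      payload.length
    } finally {
      socket.close()
    }
  }

  def main(args: Array[String]): Unit = {
    val sent = sendLine("hello hello hello!", "localhost", 3003)
    println(s"sent $sent bytes")
  }
}
```

UDP is connectionless, so the send succeeds whether or not the receiver is running; the only confirmation of delivery is the line appearing in the streaming output.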
Of course, this needs further testing. It also has a hidden assumption that each datagram fits in the 2048-byte buffer passed to the DatagramPacket, which is perhaps something you'll want to change.
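To see what the buffer-size assumption means in practice, the following sketch sends a datagram over loopback and reports how many bytes the receiving DatagramPacket actually holds. When the datagram is larger than the buffer, Java truncates it to the buffer length and discards the rest. BufferSizeSketch and receivedLength are hypothetical names for this illustration.

```scala
import java.net.{DatagramPacket, DatagramSocket, InetAddress}

object BufferSizeSketch {
  /** Sends `payloadSize` bytes over loopback and returns how many the receiver actually got. */
  def receivedLength(payloadSize: Int, bufferSize: Int): Int = {
    val receiver = new DatagramSocket(0, InetAddress.getLoopbackAddress) // port 0 = any free port
    val sender = new DatagramSocket()
    try {
      receiver.setSoTimeout(2000) // fail instead of hanging if the datagram is lost
      val payload = Array.fill[Byte](payloadSize)('x'.toByte)
      sender.send(new DatagramPacket(
        payload, payload.length, receiver.getLocalAddress, receiver.getLocalPort))
      val buffer = new Array[Byte](bufferSize)
      val packet = new DatagramPacket(buffer, buffer.length)
      receiver.receive(packet)
      packet.getLength // never larger than bufferSize
    } finally {
      sender.close()
      receiver.close()
    }
  }
}
```

A 3000-byte datagram read into a 2048-byte buffer comes back as 2048 bytes, while a buffer of 65507 bytes (the largest UDP payload over IPv4) receives all 3000. Sizing the buffer to the largest datagram the phone can send avoids silent truncation.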