I am trying to receive streaming data from kafka. In this process I am able to receive and store the streaming data into JavaPairInputDStream. Now I need to analyze this data with out storing it into any database.So I want to convert this JavaPairInputDStream to DataSet or DataFrame
What I tried so far is:
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaPairRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.catalog.Function;
import org.apache.spark.streaming.Duration;
import org.apache.spark.streaming.api.java.AbstractJavaDStreamLike;
import org.apache.spark.streaming.api.java.JavaDStream;
import org.apache.spark.streaming.api.java.JavaPairDStream;
import org.apache.spark.streaming.api.java.JavaPairInputDStream;
import org.apache.spark.streaming.api.java.JavaStreamingContext;
import org.apache.spark.streaming.kafka.KafkaUtils;
import kafka.serializer.StringDecoder;
import scala.Tuple2;
//Streaming Working Code
public class KafkaToSparkStreaming
{
public static void main(String arr[]) throws InterruptedException
{
SparkConf conf = new SparkConf();
conf.set("spark.app.name", "SparkReceiver"); //The name of application. This will appear in the UI and in log data.
//conf.set("spark.ui.port", "7077"); //Port for application's dashboard, which shows memory and workload data.
conf.set("dynamicAllocation.enabled","false"); //Which scales the number of executors registered with this application up and down based on the workload
//conf.set("spark.cassandra.connection.host", "localhost"); //Cassandra Host Adddress/IP
conf.set("spark.serializer","org.apache.spark.serializer.KryoSerializer"); //For serializing objects that will be sent over the network or need to be cached in serialized form.
//conf.setMaster("local");
conf.set("spark.streaming.stopGracefullyOnShutdown", "true");
JavaSparkContext sc = new JavaSparkContext(conf);
// Create the context with 2 seconds batch size
JavaStreamingContext ssc = new JavaStreamingContext(sc, new Duration(2000));
Map<String, String> kafkaParams = new HashMap<String, String>();
kafkaParams.put("zookeeper.connect", "localhost:2181"); //Make all kafka data for this cluster appear under a particular path.
kafkaParams.put("group.id", "testgroup"); //String that uniquely identifies the group of consumer processes to which this consumer belongs
kafkaParams.put("metadata.broker.list", "localhost:9092"); //Producer can find a one or more Brokers to determine the Leader for each topic.
kafkaParams.put("serializer.class", "kafka.serializer.StringEncoder"); //Serializer to use when preparing the message for transmission to the Broker.
kafkaParams.put("request.required.acks", "1"); //Producer to require an acknowledgement from the Broker that the message was received.
Set<String> topics = Collections.singleton("ny-2008.csv");
//Create an input DStream for Receiving data from socket
JavaPairInputDStream<String, String> directKafkaStream = KafkaUtils.createDirectStream(ssc,
String.class,
String.class,
StringDecoder.class,
StringDecoder.class,
kafkaParams, topics);
//System.out.println(directKafkaStream);
directKafkaStream.print();
}
}
Technically Dstream is sequence of RRDs, you won't convert Dstream to Datframe instead you will convert each RDD to Dataframe/Dataset as below(Scala code please convert it in Java for your case):
stream.foreachRDD { rdd =>
val dataFrame = rdd.map {case (key, value) => Row(key, value)}.toDF()
}
Here is the complete working code using Spark 2.0.