I was testing Elasticsearch and Spark integration on my local machine, using some test data loaded into Elasticsearch.
val sparkConf = new SparkConf().setAppName("Test").setMaster("local")
val sc = new SparkContext(sparkConf)
val conf = new JobConf()
conf.set("spark.serializer", classOf[KryoSerializer].getName)
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")
val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
esRDD.first()
esRDD.collect()
The code runs fine and returns the correct result with esRDD.first().
However, esRDD.collect() throws an exception:
java.io.NotSerializableException: org.apache.hadoop.io.Text
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeArray(ObjectOutputStream.java:1378)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1174)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:71)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:193)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)
I believe this is related to the issue mentioned here: http://www.elasticsearch.org/guide/en/elasticsearch/hadoop/current/spark.html, so I added this line accordingly:
conf.set("spark.serializer", classOf[KryoSerializer].getName)
Am I supposed to do something else to get it working? Thank you
Update: the serialization setup problem was solved by using
sparkConf.set("spark.serializer", classOf[KryoSerializer].getName)
instead of
conf.set("spark.serializer", classOf[KryoSerializer].getName)
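The reason this matters is that spark.serializer is a Spark setting, so Spark reads it from the SparkConf, while the Hadoop JobConf only carries the es.* connector settings. A minimal sketch of the corrected setup follows; the import lines are an assumption about where each class lives (they are not shown in the original snippet), and the rest mirrors the code above.

```scala
import org.apache.hadoop.io.{MapWritable, Text}
import org.apache.hadoop.mapred.JobConf
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.serializer.KryoSerializer
import org.elasticsearch.hadoop.mr.EsInputFormat

// Spark settings go on the SparkConf, before the SparkContext is created.
val sparkConf = new SparkConf()
  .setAppName("Test")
  .setMaster("local")
  .set("spark.serializer", classOf[KryoSerializer].getName)
val sc = new SparkContext(sparkConf)

// Elasticsearch connector settings go on the Hadoop JobConf.
val conf = new JobConf()
conf.set("es.nodes", "localhost:9200")
conf.set("es.resource", "bank/account")
conf.set("es.query", "?q=firstname:Daniel")

val esRDD = sc.hadoopRDD(conf, classOf[EsInputFormat[Text, MapWritable]],
  classOf[Text], classOf[MapWritable])
```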
Now there is another problem. There are 1000 distinct records in this dataset.
esRDD.count()
returns 1000 with no problem. However,
esRDD.distinct().count()
returns 5! If I print the records with
esRDD.foreach(println)
it prints out the 1000 records correctly. But if I use collect or take:
esRDD.collect().foreach(println)
esRDD.take(10).foreach(println)
it prints DUPLICATED records, and indeed only 5 UNIQUE records show up. They seem to be a random subset of the entire dataset, not the first 5 records. If I save the RDD and read it back:
esRDD.saveAsTextFile("spark-output")
val esRDD2 = sc.textFile("spark-output")
esRDD2.distinct().count()
esRDD2.collect().foreach(println)
esRDD2.take(10).foreach(println)
esRDD2 behaves as expected. I wonder whether there is a bug, or something I don't understand about the behavior of collect/take. Or is it because I'm running everything locally? By default the Spark RDD seems to use 5 partitions, as shown by the number of part-xxxx files in the "spark-output" directory. That's probably why esRDD.collect() and esRDD.distinct() returned 5 unique records instead of some other random number. But that's still not right.
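One possible explanation, offered as an assumption rather than a confirmed diagnosis: the Spark API documentation for hadoopRDD notes that Hadoop's RecordReader reuses the same Writable object for every record, so collecting, caching, or shuffling the raw RDD can end up with many references to one object per partition. That would be consistent with foreach(println) and saveAsTextFile looking correct (each record is consumed immediately) while collect, take, and distinct see only one value per partition. A hedged sketch of the usual workaround is to copy each record into immutable values with a map before any such operation. The name `copied` is illustrative, and flattening field values with toString is a simplification that may not suit nested documents.

```scala
import scala.collection.JavaConverters._

// Assumes esRDD is the RDD[(Text, MapWritable)] built in the question.
// Copy each reused Writable into immutable Scala values right away.
val copied = esRDD.map { case (id, doc) =>
  val fields = doc.entrySet().asScala
    .map(entry => entry.getKey.toString -> entry.getValue.toString)
    .toMap
  (id.toString, fields)
}

// The same checks as before, now run on the copied records.
println(copied.distinct().count())
copied.take(10).foreach(println)
```

If the object-reuse explanation holds, the distinct count on `copied` should match esRDD.count().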
You can try
You should use the following code to initialize: