I can't figure out how to write a DataFrame to Elasticsearch from Spark using Python. I followed the steps from here.
Here is my code:
# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema=customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes": "10.10.10.10", "es.port": "9200", "es.resource": "kpi"}

kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)
The above code gives:
Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)
I also started the script with:
spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py
to make sure that elasticsearch-hadoop is loaded.
As zero323 said, the easiest way to load a DataFrame from PySpark into Elasticsearch is with the DataFrame writer and the org.elasticsearch.spark.sql data source that ships with elasticsearch-hadoop.
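In its simplest form that looks roughly like the sketch below; "index/type" is only a placeholder for your actual Elasticsearch index and mapping type, and es.nodes/es.port can be passed through .option() if the cluster is not on localhost:

# Minimal sketch: write a DataFrame through the elasticsearch-hadoop SQL data source.
# "index/type" is a placeholder resource name, not taken from the original post.
df.write.format("org.elasticsearch.spark.sql").save("index/type")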
For starters, saveAsNewAPIHadoopFile expects an RDD of (key, value) pairs, and in your case this may happen only accidentally. The same thing applies to the value format you declare. I am not familiar with Elastic, but just based on the arguments you should probably try something similar to this:
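A hedged sketch of that idea (not necessarily the exact code from the original answer): turn each Row into a (key, value) pair whose value is a JSON string, and tell the connector that the input is already JSON. The es.input.json setting and the "kpi/kpi" index/type are assumptions, not part of the original post.

import json

# Ship each document as a JSON string so PySpark never has to pickle Row objects,
# which is what causes the _create_row PickleException.
# "kpi/kpi" is a placeholder index/type; "es.input.json" is an assumed addition.
es_write_conf = {
    "es.nodes": "10.10.10.10",
    "es.port": "9200",
    "es.resource": "kpi/kpi",
    "es.input.json": "true"
}

kpi1.rdd \
    .map(lambda row: ('ignored', json.dumps(row.asDict()))) \
    .saveAsNewAPIHadoopFile(
        path='-',
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_write_conf)

The key is ignored by EsOutputFormat, so any placeholder value will do; what matters is that the RDD now consists of (key, value) pairs whose values the connector can serialize.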
Since elasticsearch-hadoop provides a Spark SQL Data Source, you should also be able to skip that and save the data directly:
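For example, something along these lines, reusing the connection settings from es_conf and again assuming a "kpi/kpi" index/type:

# Sketch: save the DataFrame directly through the SQL data source,
# skipping the RDD conversion entirely. "kpi/kpi" is a placeholder.
kpi1.write \
    .format("org.elasticsearch.spark.sql") \
    .option("es.nodes", "10.10.10.10") \
    .option("es.port", "9200") \
    .save("kpi/kpi")

Because the data source operates on the DataFrame on the JVM side, the rows should not have to go through the Python pickler at all, which is what triggers the _create_row PickleException in the first place.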