Python Spark DataFrame to Elasticsearch

Posted 2019-05-07 14:02

I can't figure out how to write a DataFrame to Elasticsearch from Spark using Python. I followed the steps from here.

Here is my code:

# Read file
df = sqlContext.read \
    .format('com.databricks.spark.csv') \
    .options(header='true') \
    .load('/vagrant/data/input/input.csv', schema = customSchema)

df.registerTempTable("data")

# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")

es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_conf)

The code above fails with:

Caused by: net.razorvine.pickle.PickleException: expected zero arguments for construction of ClassDict (for pyspark.sql.types._create_row)

To ensure that elasticsearch-hadoop is loaded, I also started the script with:

spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py

2 answers
我只想做你的唯一
#2 · 2019-05-07 14:35

As zero323 said, the easiest way to write a DataFrame from PySpark to Elasticsearch is through the DataFrame writer:

df.write.format("org.elasticsearch.spark.sql").save("index/type")
劫难
#3 · 2019-05-07 14:40

For starters, saveAsNewAPIHadoopFile expects an RDD of (key, value) pairs, and in your case that would happen only by accident, because kpi1.rdd is an RDD of Row objects. The same applies to the value format you declare.

I am not familiar with Elasticsearch, but judging by the arguments you should probably try something like this:

kpi1.rdd.map(lambda row: (None, row.asDict())).saveAsNewAPIHadoopFile(...)
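
Spelled out with the arguments from the question, the idea might look like the sketch below. The helper names row_to_es_pair and save_rdd_to_es are made up for illustration, and the sketch assumes the elasticsearch-hadoop jar is on the classpath as in the spark-submit command above. I have not run it against a cluster.

```python
def row_to_es_pair(row):
    """Turn a Row into the (key, value) pair that saveAsNewAPIHadoopFile needs."""
    # The key class in the question is NullWritable, so None is used as the key.
    # The value is a plain dict rather than a pyspark Row, which is the object
    # the PickleException in the question complains about.
    return (None, row.asDict())


def save_rdd_to_es(df, es_conf):
    """Write a DataFrame through the RDD API with the question's settings.

    es_conf is the dict from the question, e.g.
    {"es.nodes": "10.10.10.10", "es.port": "9200", "es.resource": "kpi"}.
    """
    df.rdd.map(row_to_es_pair).saveAsNewAPIHadoopFile(
        path="-",
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=es_conf)
```

In the question's script this would be called as save_rdd_to_es(kpi1, es_conf).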

Since Elasticsearch-Hadoop provides a SQL data source, you should also be able to skip that and save the data directly:

df.write.format("org.elasticsearch.spark.sql").save(...)
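
A slightly fuller sketch of the same call, passing the connection settings from the question as writer options. The function name write_df_to_es, its parameters, and the "append" save mode are my own choices for illustration; only the format string and the es.nodes / es.port keys come from the thread.

```python
def write_df_to_es(df, resource, nodes, port="9200", mode="append"):
    """Save a DataFrame through the Elasticsearch SQL data source.

    resource is the target passed to save(), e.g. "index/type";
    nodes and port correspond to es.nodes and es.port in the question.
    """
    options = {"es.nodes": nodes, "es.port": str(port)}
    (df.write
       .format("org.elasticsearch.spark.sql")
       .options(**options)   # connection settings, same keys as es_conf
       .mode(mode)           # assumption: add to the index if it already exists
       .save(resource))
    return options
```

For the question's setup that would be something like write_df_to_es(kpi1, "index/type", "10.10.10.10"), with the real index and type substituted.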