I am trying to write a collection of objects in Elasticsearch from Spark. I have to meet two requirements:
- Document is already serialized in JSON and should be written as is
- Elasticsearch document
_id
should be provided
Here's what I tried so far.
saveJsonToEs()
I tried to use saveJsonToEs()
like this (the serialized document contains field _id
with desired Elasticsearch ID):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id"),
("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
But the elasticsearch-hadoop
library gives this exception:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
If I remove es.mapping.exclude
but keep es.mapping.id
and send a JSON with _id
inside (like {"_id":"blah",...}
)
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
I get this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
...
When I try to send this id as a different field (like {"superID":"blah",..."
:
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "superID")
)
EsSpark.saveJsonToEs(rdd, cfg)
It fails to extract the field:
17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
When I remove es.mapping.id
and es.mapping.exclude
from the configuration, it works but the document id is generated by Elasticsearch (which violates requirement 2):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveJsonToEs(rdd, cfg)
saveToEsWithMeta()
There is another function to provide _id
and other metadata for inserting: saveToEsWithMeta()
that allows to solve requirement 2 but fails with requirement 1.
val rdd: RDD[(String, String)] = job.map{
r => r._id -> r.toJson()
}
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveToEsWithMeta(rdd, cfg)
In fact, Elasticsearch is not even able to parse what the elasticsearch-hadoop
sends:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
The question
Is it possible to write a collection of (documentID, serializedDocument)
from Spark into Elasticsearch (using elasticsearch-hadoop
)?
P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.