I am trying to write a collection of objects from Spark into Elasticsearch. I have to meet two requirements:
- Document is already serialized in JSON and should be written as is
- Elasticsearch document _id should be provided
Here's what I tried so far.
saveJsonToEs()
I tried to use saveJsonToEs() like this (the serialized document contains field _id with the desired Elasticsearch ID):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id"),
("es.mapping.exclude", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
But the elasticsearch-hadoop library gives this exception:
Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)
If I remove es.mapping.exclude but keep es.mapping.id and send a JSON with _id inside (like {"_id":"blah",...}):
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "_id")
)
EsSpark.saveJsonToEs(rdd, cfg)
I get this error:
Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
...
When I try to send this id as a different field (like {"superID":"blah",...}):
val cfg = Map(
("es.resource", "myindex/mytype"),
("es.mapping.id", "superID")
)
EsSpark.saveJsonToEs(rdd, cfg)
It fails to extract the field:
17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
When I remove es.mapping.id and es.mapping.exclude from the configuration, it works, but the document id is generated by Elasticsearch (which violates requirement 2):
val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveJsonToEs(rdd, cfg)
saveToEsWithMeta()
There is another function, saveToEsWithMeta(), that allows providing _id and other metadata for inserting: it solves requirement 2 but fails on requirement 1.
val rdd: RDD[(String, String)] = job.map{
r => r._id -> r.toJson()
}
val cfg = Map(
("es.resource", "myindex/mytype"),
)
EsSpark.saveToEsWithMeta(rdd, cfg)
In fact, Elasticsearch is not even able to parse what elasticsearch-hadoop sends:
Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
at org.apache.spark.scheduler.Task.run(Task.scala:112)
The question
Is it possible to write a collection of (documentID, serializedDocument) from Spark into Elasticsearch (using elasticsearch-hadoop)?
P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.
I spent days banging my head against the wall trying to figure out why saveToEsWithMeta would not work when I used a plain string for the ID, as in the sketch below. This will throw JSON parsing-related errors, which deceptively leads you towards thinking that the issue is with your JSON, but then you log each one of your JSONs and see they're all valid.
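A minimal sketch of the failing shape, assuming es.input.json is enabled and reusing the names from the question (job, r._id, r.toJson()):
import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

// es.input.json is enabled and the document id is a plain, unquoted string
val cfg = Map(
  ("es.input.json", "true"),
  ("es.resource", "myindex/mytype")
)
val rdd: RDD[(String, String)] = job.map { r => r._id -> r.toJson() }
EsSpark.saveToEsWithMeta(rdd, cfg)   // fails with JSON parsing-related errors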
Turns out that, for whatever reason, ES_INPUT_JSON -> true makes the left-hand side of the tuple, i.e. the ID, get parsed as JSON too! The solution is to JSON-stringify the ID (which wraps the ID in extra double quotes) so that parsing it as JSON works:
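A sketch of that workaround, reusing the same cfg as above (the manual quoting below stands in for whatever JSON-stringify helper you prefer):
// Wrap the id in double quotes so the left-hand side is itself valid JSON
val rdd: RDD[(String, String)] = job.map { r =>
  ("\"" + r._id + "\"") -> r.toJson()
}
EsSpark.saveToEsWithMeta(rdd, cfg)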
Finally I found the problem: it was a typo in the config.
It was looking for a field superId but there was only superID (note the case). In the question it is also a bit misleading, since in the code it appears as ("es.mapping.id", "superID") (which was not correct). The actual solution is like Levi Ramsey suggested:
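A sketch of the corrected setup, assuming each JSON document carries a regular superID field; the important part is that the value of es.mapping.id matches that field name exactly, including case:
val rdd: RDD[String] = job.map { r => r.toJson() }   // each JSON contains "superID":"..."
val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "superID")   // must match the JSON field name exactly, including case
)
EsSpark.saveJsonToEs(rdd, cfg)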
The difference is that es.mapping.id cannot be _id (as was indicated in the original post, _id is a metadata field and Elasticsearch does not accept it). Naturally it means that the new field superID should be added to the mapping (unless the mapping is dynamic). If storing the additional field in the index is a burden, one should also find a way to exclude it, keeping in mind that es.mapping.exclude is not supported for JSON input (see the first exception above). Thanks a lot to Alex Savitsky for pointing in the right direction.
Have you tried something like:
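Something along these lines is a plausible sketch, assuming the document id is carried in a regular JSON field (here superID, as in the question):
val rdd: RDD[String] = job.map { r => r.toJson() }   // JSON carries the id in a regular field, e.g. "superID"
val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "superID")
)
EsSpark.saveJsonToEs(rdd, cfg)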
I've tested it with elasticsearch-hadoop connector version 2.4.5 against ES 1.7 and it works.
It can be done by passing the ES_INPUT_JSON option in the cfg parameters map and returning, from the map function, a tuple containing the document id as the first element and the document serialized in JSON as the second element. I tested it with "org.elasticsearch" %% "elasticsearch-spark-20" % "[6.0,7.0[" against Elasticsearch 6.4.
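A minimal sketch of that approach, assuming the same job / toJson() shape as in the question:
import org.apache.spark.rdd.RDD
import org.elasticsearch.hadoop.cfg.ConfigurationOptions
import org.elasticsearch.spark.rdd.EsSpark

// The tuple's first element becomes the document id, the second the raw JSON body
val rdd: RDD[(String, String)] = job.map { r => r._id -> r.toJson() }
val cfg = Map(
  (ConfigurationOptions.ES_INPUT_JSON, "true"),
  ("es.resource", "myindex/mytype")
)
EsSpark.saveToEsWithMeta(rdd, cfg)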