Elasticsearch + Spark: write json with custom docu

Posted 2019-08-18 04:18

Question:

I am trying to write a collection of objects in Elasticsearch from Spark. I have to meet two requirements:

  1. Document is already serialized in JSON and should be written as is
  2. Elasticsearch document _id should be provided

Here's what I tried so far.

saveJsonToEs()

I tried to use saveJsonToEs() like this (the serialized document contains field _id with desired Elasticsearch ID):

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark.rdd.EsSpark

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id"),
  ("es.mapping.exclude", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

But the elasticsearch-hadoop library gives this exception:

Caused by: org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: When writing data as JSON, the field exclusion feature is ignored. This is most likely not what the user intended. Bailing out...
    at org.elasticsearch.hadoop.util.Assert.isTrue(Assert.java:60)
    at org.elasticsearch.hadoop.rest.InitializationUtils.validateSettings(InitializationUtils.java:253)

If I remove es.mapping.exclude but keep es.mapping.id and send JSON with _id inside (like {"_id":"blah",...}):

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "_id")
)

EsSpark.saveJsonToEs(rdd, cfg)

I get this error:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 15 in stage 84.0 failed 4 times, most recent failure: Lost task 15.3 in stage 84.0 (TID 628, 172.31.35.69, executor 1): org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [172.31.30.184:9200] returned Bad Request(400) - Field [_id] is a metadata field and cannot be added inside a document. Use the index API request parameters.; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)
...

When I try to send this id as a different field (like {"superID":"blah",...}):

val cfg = Map(
  ("es.resource", "myindex/mytype"),
  ("es.mapping.id", "superID")
)

EsSpark.saveJsonToEs(rdd, cfg)

It fails to extract the field:

17/12/20 15:15:38 WARN TaskSetManager: Lost task 8.0 in stage 84.0 (TID 586, 172.31.33.56, executor 0): org.elasticsearch.hadoop.EsHadoopIllegalArgumentException: [JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]
    at org.elasticsearch.hadoop.serialization.bulk.AbstractBulkFactory$FieldWriter.write(AbstractBulkFactory.java:106)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.writeTemplate(TemplatedBulk.java:80)
    at org.elasticsearch.hadoop.serialization.bulk.TemplatedBulk.write(TemplatedBulk.java:56)
    at org.elasticsearch.hadoop.rest.RestRepository.writeToIndex(RestRepository.java:161)
    at org.elasticsearch.spark.rdd.EsRDDWriter.write(EsRDDWriter.scala:67)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.elasticsearch.spark.rdd.EsSpark$$anonfun$doSaveToEs$1.apply(EsSpark.scala:107)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)

When I remove es.mapping.id and es.mapping.exclude from the configuration, it works but the document id is generated by Elasticsearch (which violates requirement 2):

val rdd: RDD[String] = job.map{ r => r.toJson() }

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveJsonToEs(rdd, cfg)

saveToEsWithMeta()

Another function, saveToEsWithMeta(), lets you provide _id and other metadata when inserting. It satisfies requirement 2 but fails requirement 1.

val rdd: RDD[(String, String)] = job.map{
  r => r._id -> r.toJson()
}

val cfg = Map(
  ("es.resource", "myindex/mytype")
)

EsSpark.saveToEsWithMeta(rdd, cfg)

In fact, Elasticsearch is not even able to parse what elasticsearch-hadoop sends:

Caused by: org.apache.spark.util.TaskCompletionListenerException: Found unrecoverable error [<es_host>:9200] returned Bad Request(400) - failed to parse; Bailing out..
    at org.apache.spark.TaskContextImpl.markTaskCompleted(TaskContextImpl.scala:105)
    at org.apache.spark.scheduler.Task.run(Task.scala:112)

The question

Is it possible to write a collection of (documentID, serializedDocument) from Spark into Elasticsearch (using elasticsearch-hadoop)?

P.S. I am using Elasticsearch 5.6.3 and Spark 2.1.1.

Answer 1:

Finally I found the problem: it was a typo in the config.

[JsonExtractor for field [superId]] cannot extract value from entity [class java.lang.String] | instance [{...,"superID":"7f48c8ee6a8a"}]

The connector was looking for a field named superId, but the document only contained superID (note the case). The question is also a bit misleading here, because the code shown there has "es.mapping.id", "superID", which is not what my actual configuration contained.
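A mismatch like this can be caught before submitting the job by checking a sample document for the configured field name. The sketch below is plain Scala with no connector dependency. IdFieldCheck and its methods are hypothetical names, and the check is purely textual: it does not parse JSON and would also match a key nested deeper in the document.

```scala
import java.util.regex.Pattern

object IdFieldCheck {
  // Crude textual check, not a JSON parser: looks for "<field>": anywhere in the text.
  def hasKey(json: String, field: String): Boolean =
    Pattern.compile("\"" + Pattern.quote(field) + "\"\\s*:").matcher(json).find()

  // Same check with both sides lowercased, to spot near-misses such as superId vs superID.
  def hasKeyIgnoringCase(json: String, field: String): Boolean =
    hasKey(json.toLowerCase, field.toLowerCase)

  // True when the field is missing only because of a difference in letter case.
  def isCaseMismatch(json: String, field: String): Boolean =
    !hasKey(json, field) && hasKeyIgnoringCase(json, field)
}
```

With the document from the error message, IdFieldCheck.isCaseMismatch(doc, "superId") would flag the problem, while "superID" passes the exact check.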

The actual solution is like Levi Ramsey suggested:

val json = """{"foo":"bar","superID":"deadbeef"}"""

val rdd = spark.makeRDD(Seq(json))
val cfg = Map(
  ("es.mapping.id", "superID"),
  ("es.resource", "myindex/mytype")
)
EsSpark.saveJsonToEs(rdd, cfg = cfg)

The difference is that es.mapping.id cannot be _id: as the original post shows, _id is a metadata field and Elasticsearch does not accept it inside the document body.
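To see why the ID has to travel separately from the document, it helps to look at the shape of an Elasticsearch _bulk request entry: an action line that carries the metadata, followed by the document source. The sketch below only illustrates that format. BulkSketch and indexEntry are hypothetical names, and this is not necessarily byte-for-byte what elasticsearch-hadoop sends.

```scala
object BulkSketch {
  // Build one entry of a _bulk request body: an action line with the metadata,
  // then the document source, each terminated by a newline.
  // Assumes the id contains no characters that need JSON escaping.
  def indexEntry(id: String, source: String): String =
    s"""{"index":{"_id":"$id"}}""" + "\n" + source + "\n"
}
```

The value taken from the field named by es.mapping.id ends up in the action line, while the source line is the JSON document itself, which is why that document must not contain its own _id field.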

Naturally, this means that the new field superID has to be added to the mapping (unless the mapping is dynamic). If storing an additional field in the index is a burden, one should also:

  • exclude it from the mapping
  • and disable its indexing
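One possible reading of these two steps, written for Elasticsearch 5.x where mappings are still nested under the type name, is to exclude the field from _source and set "index": false on it. This is an assumption about what is meant here rather than a confirmed configuration, and SuperIdMapping is a hypothetical name. The string below would be the body of the index-creation request (for example PUT /myindex).

```scala
object SuperIdMapping {
  // Index-creation body: superID is dropped from the stored _source
  // and is not indexed for search.
  val createIndexBody: String =
    """{
      |  "mappings": {
      |    "mytype": {
      |      "_source": { "excludes": ["superID"] },
      |      "properties": {
      |        "superID": { "type": "keyword", "index": false }
      |      }
      |    }
      |  }
      |}""".stripMargin
}
```

Keep in mind that a field excluded from _source is not returned with search hits and is lost if the index is later rebuilt from _source, so this trade-off is worth checking against how the index is used.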

Thanks a lot to Alex Savitsky for pointing me in the right direction.



Answer 2:

Have you tried something like:

import org.apache.spark.rdd.RDD
import org.elasticsearch.spark._ // adds saveJsonToEs to RDDs

val rdd: RDD[String] = job.map{ r => r.toJson() }
val cfg = Map(
  ("es.mapping.id", "_id")
)
rdd.saveJsonToEs("myindex/mytype", cfg)

I've tested this with elasticsearch-hadoop (connector version 2.4.5) against ES 1.7, and it works.



Answer 3:

It can be done by passing the ES_INPUT_JSON option in the cfg parameter map and having the map function return a tuple whose first element is the document ID and whose second element is the document serialized as JSON.

I tested it with "org.elasticsearch" %% "elasticsearch-spark-20" % "[6.0,7.0[" against Elasticsearch 6.4.

import org.elasticsearch.hadoop.cfg.ConfigurationOptions.{ES_INPUT_JSON, ES_NODES}
import org.elasticsearch.spark._
import org.elasticsearch.spark.sql._

job
  .map{ r => (r._id, r.toJson()) }
  .saveToEsWithMeta(
    "myindex/mytype",
    Map(
      ES_NODES -> "https://localhost:9200",
      ES_INPUT_JSON -> true.toString
    )
  )


Answer 4:

I spent days banging my head against the wall trying to figure out why saveToEsWithMeta would not work when I used a string for the ID like so:

rdd.map(caseClassContainingJson =>
  (caseClassContainingJson._idWhichIsAString, caseClassContainingJson.jsonString)
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
  ES_INPUT_JSON -> true.toString
))

This throws JSON parsing errors, which misleads you into thinking the issue is with your JSON. But then you log each of your JSON documents and see that they are all valid.

It turns out that, for whatever reason, ES_INPUT_JSON -> true makes the left-hand side of the tuple, i.e. the ID, get parsed as JSON too!

The solution is to JSON-stringify the ID (which wraps it in an extra pair of double quotes) so that parsing it as JSON works:

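// Json and JsString are assumed to come from Play JSON
import play.api.libs.json.{Json, JsString}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions.ES_INPUT_JSON
import org.elasticsearch.spark._

rdd.map(caseClassContainingJson =>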
  (
    Json.stringify(JsString(caseClassContainingJson._idWhichIsAString)), 
    caseClassContainingJson.jsonString
  )
)
.saveToEsWithMeta(s"$nationalShapeIndexName/$nationalShapeIndexType", Map(
  ES_INPUT_JSON -> true.toString
))
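If pulling in a JSON library just for this step is not desirable, the quoting can be sketched by hand. The helper below is an illustration with hypothetical names (JsonId, quote). It turns a plain string into a JSON string literal by escaping quotes, backslashes and control characters and wrapping the result in double quotes, which is the kind of output Json.stringify(JsString(id)) is used for above.

```scala
object JsonId {
  // Encode a plain string as a JSON string literal, including the surrounding quotes.
  def quote(id: String): String = {
    val sb = new StringBuilder("\"")
    id.foreach {
      case '"'  => sb.append("\\\"")
      case '\\' => sb.append("\\\\")
      case '\n' => sb.append("\\n")
      case '\r' => sb.append("\\r")
      case '\t' => sb.append("\\t")
      // Remaining control characters are written as a backslash, 'u' and four hex digits.
      case c if c < ' ' => sb.append('\\').append('u').append("%04x".format(c.toInt))
      case c => sb.append(c)
    }
    sb.append('"').toString
  }
}
```

An ID such as 7f48c8ee6a8a becomes "7f48c8ee6a8a" with the quotes included, which parses as a valid JSON value where the bare string does not.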