Spark on YARN conflict with Elasticsearch TransportClient

Posted 2019-09-09 12:17

I want to run a Spark job on a Google Cloud VM cluster, and inside a map operation I need to query Elasticsearch. My problem is that Spark and Elasticsearch conflict on the Guava library: Spark uses Guava 14 and Elasticsearch uses Guava 18.

The failure occurs at the method call com.google.common.util.concurrent.MoreExecutors.directExecutor(), which exists in Guava 18 but not in Guava 14.
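A minimal diagnostic sketch (not part of the job itself) to confirm which Guava jar actually gets loaded at runtime, on the driver or inside a task:

    // Diagnostic sketch: print the jar that MoreExecutors was loaded from,
    // to see whether Guava 14 or Guava 18 wins on this classpath.
    val guavaSource = classOf[com.google.common.util.concurrent.MoreExecutors]
      .getProtectionDomain.getCodeSource.getLocation
    println(s"Guava loaded from: $guavaSource")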

In more detail, the job I am trying to run looks something like the following:

input.map(record => {
  val client = openConnection(ipAddress, ipPort)
  val newdata = client.query(record.someInfo)
  new Record(newdata)
})

The openConnection method is shown below:

public static TransportClient openConnection(String ipAddress, int ipPort) throws UnknownHostException {
    // Build a TransportClient for the "elasticsearch" cluster and attach the given node address.
    Settings settings = Settings.settingsBuilder().put("cluster.name", "elasticsearch").build();
    TransportClient client = TransportClient.builder().settings(settings).build()
            .addTransportAddress(new InetSocketTransportAddress(InetAddress.getByName(ipAddress), ipPort));
    return client;
}
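For reference, the same work can also be expressed with one client per partition instead of one per record; a rough sketch reusing openConnection from above, where esHost, esPort, query, someInfo and Record are placeholders carried over from the pseudocode:

    // Rough sketch: open one TransportClient per partition, reuse it for all
    // records in that partition, and close it before returning the results.
    val result = input.mapPartitions { records =>
      val client = openConnection(esHost, esPort)   // esHost/esPort: illustrative
      val mapped = records.map { record =>
        val newData = client.query(record.someInfo) // placeholder query from the pseudocode
        new Record(newData)
      }.toList
      client.close()
      mapped.iterator
    }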

I have tried to use shading to force ES to use Guava 18 by adding a shading rule to the sbt file as follows:

libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided",
libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided",
libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided",
libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided",

libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",

assemblyShadeRules in assembly := Seq(
  ShadeRule.rename("com.google.common.*" -> "googlecommona.@1").
    inLibrary("org.elasticsearch" % "elasticsearch" % "2.2.0"))

The problem, however, seems to remain. Is there a way to resolve this conflict?

2 Answers
萌系小妹纸 · answered 2019-09-09 12:42

You can't exclude transitive dependencies from a provided dependency. By marking it provided, you tell the packager not to put it in the final jar, because you assume it will already be on the classpath where the jar is deployed.

So you can't exclude its transitive dependencies either, because the entire dependency has already been excluded from the assembly.
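In your build this is exactly the situation: Spark is marked provided and stays out of the assembly (together with its Guava), while Elasticsearch is bundled along with its own Guava. A minimal illustration using the same coordinates as the question:

    // "provided": kept out of the assembly jar, expected on the cluster classpath
    libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"

    // regular dependency: bundled into the assembly together with its own Guava
    libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0"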

Your options are:

  • Exclude Guava from the Elasticsearch library: ES will then use the Guava provided by Spark, which may be an incompatible version (see the sketch after this list).
  • Switch to an Elasticsearch version that uses the same Guava version as Spark (differences in minor versions are mostly compatible, although you might still need to exclude it).
  • Switch to a Spark version that uses the same Guava version as Elasticsearch.
  • Use shading: this is now available in the sbt-assembly plugin. Shading means renaming classes. Elasticsearch and Spark each pull in their own version of Guava as a dependency. So you instruct SBT to rename the Guava pulled in by ES to, say, Guava1, and the Guava pulled in by Spark to Guava2; every reference to Guava inside ES is then rewritten to Guava1, and every reference inside Spark to Guava2.
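For the first option, the exclusion is essentially the line that is commented out in the asker's build file below:

    // Drop Guava from the Elasticsearch artifact, so ES falls back to whatever
    // Guava Spark puts on the classpath (which may be too old, as noted above).
    libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0" exclude("com.google.guava", "guava")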

You can read more about shading in the sbt-assembly documentation.
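As a rough sketch of how such a rule can look (the shaded package name is made up, and Guava 18.0 is assumed to be the version Elasticsearch 2.2.0 pulls in), renaming the Guava that travels with ES could be written as:

    // Relocate the Guava classes bundled for ES and rewrite the references to
    // them inside the ES jar; Spark is "provided" here and never enters the
    // assembly, so its Guava needs no rule.
    assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("com.google.common.**" -> "shaded.guava.es.@1")
        .inLibrary("com.google.guava" % "guava" % "18.0",
                   "org.elasticsearch" % "elasticsearch" % "2.2.0")
    )

The accepted answer below achieves the same effect with a single inAll rule, which simply rewrites com.google references in every jar that ends up in the assembly.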

Rolldiameter · answered 2019-09-09 12:56

Shading was the answer: I added the following rule to the build.sbt file. Renaming the Guava packages bundled in the assembly means the Guava 18 that Elasticsearch needs no longer clashes with the Guava 14 that Spark provides on the cluster.

The solution below works for a Spark cluster running on YARN that uses the Elasticsearch TransportClient class.

  assemblyShadeRules in assembly := Seq(
    ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
  )

For completeness, here is the whole build.sbt file:

import sbt.ExclusionRule
import sbt.Keys._

lazy val root = (project in file(".")).
  settings(
  name := "scala_code",
  version := "1.0",
  scalaVersion := "2.10.6",
  conflictManager := ConflictManager.latestRevision,
  test in assembly := {},
  assemblyMergeStrategy in assembly := {
      case PathList("META-INF", "MANIFEST.MF") => MergeStrategy.discard
      case _ => MergeStrategy.first
  },

  parallelExecution in test := false,
  libraryDependencies += "com.fasterxml.jackson.module" % "jackson-module-scala_2.10" % "2.6.5",
  libraryDependencies += "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"  exclude("javax.servlet", "servlet-api"),
  libraryDependencies += "org.wikidata.wdtk" % "wdtk-datamodel" % "0.6.0" exclude ("com.fasterxml.jackson.core",  "jackson-annotations"),
  libraryDependencies += "org.apache.spark" % "spark-graphx_2.10" % "1.6.0" % "provided"  ,
  libraryDependencies += "org.apache.spark" % "spark-sql_2.10" % "1.6.0" % "provided" ,
  libraryDependencies += "org.apache.spark" % "spark-streaming_2.10" % "1.6.0" % "provided",
  libraryDependencies += "org.scalatest" % "scalatest_2.10" % "2.0" % "test",
 += "com.typesafe" % "config" % "1.2.1",
  libraryDependencies += "org.jsoup" % "jsoup" % "1.8.3",
  libraryDependencies += "org.elasticsearch" % "elasticsearch" % "2.2.0",// exclude("com.google.guava", "guava"),

  assemblyShadeRules in assembly := Seq(
      ShadeRule.rename("com.google.**" -> "googlecommona.@1").inAll
  )

)