pyspark: ship jar dependency with spark-submit

Posted 2020-02-14 04:46

Question:

I wrote a pyspark script that reads two JSON files, coGroups them, and sends the result to an elasticsearch cluster. Everything works (mostly) as expected when I run it locally: I downloaded the elasticsearch-hadoop jar file for the org.elasticsearch.hadoop.mr.EsOutputFormat and org.elasticsearch.hadoop.mr.LinkedMapWritable classes, ran my job with pyspark using the --jars argument, and I can see documents appearing in my elasticsearch cluster.

When I try to run it on a spark cluster, however, I'm getting this error:

Traceback (most recent call last):
  File "/root/spark/spark_test.py", line 141, in <module>
    conf=es_write_conf
  File "/root/spark/python/pyspark/rdd.py", line 1302, in saveAsNewAPIHadoopFile
    keyConverter, valueConverter, jconf)
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__
  File "/root/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value
py4j.protocol.Py4JJavaError: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile.
: java.lang.ClassNotFoundException: org.elasticsearch.hadoop.mr.LinkedMapWritable
    at java.net.URLClassLoader$1.run(URLClassLoader.java:366)
    at java.net.URLClassLoader$1.run(URLClassLoader.java:355)
    at java.security.AccessController.doPrivileged(Native Method)
    at java.net.URLClassLoader.findClass(URLClassLoader.java:354)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:425)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:358)
    at java.lang.Class.forName0(Native Method)
    at java.lang.Class.forName(Class.java:274)
    at org.apache.spark.util.Utils$.classForName(Utils.scala:157)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1$$anonfun$apply$9.apply(PythonRDD.scala:611)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1$$anonfun$apply$9.apply(PythonRDD.scala:610)
    at scala.Option.map(Option.scala:145)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1.apply(PythonRDD.scala:610)
    at org.apache.spark.api.python.PythonRDD$$anonfun$getKeyValueTypes$1.apply(PythonRDD.scala:609)
    at scala.Option.flatMap(Option.scala:170)
    at org.apache.spark.api.python.PythonRDD$.getKeyValueTypes(PythonRDD.scala:609)
    at org.apache.spark.api.python.PythonRDD$.saveAsNewAPIHadoopFile(PythonRDD.scala:701)
    at org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile(PythonRDD.scala)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:606)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:231)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:379)
    at py4j.Gateway.invoke(Gateway.java:259)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:133)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:207)
    at java.lang.Thread.run(Thread.java:745)

This seems pretty clear to me: the elasticsearch-hadoop jar is not available on the workers. So the question is: how do I send it along with my app? I could use sc.addPyFile for a Python dependency, but that won't work with jars, and using the --jars parameter of spark-submit doesn't help.

Answer 1:

The --jars option just works; the problem was how I ran the spark-submit job in the first place. The correct invocation is:

./bin/spark-submit <options> scriptname

Therefore the --jars option must be placed before the script:

./bin/spark-submit --jars /path/to/my.jar myscript.py

This is obvious once you consider that it is the only way to pass arguments to the script itself: everything after the script name is used as input arguments for the script:

./bin/spark-submit --jars /path/to/my.jar myscript.py --do-magic=true
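
To illustrate the script's side of this, here is a minimal sketch of how myscript.py might read the arguments that follow its name. The helper parse_script_args and the true/false meaning of --do-magic are assumptions made for illustration, not part of the original script. It also shows what happens when --jars is misplaced after the script name: spark-submit does not consume it, so it simply shows up among the script's own arguments.

```python
import argparse
import sys


def parse_script_args(argv):
    """Parse the arguments that followed the script name on the command line.

    Returns (do_magic, leftover), where leftover holds anything this
    script does not recognize.
    """
    parser = argparse.ArgumentParser(description="hypothetical myscript.py")
    # --do-magic mirrors the flag in the command above; its semantics are assumed.
    parser.add_argument("--do-magic", default="false", choices=["true", "false"])
    # parse_known_args collects unrecognized arguments instead of exiting.
    args, leftover = parser.parse_known_args(argv)
    return args.do_magic == "true", leftover


if __name__ == "__main__":
    # sys.argv[1:] contains only what came after the script name.
    do_magic, leftover = parse_script_args(sys.argv[1:])
    print("do_magic =", do_magic)
    if leftover:
        # A misplaced --jars would land here rather than being handled by spark-submit.
        print("unrecognized script arguments:", leftover)
```

If the script were submitted as `myscript.py --do-magic=true --jars /path/to/my.jar`, the leftover list would contain `--jars` and the jar path, which is consistent with the ClassNotFoundException in the question: the jar was never registered with Spark.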