Can't add jars pyspark in jupyter of Google Da

Posted 2020-03-28 19:42

Question:

I have a Jupyter notebook on Dataproc and I need a jar to run a job. I know I can edit spark-defaults.conf, or pass --jars=gs://spark-lib/bigquery/spark-bigquery-latest.jar when submitting the job from the command line; both work well. However, when I try to add the jar directly from the Jupyter notebook, the methods below all fail.

Method 1:

import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars gs://spark-lib/bigquery/spark-bigquery-latest.jar pyspark-shell'

Method 2:

from pyspark.sql import SparkSession

spark = SparkSession.builder.appName('Shakespeare WordCount')\
    .config('spark.jars', 'gs://spark-lib/bigquery/spark-bigquery-latest.jar')\
    .getOrCreate()

Both fail with the same error:

---------------------------------------------------------------------------
Py4JJavaError                             Traceback (most recent call last)
<ipython-input-1-2b7692efb32b> in <module>()
     19 # Read BQ data into spark dataframe
     20 # This method reads from BQ directly, does not use GCS for intermediate results
---> 21 df = spark.read.format('bigquery').option('table', table).load()
     22 
     23 df.show(5)

/usr/lib/spark/python/pyspark/sql/readwriter.py in load(self, path, format, schema, **options)
    170             return self._df(self._jreader.load(self._spark._sc._jvm.PythonUtils.toSeq(path)))
    171         else:
--> 172             return self._df(self._jreader.load())
    173 
    174     @since(1.4)

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1255         answer = self.gateway_client.send_command(command)
   1256         return_value = get_return_value(
-> 1257             answer, self.gateway_client, self.target_id, self.name)
   1258 
   1259         for temp_arg in temp_args:

/usr/lib/spark/python/pyspark/sql/utils.py in deco(*a, **kw)
     61     def deco(*a, **kw):
     62         try:
---> 63             return f(*a, **kw)
     64         except py4j.protocol.Py4JJavaError as e:
     65             s = e.java_exception.toString()

/usr/lib/spark/python/lib/py4j-0.10.7-src.zip/py4j/protocol.py in get_return_value(answer, gateway_client, target_id, name)
    326                 raise Py4JJavaError(
    327                     "An error occurred while calling {0}{1}{2}.\n".
--> 328                     format(target_id, ".", name), value)
    329             else:
    330                 raise Py4JError(

Py4JJavaError: An error occurred while calling o81.load.
: java.lang.ClassNotFoundException: Failed to find data source: bigquery. Please find packages at http://spark.apache.org/third-party-projects.html
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:657)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:194)
    at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:167)
    at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    at java.lang.reflect.Method.invoke(Method.java:498)
    at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
    at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
    at py4j.Gateway.invoke(Gateway.java:282)
    at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
    at py4j.commands.CallCommand.execute(CallCommand.java:79)
    at py4j.GatewayConnection.run(GatewayConnection.java:238)
    at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.ClassNotFoundException: bigquery.DefaultSource
    at java.net.URLClassLoader.findClass(URLClassLoader.java:382)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:424)
    at java.lang.ClassLoader.loadClass(ClassLoader.java:357)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20$$anonfun$apply$12.apply(DataSource.scala:634)
    at scala.util.Try$.apply(Try.scala:192)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at org.apache.spark.sql.execution.datasources.DataSource$$anonfun$20.apply(DataSource.scala:634)
    at scala.util.Try.orElse(Try.scala:84)
    at org.apache.spark.sql.execution.datasources.DataSource$.lookupDataSource(DataSource.scala:634)
    ... 13 more

The task I am trying to run is very simple:

table = 'publicdata.samples.shakespeare'

df = spark.read.format('bigquery').option('table', table).load()

df.show(5)

I understand there are many similar questions and answers, but they either do not work or do not fit my needs. I will need ad-hoc jars, and I don't want to keep all of them in the default configuration. I'd like to be more flexible and add jars on the fly. How can I solve this? Thank you!

Answer 1:

Unfortunately, there isn't a built-in way to do this dynamically; in practice you still have to edit spark-defaults.conf and restart the kernel. There's an open feature request in Spark for this.
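
The "edit spark-defaults.conf" step can at least be scripted from the notebook. What follows is a minimal sketch, not a Dataproc or Spark API: add_jar_to_conf is a hypothetical helper name, and it assumes the file uses whitespace-separated "key value" lines with jars listed under the standard spark.jars property as a comma-separated list. It only transforms the file's text; you would still need to write the result back to the cluster's spark-defaults.conf and restart the kernel.

```python
def add_jar_to_conf(conf_text, jar_uri):
    """Return conf_text with jar_uri appended to the spark.jars property.

    Hypothetical helper: assumes whitespace-separated 'key value' lines
    and a comma-separated spark.jars value.
    """
    lines = conf_text.splitlines()
    for i, line in enumerate(lines):
        parts = line.split(None, 1)
        if parts and parts[0] == "spark.jars":
            existing = parts[1].split(",") if len(parts) > 1 else []
            existing = [j.strip() for j in existing if j.strip()]
            if jar_uri not in existing:
                existing.append(jar_uri)
            lines[i] = "spark.jars " + ",".join(existing)
            break
    else:
        # No spark.jars line yet: add one at the end.
        lines.append("spark.jars " + jar_uri)
    return "\n".join(lines) + "\n"


sample = "spark.master yarn\n"
print(add_jar_to_conf(sample, "gs://spark-lib/bigquery/spark-bigquery-latest.jar"))
```

Calling the helper twice with the same jar leaves the text unchanged, so it should be safe to re-run in a notebook cell.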

Zeppelin has some usability features for adding jars through the UI, but even in Zeppelin you have to restart the interpreter afterwards for the Spark context to pick the jar up in its classloader. Those options also require the jar files to already be staged on the local filesystem; you can't just refer to remote file paths or URLs.

One workaround is to create an init action that sets up a systemd service which regularly polls an HDFS directory and syncs it into one of the existing classpath directories, such as /usr/lib/spark/jars:

#!/bin/bash
# Sets up continuous sync'ing of an HDFS directory into /usr/lib/spark/jars

# Manually copy jars into this HDFS directory to have them sync into
# ${LOCAL_DIR} on all nodes.
HDFS_DROPZONE='hdfs:///usr/lib/jars'
LOCAL_DIR='file:///usr/lib/spark/jars'

readonly ROLE="$(/usr/share/google/get_metadata_value attributes/dataproc-role)"
if [[ "${ROLE}" == 'Master' ]]; then
  hdfs dfs -mkdir -p "${HDFS_DROPZONE}"
fi

SYNC_SCRIPT='/usr/lib/hadoop/libexec/periodic-sync-jars.sh'
cat << EOF > "${SYNC_SCRIPT}"
#!/bin/bash
while true; do
  sleep 5
  hdfs dfs -ls ${HDFS_DROPZONE}/*.jar 2>/dev/null | grep hdfs: | \
    sed 's/.*hdfs:/hdfs:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/hdfs_files.txt
  hdfs dfs -ls ${LOCAL_DIR}/*.jar 2>/dev/null | grep file: | \
    sed 's/.*file:/file:/' | xargs -n 1 basename 2>/dev/null | sort \
    > /tmp/local_files.txt
  comm -23 /tmp/hdfs_files.txt /tmp/local_files.txt > /tmp/diff_files.txt
  if [ -s /tmp/diff_files.txt ]; then
    for FILE in \$(cat /tmp/diff_files.txt); do
      echo "\$(date): Copying \${FILE} from ${HDFS_DROPZONE} into ${LOCAL_DIR}"
      hdfs dfs -cp "${HDFS_DROPZONE}/\${FILE}" "${LOCAL_DIR}/\${FILE}"
    done
  fi
done
EOF

chmod 755 "${SYNC_SCRIPT}"

SERVICE_CONF='/usr/lib/systemd/system/sync-jars.service'
cat << EOF > "${SERVICE_CONF}"
[Unit]
Description=Periodic Jar Sync
[Service]
Type=simple
ExecStart=/bin/bash -c '${SYNC_SCRIPT} &>> /var/log/periodic-sync-jars.log'
Restart=on-failure
[Install]
WantedBy=multi-user.target
EOF

chmod a+rw "${SERVICE_CONF}"

systemctl daemon-reload
systemctl enable sync-jars
systemctl restart sync-jars
systemctl status sync-jars
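
The core of the sync loop above is a set difference: comm -23 keeps the jar basenames that exist in the HDFS drop zone but not yet in the local jar directory. As a rough Python sketch of that one step (the function name jars_to_copy and the sample paths are illustrative and not part of the script):

```python
import posixpath


def jars_to_copy(hdfs_paths, local_paths):
    """Return sorted jar basenames present in hdfs_paths but not in local_paths."""
    hdfs_names = {posixpath.basename(p) for p in hdfs_paths if p.endswith(".jar")}
    local_names = {posixpath.basename(p) for p in local_paths if p.endswith(".jar")}
    return sorted(hdfs_names - local_names)


hdfs_listing = [
    "hdfs:///usr/lib/jars/spark-bigquery-latest.jar",
    "hdfs:///usr/lib/jars/extra.jar",
]
local_listing = ["file:///usr/lib/spark/jars/extra.jar"]
print(jars_to_copy(hdfs_listing, local_listing))  # ['spark-bigquery-latest.jar']
```

Because the comparison is by file name only, a jar that is replaced in the drop zone under the same name would not be copied again; using a new file name avoids that.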

Then, whenever you need a jar file to be available everywhere, copy it into hdfs:///usr/lib/jars. The periodic poller will automatically place it in /usr/lib/spark/jars on every node, and you then restart your kernel to pick it up. You can add jars to that HDFS directory either by SSH'ing in and running hdfs dfs -cp directly, or by calling out to a subprocess from your Jupyter notebook:

import subprocess
sp = subprocess.Popen(
    ['hdfs', 'dfs', '-cp',
     'gs://spark-lib/bigquery/spark-bigquery-latest.jar',
     'hdfs:///usr/lib/jars/spark-bigquery-latest.jar'],
    stdout=subprocess.PIPE,
    stderr=subprocess.PIPE)
out, err = sp.communicate()
print(out)
print(err)
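
The snippet above prints the raw output and does not check whether the copy succeeded. A slightly more defensive variant is sketched below, assuming Python 3.5+ for subprocess.run; build_copy_command and stage_jar are hypothetical helper names, and the default drop zone simply mirrors the hdfs:///usr/lib/jars directory used above.

```python
import posixpath
import subprocess


def build_copy_command(src_uri, dropzone="hdfs:///usr/lib/jars"):
    """Build the 'hdfs dfs -cp' argument list, keeping the jar's file name."""
    dest = dropzone.rstrip("/") + "/" + posixpath.basename(src_uri)
    return ["hdfs", "dfs", "-cp", src_uri, dest]


def stage_jar(src_uri, dropzone="hdfs:///usr/lib/jars"):
    """Copy a jar into the drop zone and raise if the copy fails."""
    result = subprocess.run(
        build_copy_command(src_uri, dropzone),
        stdout=subprocess.PIPE,
        stderr=subprocess.PIPE,
    )
    if result.returncode != 0:
        raise RuntimeError(result.stderr.decode("utf-8", errors="replace"))
    return result.stdout.decode("utf-8", errors="replace")
```

On the cluster you would call stage_jar('gs://spark-lib/bigquery/spark-bigquery-latest.jar'), wait for the poller to copy the file, and then restart the kernel as described above.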