The original question was about deploying Spark 1.4 on Google Cloud. After downloading the tarball and setting
SPARK_HADOOP2_TARBALL_URI='gs://my_bucket/my-images/spark-1.4.1-bin-hadoop2.6.tgz'
the deployment with bdutil went fine; however, calling SqlContext.parquetFile("gs://my_bucket/some_data.parquet") runs into the following exception:
java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to org.apache.hadoop.fs.FileSystem
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2595)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.hive.metastore.Warehouse.getFs(Warehouse.java:112)
at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:144)
at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:159)
And what confused me is that GoogleHadoopFileSystem should be a subclass of org.apache.hadoop.fs.FileSystem, and I even verified in the same spark-shell instance:
scala> var gfs = new com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem()
gfs: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem = com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem@46f105c
scala> gfs.isInstanceOf[org.apache.hadoop.fs.FileSystem]
res3: Boolean = true
scala> gfs.asInstanceOf[org.apache.hadoop.fs.FileSystem]
res4: org.apache.hadoop.fs.FileSystem = com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem@46f105c
Did I miss anything, any workaround? Thanks in advance!
UPDATE: this is my bdutil (version 1.3.1) setting for deployment:
import_env hadoop2_env.sh
import_env extensions/spark/spark_env.sh
CONFIGBUCKET="my_conf_bucket"
PROJECT="my_proj"
GCE_IMAGE='debian-7-backports'
GCE_MACHINE_TYPE='n1-highmem-4'
GCE_ZONE='us-central1-f'
GCE_NETWORK='my-network'
GCE_MASTER_MACHINE_TYPE='n1-standard-2'
PREEMPTIBLE_FRACTION=1.0
PREFIX='my-hadoop'
NUM_WORKERS=8
USE_ATTACHED_PDS=true
WORKER_ATTACHED_PDS_SIZE_GB=200
MASTER_ATTACHED_PD_SIZE_GB=200
HADOOP_TARBALL_URI="gs://hadoop-dist/hadoop-2.6.0.tar.gz"
SPARK_MODE="yarn-client"
SPARK_HADOOP2_TARBALL_URI='gs://my_conf_bucket/my-images/spark-1.4.1-bin-hadoop2.6.tgz'
Short Answer
Indeed it was related to IsolatedClientLoader, and we've tracked down the root cause and verified a fix. I filed https://issues.apache.org/jira/browse/SPARK-9206 to track this issue, and successfully built a clean Spark tarball from my fork with a simple fix: https://github.com/apache/spark/pull/7549
There are a few short-term options:
Deploy with HDFS as the default filesystem (--default_fs=hdfs); you'll still be able to directly specify gs:// paths in your jobs, just that HDFS will be used for intermediate data and staging files. There are some minor incompatibilities using raw Hive in this mode, though.
Use val sqlContext = new org.apache.spark.sql.SQLContext(sc) instead of a HiveContext if you don't need HiveContext features.
Run git clone https://github.com/dennishuo/spark and then ./make-distribution.sh --name my-custom-spark --tgz --skip-java-test -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver to get a fresh tarball you can specify in your bdutil's spark_env.sh.
Long Answer
We've verified that it only manifests when fs.default.name and fs.defaultFS are set to a gs:// path, regardless of whether trying to load a path from parquetFile("gs://...") or parquetFile("hdfs://..."), and when fs.default.name and fs.defaultFS are set to an HDFS path, loading data from both HDFS and from GCS works fine. This is also specific to Spark 1.4+ currently, and is not present in Spark 1.3.1 or older.
The regression appears to have been introduced in https://github.com/apache/spark/commit/9ac8393663d759860c67799e000ec072ced76493 which actually fixes a prior related classloading issue, SPARK-8368. While the fix itself is correct for normal cases, there's a method IsolatedClientLoader.isSharedClass used to determine which classloader to use, and it interacts with the aforementioned commit to break GoogleHadoopFileSystem classloading.
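To make the failure mode concrete, here is a minimal, hypothetical sketch of a prefix-based "shared class" rule. It is not Spark's actual source: the object name SharedClassSketch, both method names, and the exact prefixes are assumptions chosen for illustration, and the refined variant is only one plausible way to carve out an exception; the fix in the linked pull request may differ.

```scala
// Hypothetical sketch, NOT Spark's actual IsolatedClientLoader code.
// It only illustrates how a name-prefix rule can misclassify a class.
object SharedClassSketch {

  // Assumed naive rule: everything under com.google is treated as shared
  // (intended for libraries such as Guava and protobuf).
  def isSharedNaive(name: String): Boolean =
    name.startsWith("scala.") ||
    name.startsWith("java.lang.") ||
    name.startsWith("com.google")

  // Assumed refinement: keep com.google shared, but exclude com.google.cloud
  // so the GCS connector is loaded alongside the other Hadoop classes.
  def isSharedRefined(name: String): Boolean =
    name.startsWith("scala.") ||
    name.startsWith("java.lang.") ||
    (name.startsWith("com.google") && !name.startsWith("com.google.cloud"))

  def main(args: Array[String]): Unit = {
    val names = Seq(
      "com.google.common.collect.ImmutableList",
      "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem",
      "org.apache.hadoop.hdfs.DistributedFileSystem")
    // Print how each rule classifies each class name.
    names.foreach { n =>
      println(s"$n naive=${isSharedNaive(n)} refined=${isSharedRefined(n)}")
    }
  }
}
```

Under the naive rule, GoogleHadoopFileSystem is classified as shared while DistributedFileSystem is not, so the two would be resolved through different classloaders. A class loaded by one loader cannot be cast to a supertype loaded by another, which is consistent with the ClassCastException in the question even though isInstanceOf returns true inside a single spark-shell loader.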
The relevant lines in that file treat everything under com.google.* as a "shared class" because of Guava and possibly protobuf dependencies which are indeed loaded as shared libraries, but unfortunately GoogleHadoopFileSystem should be loaded as a "hive class" in this case, just like org.apache.hadoop.hdfs.DistributedFileSystem. We just happen to unluckily share the com.google.* package namespace.
This can be verified by adding the following line to ${SPARK_INSTALL}/conf/log4j.properties:
And the output shows: