GoogleHadoopFileSystem cannot be cast to hadoop Fi

Posted 2019-04-12 07:35

Question:

The original question arose while trying to deploy Spark 1.4 on Google Cloud. After downloading Spark and setting

SPARK_HADOOP2_TARBALL_URI='gs://my_bucket/my-images/spark-1.4.1-bin-hadoop2.6.tgz'

the deployment with bdutil went fine; however, calling SQLContext.parquetFile("gs://my_bucket/some_data.parquet") fails with the following exception:

 java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to org.apache.hadoop.fs.FileSystem
at org.apache.hadoop.fs.FileSystem.createFileSystem(FileSystem.java:2595)
at org.apache.hadoop.fs.FileSystem.access$200(FileSystem.java:91)
at org.apache.hadoop.fs.FileSystem$Cache.getInternal(FileSystem.java:2630)
at org.apache.hadoop.fs.FileSystem$Cache.get(FileSystem.java:2612)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:370)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:169)
at org.apache.hadoop.fs.FileSystem.get(FileSystem.java:354)
at org.apache.hadoop.fs.Path.getFileSystem(Path.java:296)
at org.apache.hadoop.hive.metastore.Warehouse.getFs(Warehouse.java:112)
at org.apache.hadoop.hive.metastore.Warehouse.getDnsPath(Warehouse.java:144)
at org.apache.hadoop.hive.metastore.Warehouse.getWhRoot(Warehouse.java:159)

What confuses me is that GoogleHadoopFileSystem should be a subclass of org.apache.hadoop.fs.FileSystem, and I even verified this in the same spark-shell instance:

scala> var gfs = new com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem()
gfs: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem = com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem@46f105c

scala> gfs.isInstanceOf[org.apache.hadoop.fs.FileSystem]
res3: Boolean = true

scala> gfs.asInstanceOf[org.apache.hadoop.fs.FileSystem]
res4: org.apache.hadoop.fs.FileSystem = com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem@46f105c

Did I miss anything? Is there a workaround? Thanks in advance!

UPDATE: these are my bdutil (version 1.3.1) settings for the deployment:

import_env hadoop2_env.sh
import_env extensions/spark/spark_env.sh
CONFIGBUCKET="my_conf_bucket"
PROJECT="my_proj"
GCE_IMAGE='debian-7-backports'
GCE_MACHINE_TYPE='n1-highmem-4'
GCE_ZONE='us-central1-f'
GCE_NETWORK='my-network'
GCE_MASTER_MACHINE_TYPE='n1-standard-2'
PREEMPTIBLE_FRACTION=1.0
PREFIX='my-hadoop'
NUM_WORKERS=8
USE_ATTACHED_PDS=true
WORKER_ATTACHED_PDS_SIZE_GB=200
MASTER_ATTACHED_PD_SIZE_GB=200
HADOOP_TARBALL_URI="gs://hadoop-dist/hadoop-2.6.0.tar.gz"
SPARK_MODE="yarn-client"
SPARK_HADOOP2_TARBALL_URI='gs://my_conf_bucket/my-images/spark-1.4.1-bin-hadoop2.6.tgz'

Answer 1:

Short Answer

Indeed it was related to IsolatedClientLoader, and we've tracked down the root cause and verified a fix. I filed https://issues.apache.org/jira/browse/SPARK-9206 to track this issue, and successfully built a clean Spark tarball from my fork with a simple fix: https://github.com/apache/spark/pull/7549

There are a few short-term options:

  1. Use Spark 1.3.1 for now.
  2. In your bdutil deployment, use HDFS as the default filesystem (--default_fs=hdfs); you'll still be able to directly specify gs:// paths in your jobs, just that HDFS will be used for intermediate data and staging files. There are some minor incompatibilities using raw Hive in this mode, though.
  3. Use the raw val sqlContext = new org.apache.spark.sql.SQLContext(sc) instead of a HiveContext if you don't need HiveContext features.
  4. git clone https://github.com/dennishuo/spark and run ./make-distribution.sh --name my-custom-spark --tgz --skip-java-test -Pyarn -Phadoop-2.6 -Dhadoop.version=2.6.0 -Phive -Phive-thriftserver to get a fresh tarball you can specify in your bdutil's spark_env.sh.

Long Answer

We've verified that it only manifests when fs.default.name and fs.defaultFS are set to a gs:// path, regardless of whether you load from parquetFile("gs://...") or parquetFile("hdfs://..."). When fs.default.name and fs.defaultFS are set to an HDFS path, loading data from both HDFS and GCS works fine. This is currently specific to Spark 1.4+ and is not present in Spark 1.3.1 or older.

The regression appears to have been introduced in https://github.com/apache/spark/commit/9ac8393663d759860c67799e000ec072ced76493 which actually fixes a prior related classloading issue, SPARK-8368. While the fix itself is correct for normal cases, the method IsolatedClientLoader.isSharedClass, which determines which classloader to use, interacts with the aforementioned commit to break GoogleHadoopFileSystem classloading.

The following lines in that file include everything under com.google.* as a "shared class" because of Guava and possibly protobuf dependencies which are indeed loaded as shared libraries, but unfortunately GoogleHadoopFileSystem should be loaded as a "hive class" in this case, just like org.apache.hadoop.hdfs.DistributedFileSystem. We just happen to unluckily share the com.google.* package namespace.

protected def isSharedClass(name: String): Boolean =
  name.contains("slf4j") ||
  name.contains("log4j") ||
  name.startsWith("org.apache.spark.") ||
  name.startsWith("scala.") ||
  name.startsWith("com.google") ||
  name.startsWith("java.lang.") ||
  name.startsWith("java.net") ||
  sharedPrefixes.exists(name.startsWith)

...

/** The classloader that is used to load an isolated version of Hive. */
protected val classLoader: ClassLoader = new URLClassLoader(allJars, rootClassLoader) {
  override def loadClass(name: String, resolve: Boolean): Class[_] = {
    val loaded = findLoadedClass(name)
    if (loaded == null) doLoadClass(name, resolve) else loaded
  }

  def doLoadClass(name: String, resolve: Boolean): Class[_] = {
    ...
    } else if (!isSharedClass(name)) {
      logDebug(s"hive class: $name - ${getResource(classToPath(name))}")
      super.loadClass(name, resolve)
    } else {
      // For shared classes, we delegate to baseClassLoader.
      logDebug(s"shared class: $name")
      baseClassLoader.loadClass(name)
    }
  }
}
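
To make the effect of the com.google prefix concrete, here is a minimal, self-contained sketch of the predicate and of one way it could be narrowed. It is not the Spark source: SharedClassSketch, isSharedOriginal, isSharedNarrowed and the empty sharedPrefixes are hypothetical names for illustration, and excluding com.google.cloud is an assumption about what a fix could look like, not a quote from the pull request.

```scala
object SharedClassSketch {
  // Hypothetical stand-in for the loader's configurable prefix list.
  val sharedPrefixes: Seq[String] = Seq.empty

  // Rules common to both variants: the excerpt above minus its com.google line.
  private def sharedIgnoringGoogle(name: String): Boolean =
    name.contains("slf4j") ||
    name.contains("log4j") ||
    name.startsWith("org.apache.spark.") ||
    name.startsWith("scala.") ||
    name.startsWith("java.lang.") ||
    name.startsWith("java.net") ||
    sharedPrefixes.exists(name.startsWith)

  // Same decision as the excerpt: everything under com.google is shared.
  def isSharedOriginal(name: String): Boolean =
    sharedIgnoringGoogle(name) || name.startsWith("com.google")

  // Assumed narrowing: other com.google classes (e.g. Guava) stay shared,
  // but classes under com.google.cloud are no longer treated as shared.
  def isSharedNarrowed(name: String): Boolean =
    sharedIgnoringGoogle(name) ||
      (name.startsWith("com.google") && !name.startsWith("com.google.cloud"))

  def main(args: Array[String]): Unit = {
    val gcs = "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem"
    println(s"original: ${isSharedOriginal(gcs)}, narrowed: ${isSharedNarrowed(gcs)}")
  }
}
```

With the original rule, com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem is classified as a shared class, while org.apache.hadoop.hdfs.DistributedFileSystem is not.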

This can be verified by adding the following line to ${SPARK_INSTALL}/conf/log4j.properties:

log4j.logger.org.apache.spark.sql.hive.client=DEBUG

And the output shows:

...
15/07/20 20:59:14 DEBUG IsolatedClientLoader: hive class: org.apache.hadoop.hdfs.DistributedFileSystem - jar:file:/home/hadoop/spark-install/lib/spark-assembly-1.4.1-hadoop2.6.0.jar!/org/apache/hadoop/hdfs/DistributedFileSystem.class
...
15/07/20 20:59:14 DEBUG IsolatedClientLoader: shared class: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
java.lang.RuntimeException: java.lang.ClassCastException: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem cannot be cast to org.apache.hadoop.fs.FileSystem
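
The exception follows from how the JVM identifies classes: a class is identified by its name together with the loader that defined it. Because GoogleHadoopFileSystem is delegated to the base loader, its superclass is the base loader's copy of org.apache.hadoop.fs.FileSystem, which is presumably not the copy the isolated loader defined for the Hive code. That would also explain why the isInstanceOf check in spark-shell succeeds: there both classes come from the same loader. The sketch below reproduces the same shape without Spark or Hadoop. BaseFs, GcsLikeFs, IsolatingLoader and IsolationDemo are all hypothetical stand-ins invented for illustration. It assumes the file is compiled as ordinary source (not pasted into a REPL) and that the code source location of the compiled classes is available.

```scala
import java.net.{URL, URLClassLoader}

// Hypothetical stand-ins for FileSystem and GoogleHadoopFileSystem.
class BaseFs
class GcsLikeFs extends BaseFs

/** Defines non-shared classes itself and delegates shared ones to `base`. */
class IsolatingLoader(urls: Array[URL], base: ClassLoader, isShared: String => Boolean)
    extends URLClassLoader(urls, null) { // null parent: only bootstrap classes are inherited
  override def loadClass(name: String, resolve: Boolean): Class[_] =
    if (isShared(name)) base.loadClass(name) else super.loadClass(name, resolve)
}

object IsolationDemo {
  private val baseLoader = classOf[BaseFs].getClassLoader
  // Assumes the classes were loaded from a directory or jar with a known location.
  private val codeLocation = classOf[BaseFs].getProtectionDomain.getCodeSource.getLocation

  /** True if the loader's view of GcsLikeFs is a subtype of its view of BaseFs. */
  def assignable(isShared: String => Boolean): Boolean = {
    val loader = new IsolatingLoader(Array(codeLocation), baseLoader, isShared)
    val base = loader.loadClass(classOf[BaseFs].getName)
    val impl = loader.loadClass(classOf[GcsLikeFs].getName)
    base.isAssignableFrom(impl)
  }

  def main(args: Array[String]): Unit = {
    val implName = classOf[GcsLikeFs].getName
    // Implementation shared, base class isolated: mirrors the failing case.
    println(s"impl shared, base isolated: ${assignable(_ == implName)}")
    // Both classes defined by the isolating loader: mirrors the DistributedFileSystem case.
    println(s"both isolated: ${assignable(_ => false)}")
  }
}
```

In the first call the two classes come from different loaders, so the subtype check is expected to fail even though the names match; in the second call both are defined by the same loader and the relationship holds.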