Issue: Scala code in Spark shell to retrieve data from HBase

Published 2019-01-23 21:58

We are trying to execute a simple piece of Scala code in the Spark shell to retrieve data from HBase. The Hadoop environment is Kerberos-enabled, and we have made sure to run kinit.

Steps to invoke Spark Shell:

MASTER=yarn-client

DRIVER_CLASSPATH="/opt/cloudera/parcels/CDH/lib/hbase/lib/*"
DRIVER_LIBRARY_PATH="/opt/cloudera/parcels/CDH/lib/hadoop/lib/native"

spark-shell --driver-class-path "$DRIVER_CLASSPATH" --driver-library-path "$DRIVER_LIBRARY_PATH" --driver-memory 10G --executor-memory 15G --executor-cores 8 --num-executors 3 --master $MASTER

Code:

import org.apache.hadoop.fs._
import org.apache.hadoop.hbase._
import org.apache.hadoop.hbase.client._
import org.apache.hadoop.hbase.io._
import org.apache.hadoop.hbase.mapreduce._
import org.apache.hadoop.hbase.util._
import org.apache.spark._

val hc = HBaseConfiguration.create
hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/hbase-site.xml"))

hc.addResource(new Path("file:///opt/cloudera/parcels/CDH/lib/hbase/conf/core-site.xml"))

hc.set(TableInputFormat.INPUT_TABLE, "poc-customers")
val rdd = sc.newAPIHadoopRDD(hc, classOf[TableInputFormat], classOf[ImmutableBytesWritable], classOf[Result])

rdd.count

The following error is thrown:

org.apache.hadoop.hbase.client.RetriesExhaustedException: Can't get the location
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:308)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:149)
        at org.apache.hadoop.hbase.client.ScannerCallableWithReplicas.call(ScannerCallableWithReplicas.java:57)
        at org.apache.hadoop.hbase.client.RpcRetryingCaller.callWithoutRetries(RpcRetryingCaller.java:200)
        at org.apache.hadoop.hbase.client.ClientScanner.call(ClientScanner.java:293)
        at org.apache.hadoop.hbase.client.ClientScanner.nextScanner(ClientScanner.java:268)
        at org.apache.hadoop.hbase.client.ClientScanner.initializeScannerInConstruction(ClientScanner.java:140)
        at org.apache.hadoop.hbase.client.ClientScanner.<init>(ClientScanner.java:135)
        at org.apache.hadoop.hbase.client.HTable.getScanner(HTable.java:888)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.restart(TableRecordReaderImpl.java:90)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReaderImpl.initialize(TableRecordReaderImpl.java:167)
        at org.apache.hadoop.hbase.mapreduce.TableRecordReader.initialize(TableRecordReader.java:134)
        at org.apache.hadoop.hbase.mapreduce.TableInputFormatBase$1.initialize(TableInputFormatBase.java:200)
        at org.apache.spark.rdd.NewHadoopRDD$$anon$1.<init>(NewHadoopRDD.scala:133)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:104)
        at org.apache.spark.rdd.NewHadoopRDD.compute(NewHadoopRDD.scala:66)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:277)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:244)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:61)
        at org.apache.spark.scheduler.Task.run(Task.scala:64)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:203)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
        at java.lang.Thread.run(Thread.java:745)
Caused by: java.io.IOException: Could not set up IO Streams to <management-node-server-hostname>/10.118.114.40:60020
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:773)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.writeRequest(RpcClientImpl.java:881)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.tracedWriteRequest(RpcClientImpl.java:850)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl.call(RpcClientImpl.java:1184)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient.callBlockingMethod(AbstractRpcClient.java:216)
        at org.apache.hadoop.hbase.ipc.AbstractRpcClient$BlockingRpcChannelImplementation.callBlockingMethod(AbstractRpcClient.java:300)
        at org.apache.hadoop.hbase.protobuf.generated.ClientProtos$ClientService$BlockingStub.get(ClientProtos.java:31865)
        at org.apache.hadoop.hbase.protobuf.ProtobufUtil.getRowOrBefore(ProtobufUtil.java:1580)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegionInMeta(ConnectionManager.java:1294)
        at org.apache.hadoop.hbase.client.ConnectionManager$HConnectionImplementation.locateRegion(ConnectionManager.java:1126)
        at org.apache.hadoop.hbase.client.RpcRetryingCallerWithReadReplicas.getRegionLocations(RpcRetryingCallerWithReadReplicas.java:299)
        ... 23 more
Caused by: java.lang.RuntimeException: SASL authentication failed. The most likely cause is missing or invalid credentials. Consider 'kinit'.
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$1.run(RpcClientImpl.java:673)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.handleSaslConnectionFailure(RpcClientImpl.java:631)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:739)
        ... 33 more
Caused by: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:212)
        at org.apache.hadoop.hbase.security.HBaseSaslRpcClient.saslConnect(HBaseSaslRpcClient.java:179)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupSaslConnection(RpcClientImpl.java:605)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.access$600(RpcClientImpl.java:154)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:731)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection$2.run(RpcClientImpl.java:728)
        at java.security.AccessController.doPrivileged(Native Method)
        at javax.security.auth.Subject.doAs(Subject.java:415)
        at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1671)
        at org.apache.hadoop.hbase.ipc.RpcClientImpl$Connection.setupIOstreams(RpcClientImpl.java:728)
        ... 33 more
Caused by: GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)
        at sun.security.jgss.krb5.Krb5InitCredential.getInstance(Krb5InitCredential.java:147)
        at sun.security.jgss.krb5.Krb5MechFactory.getCredentialElement(Krb5MechFactory.java:121)
        at sun.security.jgss.krb5.Krb5MechFactory.getMechanismContext(Krb5MechFactory.java:187)
        at sun.security.jgss.GSSManagerImpl.getMechanismContext(GSSManagerImpl.java:223)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:212)
        at sun.security.jgss.GSSContextImpl.initSecContext(GSSContextImpl.java:179)
        at com.sun.security.sasl.gsskerb.GssKrb5Client.evaluateChallenge(GssKrb5Client.java:193)
        ... 42 more

Please note:

  1. We are able to invoke the HBase shell from the same session and scan records from the same table
  2. We are able to execute a word count on an HDFS file from the same Spark shell session
  3. We are able to execute the above code in local mode
  4. We are able to perform other operations from the same spark-shell session, for example:
     a. val admin = new HBaseAdmin(hc)
     b. print(admin.isTableAvailable("poc-customers"))

Looking for help to resolve this issue.

3 Answers
Answer 1 · 2019-01-23 22:28

I'm working on the same project as the OP. We didn't use Samson Scharfrichter's answer directly, but it lent confidence that this kind of solution was possible. Here's what worked for us:

We are now using the RDD from SparkOnHBase (https://github.com/cloudera-labs/SparkOnHBase), but we have incorporated the change suggested at https://github.com/cloudera-labs/SparkOnHBase/pull/7. Since this pull request is open, its changes can also be implemented by subclassing:

import com.cloudera.spark.hbase.{HBaseContext, HBaseScanRDD}
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.security.UserGroupInformation
import org.apache.hadoop.security.UserGroupInformation.AuthenticationMethod
import org.apache.spark.{SerializableWritable, SparkContext}
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.deploy.SparkHadoopUtil
import org.apache.spark.rdd.RDD

// Subclass of SparkOnHBase's HBaseScanRDD incorporating the credential fix
// from pull request #7: broadcast the job credentials from the driver and
// re-add them to the UGI on each executor.
class MyHBaseScanRDD (sc: SparkContext,
    @transient tableName: String,
    @transient scan: Scan,
    configBroadcast: Broadcast[SerializableWritable[Configuration]]) extends HBaseScanRDD(sc, tableName, scan, configBroadcast) {

  // Captured on the driver, shipped to every executor.
  val jobCredentialBroadcast = sc.broadcast(new SerializableWritable(jobTransient.getCredentials))

  // Runs on the executors before the scan: merges the current user's
  // credentials and the broadcast job credentials into the executor's UGI
  // so HBase RPC calls can authenticate.
  override def addCreds {
    val creds = SparkHadoopUtil.get.getCurrentUserCredentials
    @transient val ugi = UserGroupInformation.getCurrentUser
    ugi.addCredentials(creds)
    ugi.setAuthenticationMethod(AuthenticationMethod.PROXY)
    ugi.addCredentials(jobCredentialBroadcast.value.value)
  }
}

// Expose the fixed RDD through a subclass of SparkOnHBase's HBaseContext.
class MyHBaseContext (sc: SparkContext,
    @transient config: Configuration,
    val tmpHdfsConfigFile: String = null) extends HBaseContext(sc, config, tmpHdfsConfigFile) {
  def myHBaseScanRDD(tableName: String, scan: Scan): RDD[(Array[Byte], java.util.List[(Array[Byte], Array[Byte], Array[Byte])])] = {
    new MyHBaseScanRDD(sc, tableName, scan, broadcastedConf)
  }
}

val hc = HBaseConfiguration.create
val scan = new Scan
val hbaseContext = new MyHBaseContext(sc, hc)
val rdd = hbaseContext.myHBaseScanRDD("tableName", scan)
rdd.count

It looks like these changes have been incorporated into HBase's hbase-spark module, which is the successor to SparkOnHBase. Versioning issues have kept us from using the newer HBase library, but I'd suggest that anybody facing this problem try it first.
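For reference, here is a minimal, untested sketch of the same scan using the hbase-spark module's HBaseContext. "poc-customers" is the table from the question; the hbaseRDD call is the module's replacement for the myHBaseScanRDD subclass above:

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.spark.HBaseContext

// The module's HBaseContext captures credentials on the driver and
// redistributes them to the executors itself, so no subclassing is needed.
val conf = HBaseConfiguration.create()
val hbaseContext = new HBaseContext(sc, conf)
val rdd = hbaseContext.hbaseRDD(TableName.valueOf("poc-customers"), new Scan())
rdd.count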

Answer 2 (再贱就再见) · 2019-01-23 22:44

When the Spark "driver" requests YARN to spawn its "executors" somewhere in the cluster, it uses its local Kerberos TGT -- the one you created with kinit -- to authenticate. Then YARN issues a global delegation token that is shared by all executors to access HDFS and YARN.

Alas, HBase does not support that delegation token. Each executor must re-authenticate to ZooKeeper, and then to the actual HBase RegionServer, with a local TGT.

In a perfect world, you would just need to set two properties in spark-defaults.conf, namely spark.yarn.principal and spark.yarn.keytab (creating a keytab to store your password is something you do with the "ktutil" utility).
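For illustration, such an entry might look like this (the principal and keytab path below are hypothetical placeholders, not values from the question):

spark.yarn.principal   etl-user@EXAMPLE.COM
spark.yarn.keytab      /home/etl-user/etl-user.keytab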

Alas, that feature was built for long-running Streaming jobs that need to renew their HDFS delegation token (typically every 7 days), not for HBase's initial authentication. The release notes for Spark 1.6 show a lot of bug fixes related to YARN and Kerberos, so maybe the feature now works out of the box for HBase as well, but I wouldn't bet on it.

So what is the workaround?

  1. In the code run by the driver, declare that the keytab file must be shipped to each executor with addFile()
  2. In the code run by the executors, explicitly create a Hadoop UserGroupInformation that obtains its own Kerberos TGT from the keytab before connecting to HBase (see the sketch below)

Note that when used this way, the UGI keeps its TGT private: it does not show up in the ticket cache, so other processes on the same machine cannot reuse it (and, conversely, a kinit from another process will not tamper with it).
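Below is a minimal, untested sketch of that two-step workaround, assuming an HBase 1.x client (for ConnectionFactory) and hbase-site.xml on the executor classpath. The principal etl-user@EXAMPLE.COM, the keytab path, and the row keys are placeholders; substitute your own:

import java.security.PrivilegedExceptionAction

import org.apache.hadoop.hbase.{HBaseConfiguration, TableName}
import org.apache.hadoop.hbase.client.{ConnectionFactory, Get}
import org.apache.hadoop.hbase.util.Bytes
import org.apache.hadoop.security.UserGroupInformation
import org.apache.spark.SparkFiles

// Step 1, on the driver: ship the keytab to every executor's working dir.
sc.addFile("/home/etl-user/etl-user.keytab")

// Step 2, on the executors: log in from the shipped keytab and wrap all
// HBase access in doAs(), so the RPC layer finds a valid TGT.
sc.parallelize(Seq("row-1", "row-2")).foreachPartition { rowKeys =>
  val ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(
    "etl-user@EXAMPLE.COM", SparkFiles.get("etl-user.keytab"))
  ugi.doAs(new PrivilegedExceptionAction[Unit] {
    override def run(): Unit = {
      val conn = ConnectionFactory.createConnection(HBaseConfiguration.create())
      try {
        val table = conn.getTable(TableName.valueOf("poc-customers"))
        rowKeys.foreach { key =>
          val result = table.get(new Get(Bytes.toBytes(key)))
          println(s"$key -> ${result.size} cells")
        }
      } finally {
        conn.close()
      }
    }
  })
}

The essential point is that the login and every HBase call happen inside the executor's ugi.doAs block, never on the driver.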

Answer 3 (兄弟一词,经得起流年.) · 2019-01-23 22:50

The root cause of your problem is:

GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)

The Cloudera Troubleshooting Guide suggests a solution for this problem:

Description: A user must have a valid Kerberos ticket in order to interact with a secure Hadoop cluster. Running any Hadoop command (such as hadoop fs -ls) will fail if you do not have a valid Kerberos ticket in your credentials cache. If you do not have a valid ticket, you will receive an error such as:

11/01/04 12:08:12 WARN ipc.Client: Exception encountered while connecting to the server : javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]
Bad connection to FS. command aborted. exception: Call to nn-host/10.0.0.2:8020 failed on local exception: java.io.IOException: javax.security.sasl.SaslException: GSS initiate failed [Caused by GSSException: No valid credentials provided (Mechanism level: Failed to find any Kerberos tgt)]

Solution: You can examine the Kerberos tickets currently in your credentials cache by running the klist command. You can obtain a ticket by running the kinit command and either specifying a keytab file containing credentials, or entering the password for your principal.

You can give the suggested solution a try.
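For example (the principal and keytab path are placeholders):

klist                                                            # inspect the current ticket cache
kinit etl-user@EXAMPLE.COM                                       # obtain a TGT with a password, or
kinit -kt /home/etl-user/etl-user.keytab etl-user@EXAMPLE.COM    # obtain one from a keytab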
