How to add a jar using HiveContext in a Spark job

Posted 2019-07-19 17:04

Question:

I am trying to add the JSONSerDe jar file so that I can access JSON data and load it into a Hive table from my Spark job. My code is shown below:

    SparkConf sparkConf = new SparkConf().setAppName("KafkaStreamToHbase");
    JavaSparkContext sc = new JavaSparkContext(sparkConf);
    JavaStreamingContext jssc = new JavaStreamingContext(sc, Durations.seconds(10));
    final SQLContext sqlContext = new SQLContext(sc);
    final HiveContext hiveContext = new HiveContext(sc);

    // Register the SerDe jar from HDFS, then load the JSON data into the Hive table.
    hiveContext.sql("ADD JAR hdfs://localhost:8020/tmp/hive-serdes-1.0-SNAPSHOT.jar");
    hiveContext.sql("LOAD DATA INPATH '/tmp/mar08/part-00000' OVERWRITE INTO TABLE testjson");

But I end up with the following error:

java.net.MalformedURLException: unknown protocol: hdfs
        at java.net.URL.<init>(URL.java:592)
        at java.net.URL.<init>(URL.java:482)
        at java.net.URL.<init>(URL.java:431)
        at java.net.URI.toURL(URI.java:1096)
        at org.apache.spark.sql.hive.client.ClientWrapper.addJar(ClientWrapper.scala:578)
        at org.apache.spark.sql.hive.HiveContext.addJar(HiveContext.scala:652)
        at org.apache.spark.sql.hive.execution.AddJar.run(commands.scala:89)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult$lzycompute(commands.scala:58)
        at org.apache.spark.sql.execution.ExecutedCommand.sideEffectResult(commands.scala:56)
        at org.apache.spark.sql.execution.ExecutedCommand.doExecute(commands.scala:70)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:132)
        at org.apache.spark.sql.execution.SparkPlan$$anonfun$execute$5.apply(SparkPlan.scala:130)
        at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
        at org.apache.spark.sql.execution.SparkPlan.execute(SparkPlan.scala:130)
        at org.apache.spark.sql.execution.QueryExecution.toRdd$lzycompute(QueryExecution.scala:55)
        at org.apache.spark.sql.execution.QueryExecution.toRdd(QueryExecution.scala:55)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:145)
        at org.apache.spark.sql.DataFrame.<init>(DataFrame.scala:130)
        at org.apache.spark.sql.DataFrame$.apply(DataFrame.scala:52)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:817)
        at com.macys.apm.kafka.spark.parquet.KafkaStreamToHbase$2.call(KafkaStreamToHbase.java:148)
        at com.macys.apm.kafka.spark.parquet.KafkaStreamToHbase$2.call(KafkaStreamToHbase.java:141)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:327)
        at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$foreachRDD$2.apply(JavaDStreamLike.scala:327)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply$mcV$sp(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1$$anonfun$apply$mcV$sp$1.apply(ForEachDStream.scala:50)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply$mcV$sp(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at org.apache.spark.streaming.dstream.ForEachDStream$$anonfun$1.apply(ForEachDStream.scala:49)
        at scala.util.Try$.apply(Try.scala:161)

I was able to add the jar through the Hive shell, but it throws the error above when I try to add it using hiveContext.sql() in the Spark job (Java code). Quick help would be greatly appreciated.

Thanks.

Answer 1:

One workaround is to pass the UDF jars at runtime via the --jars option of the spark-submit command, or to copy the required jars into the Spark lib directory.
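
For illustration, a minimal sketch (the jar path is the one from the question; adjust it to your cluster): setting spark.jars on the SparkConf is the programmatic counterpart of spark-submit --jars, and it ships the jar to the driver and executors so the SerDe class is already on the classpath without needing ADD JAR over hdfs://.

    import org.apache.spark.SparkConf;

    // spark.jars is the same property that spark-submit --jars populates;
    // Spark distributes the listed jars to the driver and the executors.
    SparkConf sparkConf = new SparkConf()
            .setAppName("KafkaStreamToHbase")
            .set("spark.jars", "hdfs://localhost:8020/tmp/hive-serdes-1.0-SNAPSHOT.jar");

    // Command-line equivalent (your-app.jar is a placeholder):
    //   spark-submit --jars hdfs://localhost:8020/tmp/hive-serdes-1.0-SNAPSHOT.jar ... your-app.jar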

Basically it supports the file, hdfs and ivy schemes.

Which version of Spark are you using? I am not able to see an addJar method in ClientWrapper.scala in the latest version.



Answer 2:

I just looked into the Spark code. It seems to be an issue on the Spark side: ClientWrapper.addJar turns the path into a plain java.net.URL (via java.net.URI), and the stock URL class does not understand the hdfs scheme, which is why you get the MalformedURLException. Ideally Spark would register Hadoop's FsUrlStreamHandlerFactory (https://github.com/apache/hadoop/blob/trunk/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/FsUrlStreamHandlerFactory.java) so that hdfs:// URLs can be parsed.
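
If you want to try that registration in your own driver code, here is a minimal sketch (an assumption on my part, not something Spark does for you; it needs hadoop-common on the classpath, and whether ClientWrapper then accepts the hdfs:// path still depends on your Spark version):

    import java.net.URL;
    import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;

    public class RegisterHdfsUrlHandler {
        public static void main(String[] args) throws Exception {
            // The factory can be installed only once per JVM, so do it early,
            // before creating the HiveContext or issuing any ADD JAR.
            URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());

            // With the handler registered, parsing an hdfs:// URL no longer
            // throws MalformedURLException.
            URL jarUrl = new URL("hdfs://localhost:8020/tmp/hive-serdes-1.0-SNAPSHOT.jar");
            System.out.println(jarUrl.getProtocol()); // prints "hdfs"
        }
    }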

You can add jars from the local file system, pass them at job submission time, or copy them into the Spark lib folder.
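
For the first option, a short sketch (assuming you have already copied the jar to a local path on the driver node): an ADD JAR with a plain local path avoids the hdfs:// parsing problem entirely.

    // Assumption: the jar was fetched to /tmp on the driver machine beforehand,
    // e.g. with `hdfs dfs -get /tmp/hive-serdes-1.0-SNAPSHOT.jar /tmp/`.
    hiveContext.sql("ADD JAR /tmp/hive-serdes-1.0-SNAPSHOT.jar");
    hiveContext.sql("LOAD DATA INPATH '/tmp/mar08/part-00000' OVERWRITE INTO TABLE testjson");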