Issue putting Spark Streaming data into HBase

Posted 2019-07-09 08:19

I am a beginner in this field, so I cannot make sense of this...

  • HBase ver: 0.98.24-hadoop2
  • Spark ver: 2.1.0

The following code tries to put data received from a Kafka producer through Spark Streaming into HBase.

  • The Kafka input data looks like this:

    Line1,TAG1,123
    Line1,TAG2,134

The Spark Streaming job splits each received line on the delimiter ',' and then puts the data into HBase. However, my application hits an error when it calls the htable.put() method. Can anyone explain why the code below throws this error?

Thank you.

JavaDStream<String> records = lines.flatMap(new FlatMapFunction<String, String>() {   
    private static final long serialVersionUID = 7113426295831342436L;

    HTable htable; 
    public HTable set() throws IOException{ 
        Configuration hconfig = HBaseConfiguration.create();
        hconfig.set("hbase.zookeeper.property.clientPort", "2222");
        hconfig.set("hbase.zookeeper.quorum", "127.0.0.1");  

        HConnection hconn = HConnectionManager.createConnection(hconfig);  

        htable = new HTable(hconfig, tableName); 

        return htable;  
    };  
    @Override
    public Iterator<String> call(String x) throws IOException {  

        ////////////// Put into HBase   ///////////////////// 
        String[] data = x.split(",");   

        if (null != data && data.length > 2 ){ 
            SimpleDateFormat sdf = new SimpleDateFormat("yyyyMMddHHmmss");   
            String ts = sdf.format(new Date());  

            Put put = new Put(Bytes.toBytes(ts)); 

            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("LINEID"), Bytes.toBytes(data[0]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("TAGID"), Bytes.toBytes(data[1]));
            put.addImmutable(Bytes.toBytes(familyName), Bytes.toBytes("VAL"), Bytes.toBytes(data[2]));

/* I've checked that the Put holds data like this:
{"totalColumns":3,"row":"20170120200927",
"families":{"TAGVALUE":
[{"qualifier":"LINEID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"TAGID","vlen":3,"tag":[],"timestamp":9223372036854775807},
{"qualifier":"VAL","vlen":6,"tag":[],"timestamp":9223372036854775807}]}} */


//********************* ERROR *******************//   
            htable.put(put);  
            htable.close();  


        }

        return Arrays.asList(COLDELIM.split(x)).iterator(); 
    } 
}); 

Error output:

Exception in thread "main" org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:154)
at org.test.avro.sparkAvroConsumer$2.call(sparkAvroConsumer.java:123)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at org.apache.spark.streaming.api.java.JavaDStreamLike$$anonfun$fn$1$1.apply(JavaDStreamLike.scala:171)
at scala.collection.Iterator$$anon$12.nextCur(Iterator.scala:434)
at scala.collection.Iterator$$anon$12.hasNext(Iterator.scala:440)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:389)
at scala.collection.Iterator$class.foreach(Iterator.scala:893)
at scala.collection.AbstractIterator.foreach(Iterator.scala:1336)
at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:59)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:104)
at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:48)
at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:310)
at scala.collection.AbstractIterator.to(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:302)
at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1336)
at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:289)
at scala.collection.AbstractIterator.toArray(Iterator.scala:1336)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.rdd.RDD$$anonfun$take$1$$anonfun$29.apply(RDD.scala:1353)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1944)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:87)
at org.apache.spark.scheduler.Task.run(Task.scala:99)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:282)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
at java.lang.Thread.run(Thread.java:745)

1 answer
混吃等死
#2 · 2019-07-09 08:52

You never call the method public HTable set() throws IOException, which is the only place where htable is assigned.

Because htable is still null when call() runs, the line

htable.put(put);

throws the NullPointerException shown in your log:

 stage 23.0 failed 1 times, most recent failure: Lost task 0.0 in stage 23.0 (TID 23, localhost, executor driver): java.lang.NullPointerException
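
To make the cause concrete, here is a minimal sketch that runs without HBase or Spark. FakeTable, BrokenWriter and LazyWriter are hypothetical names invented for this illustration; FakeTable only stands in for the real HTable and keeps rows in memory. BrokenWriter has the same shape as the code in the question (a set() method that nothing calls), and LazyWriter shows one possible fix: create the handle the first time it is needed.

```java
import java.io.Serializable;
import java.util.ArrayList;
import java.util.List;

public class LazyTableDemo {

    /** Hypothetical stand-in for an HBase table handle: it only records rows in memory. */
    static class FakeTable {
        final List<String> rows = new ArrayList<>();

        void put(String row) {
            rows.add(row);
        }
    }

    /** Mirrors the question: set() exists, but nothing ever calls it. */
    static class BrokenWriter {
        FakeTable table; // never assigned, so it stays null

        FakeTable set() {
            table = new FakeTable();
            return table;
        }

        void write(String line) {
            table.put(line); // NullPointerException: table is null here
        }
    }

    /** One possible fix: create the handle lazily, the first time it is needed. */
    static class LazyWriter implements Serializable {
        // transient: a deserialized copy starts with no handle and opens its own
        private transient FakeTable table;

        private FakeTable table() {
            if (table == null) {
                table = new FakeTable();
            }
            return table;
        }

        /** Stores the line if it has at least three fields; returns the rows stored so far. */
        int write(String line) {
            String[] data = line.split(",");
            if (data.length > 2) {
                table().put(line);
            }
            return table().rows.size();
        }
    }

    /** Returns true if BrokenWriter.write fails with a NullPointerException. */
    static boolean brokenThrowsNpe() {
        try {
            new BrokenWriter().write("Line1,TAG1,123");
            return false;
        } catch (NullPointerException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        System.out.println("broken writer throws NPE: " + brokenThrowsNpe());
        LazyWriter writer = new LazyWriter();
        writer.write("Line1,TAG1,123");
        System.out.println("rows stored by lazy writer: " + writer.write("Line1,TAG2,134"));
    }
}
```

In the real job the lazy getter would contain the body of your set() method instead of new FakeTable(). Note also that your call() closes htable after every record, which would likely break the next record even once the table is initialised, so it is probably better to close it once when the work is finished rather than per record.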