Flink thowing serialization error when reading fro

2019-09-06 10:27发布

When I read from hbase using richfatMapFunction inside a map I am getting serialization error. What I am trying to do is if a datastream equals to a particular string read from hbase else ignore. Below is the sample program and error I am getting.

package com.abb.Flinktest
import java.text.SimpleDateFormat
import java.util.Properties

import scala.collection.concurrent.TrieMap 
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction

object Flinktesthbaseread {

  def main(args:Array[String])
  {
   val env = StreamExecutionEnvironment.createLocalEnvironment()
   val kafkaStream = env.fromElements("hello")
   val c=kafkaStream.map(x => if(x.equals("hello"))kafkaStream.flatMap(new ReadHbase()) )       
   env.execute()
  }
      class ReadHbase extends RichFlatMapFunction[String,Tuple11[String,String,String,String,String,String,String,String,String,String,String]] with Serializable
    {
        var conf: org.apache.hadoop.conf.Configuration = null;
    var table: org.apache.hadoop.hbase.client.HTable = null;
    var hbaseconnection:org.apache.hadoop.hbase.client.Connection =null
    var taskNumber: String = null;
    var rowNumber = 0;
    val serialVersionUID = 1L;

    override def open(parameters: org.apache.flink.configuration.Configuration) {
      println("getting table")
       conf = HBaseConfiguration.create()
      val in = getClass().getResourceAsStream("/hbase-site.xml")

      conf.addResource(in)
      hbaseconnection = ConnectionFactory.createConnection(conf)
      table = new HTable(conf, "testtable");
     // this.taskNumber = String.valueOf(taskNumber);
    }

     override def flatMap(msg:String,out:Collector[Tuple11[String,String,String,String,String,String,String,String,String,String,String]]) 
      {
                //flatmap operation here
      }

      override def close() {

      table.flushCommits();
      table.close();
    }

    }
}

Error:

log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
    at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
    at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
    at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
    at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
    at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45)
    at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
    - field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream")
    - root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
    at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
    at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
    at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
    at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
    at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
    at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
    at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
    ... 6 more

I tried wrapping the field inside a method and a class by making the class serializable as wel, but no luck. Could someone throw some lights on this or suggest some workaround for this.

1条回答
看我几分像从前
2楼-- · 2019-09-06 11:17

The problem is that you're trying to access the kafka stream variable in the map function which is simply not serializable. It is just an abstract representation of the data. It doesn't contain anything, which invalidates your function in the first place.

instead, do something like this:

   kafkaStream.filter(x => x.equals("hello")).flatMap(new ReadHBase())

The filter funtion will only retain the elements for which the condition is true, and those will be passed to your flatMap function.

I would highly recommend you to read the basis API concepts documentation, as there appears to be some misunderstanding as to what actually happens when you specify a transformation.

查看更多
登录 后发表回答