When I read from hbase using richfatMapFunction inside a map I am getting serialization error. What I am trying to do is if a datastream equals to a particular string read from hbase else ignore. Below is the sample program and error I am getting.
package com.abb.Flinktest
import java.text.SimpleDateFormat
import java.util.Properties
import scala.collection.concurrent.TrieMap
import org.apache.flink.addons.hbase.TableInputFormat
import org.apache.flink.api.common.functions.RichFlatMapFunction
import org.apache.flink.api.common.io.OutputFormat
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.scala.DataStream
import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
import org.apache.flink.streaming.api.scala.createTypeInformation
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema
import org.apache.flink.util.Collector
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.TableName
import org.apache.hadoop.hbase.client.ConnectionFactory
import org.apache.hadoop.hbase.client.HTable
import org.apache.hadoop.hbase.client.Put
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.client.Scan
import org.apache.hadoop.hbase.filter.BinaryComparator
import org.apache.hadoop.hbase.filter.CompareFilter
import org.apache.hadoop.hbase.filter.SingleColumnValueFilter
import org.apache.hadoop.hbase.util.Bytes
import org.apache.log4j.Level
import org.apache.flink.api.common.functions.RichMapFunction
object Flinktesthbaseread {
def main(args:Array[String])
{
val env = StreamExecutionEnvironment.createLocalEnvironment()
val kafkaStream = env.fromElements("hello")
val c=kafkaStream.map(x => if(x.equals("hello"))kafkaStream.flatMap(new ReadHbase()) )
env.execute()
}
class ReadHbase extends RichFlatMapFunction[String,Tuple11[String,String,String,String,String,String,String,String,String,String,String]] with Serializable
{
var conf: org.apache.hadoop.conf.Configuration = null;
var table: org.apache.hadoop.hbase.client.HTable = null;
var hbaseconnection:org.apache.hadoop.hbase.client.Connection =null
var taskNumber: String = null;
var rowNumber = 0;
val serialVersionUID = 1L;
override def open(parameters: org.apache.flink.configuration.Configuration) {
println("getting table")
conf = HBaseConfiguration.create()
val in = getClass().getResourceAsStream("/hbase-site.xml")
conf.addResource(in)
hbaseconnection = ConnectionFactory.createConnection(conf)
table = new HTable(conf, "testtable");
// this.taskNumber = String.valueOf(taskNumber);
}
override def flatMap(msg:String,out:Collector[Tuple11[String,String,String,String,String,String,String,String,String,String,String]])
{
//flatmap operation here
}
override def close() {
table.flushCommits();
table.close();
}
}
}
Error:
log4j:WARN No appenders could be found for logger (org.apache.flink.api.scala.ClosureCleaner$).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" org.apache.flink.api.common.InvalidProgramException: Task not serializable
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:172)
at org.apache.flink.api.scala.ClosureCleaner$.clean(ClosureCleaner.scala:164)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.scalaClean(StreamExecutionEnvironment.scala:617)
at org.apache.flink.streaming.api.scala.DataStream.clean(DataStream.scala:959)
at org.apache.flink.streaming.api.scala.DataStream.map(DataStream.scala:484)
at com.abb.Flinktest.Flinktesthbaseread$.main(Flinktesthbaseread.scala:45)
at com.abb.Flinktest.Flinktesthbaseread.main(Flinktesthbaseread.scala)
Caused by: java.io.NotSerializableException: org.apache.flink.streaming.api.scala.DataStream
- field (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", name: "kafkaStream$1", type: "class org.apache.flink.streaming.api.scala.DataStream")
- root object (class "com.abb.Flinktest.Flinktesthbaseread$$anonfun$1", <function1>)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1182)
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548)
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509)
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432)
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178)
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348)
at org.apache.flink.util.InstantiationUtil.serializeObject(InstantiationUtil.java:301)
at org.apache.flink.api.scala.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:170)
... 6 more
I tried wrapping the field inside a method and a class by making the class serializable as wel, but no luck. Could someone throw some lights on this or suggest some workaround for this.
The problem is that you're trying to access the kafka stream variable in the map function which is simply not serializable. It is just an abstract representation of the data. It doesn't contain anything, which invalidates your function in the first place.
instead, do something like this:
The filter funtion will only retain the elements for which the condition is true, and those will be passed to your flatMap function.
I would highly recommend you to read the basis API concepts documentation, as there appears to be some misunderstanding as to what actually happens when you specify a transformation.