I'm trying to run a simple test program with Flink's KafkaSource. I'm using the following:
- Flink 0.9
- Scala 2.10.4
- Kafka 0.8.2.1
I followed the docs to test KafkaSource (added the dependency and bundled the Kafka connector flink-connector-kafka in the plugin) as described here and here.
Below is my simple test program:
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .print
  }
}
However, compilation always fails because the type KafkaSource is not found:
[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR] .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
What am I missing here?
I'm an sbt user, so I used the following build.sbt:
organization := "pl.japila.kafka"
scalaVersion := "2.11.7"
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"
that allowed me to run the program:
import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .print
  }
}
The output:
[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
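Note that the run above finishes in 0 s and prints no records: the program only builds the job graph and never calls execute(), so nothing is read from Kafka. Below is a minimal sketch of the same program with explicit imports and an execute() call. The exact class locations (KafkaSource under connectors.kafka.api, SimpleStringSchema under util.serialization) are taken from the wildcard imports above and are an assumption for Flink 0.9.0; the constant names and the job name are only illustrative.

```scala
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.streaming.connectors.kafka.api.KafkaSource
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object TestKafka {
  // Same ZooKeeper address and topic as in the program above
  // (constant names are illustrative, not part of any API).
  val ZookeeperAddress = "localhost:2181"
  val Topic = "test"

  def main(args: Array[String]): Unit = {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env
      .addSource(new KafkaSource[String](ZookeeperAddress, Topic, new SimpleStringSchema))
      .print()
    // Streaming programs are lazy: the job only starts when execute() is called.
    env.execute("TestKafka")
  }
}
```

With a Kafka broker and ZooKeeper running locally, this version should keep running and print whatever is published to the "test" topic, instead of returning immediately.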
The problem seems to be that SBT and Maven profiles do not play well together.
The Flink POMs refer to the Scala version (2.10, 2.11, ...) through variables, some of which are defined in build profiles. SBT does not evaluate these profiles properly, so the packaging does not work correctly.
There is an issue and pending pull request to fix this: https://issues.apache.org/jira/browse/FLINK-2408
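// Imports assume Flink 1.x with the flink-connector-kafka-0.8 dependency
import java.util.Properties
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object FlinkKafkaStreaming {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "flink-kafka")
    val stream = env.addSource(new FlinkKafkaConsumer08[String]
      ("your_topic_name", new SimpleStringSchema(), properties))
    // write every consumed record to a local text file
    stream.setParallelism(1).writeAsText("your_local_dir_path")
    env.execute("XDFlinkKafkaStreaming")
  }
}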
To test it, do the following:
- Run the Flink demo first;
- Run Kafka_Producer.
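The producer itself is not shown above, so here is a rough sketch of what such a test producer could look like, using the Java producer client from kafka-clients (available since Kafka 0.8.2). The object name TestKafkaProducer, the producerProps helper, and the message contents are hypothetical; the broker address and topic name are copied from the consumer example.

```scala
import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestKafkaProducer {
  // Hypothetical helper: builds the producer configuration for a given broker list.
  def producerProps(bootstrapServers: String): Properties = {
    val props = new Properties()
    props.setProperty("bootstrap.servers", bootstrapServers)
    props.setProperty("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.setProperty("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props
  }

  def main(args: Array[String]): Unit = {
    val producer = new KafkaProducer[String, String](producerProps("localhost:9092"))
    try {
      // Send a handful of test messages to the topic the Flink job reads from.
      for (i <- 1 to 10) {
        producer.send(new ProducerRecord[String, String]("your_topic_name", s"message-$i"))
      }
    } finally {
      // close() waits for outstanding sends to complete before releasing resources.
      producer.close()
    }
  }
}
```

Once the Flink job is running, each message sent this way should show up in the text output under your_local_dir_path.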