How to use Flink's KafkaSource in Scala?

Question:

I'm trying to run a simple test program with Flink's KafkaSource. I'm using the following:

  • Flink 0.9
  • Scala 2.10.4
  • Kafka 0.8.2.1

I followed the docs to test KafkaSource (added the dependency and bundled the Kafka connector flink-connector-kafka via the build plugin) as described here and here.
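For reference, the connector dependency looks roughly like this in sbt form (a sketch of the naive declaration; the coordinates match those used in the first answer below):

// Naive declaration of the Flink Kafka connector for Flink 0.9.x;
// the answers below explain why this alone may not resolve cleanly under sbt.
libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0"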

Below is my simple test program:

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .print
  }
}

However, compilation always fails, complaining that KafkaSource is not found:

[ERROR] TestKafka.scala:8: error: not found: type KafkaSource
[ERROR]     .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))

What am I missing here?

Answer 1:

I'm an sbt user, so I used the following build.sbt:

organization := "pl.japila.kafka"
scalaVersion := "2.11.7"

libraryDependencies += "org.apache.flink" % "flink-connector-kafka" % "0.9.0" exclude("org.apache.kafka", "kafka_${scala.binary.version}")
libraryDependencies += "org.apache.kafka" %% "kafka" % "0.8.2.1"

that allowed me to run the program:

import org.apache.flink.streaming.api.environment._
import org.apache.flink.streaming.connectors.kafka.api._
import org.apache.flink.streaming.util.serialization._

object TestKafka {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val stream = env
      .addSource(new KafkaSource[String]("localhost:2181", "test", new SimpleStringSchema))
      .print
  }
}

The output:

[kafka-flink]> run
[info] Running TestKafka
log4j:WARN No appenders could be found for logger (org.apache.flink.streaming.api.graph.StreamGraph).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
[success] Total time: 0 s, completed Jul 15, 2015 9:29:31 AM
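Note that the run completes immediately ("Total time: 0 s") because the program above only builds the streaming topology; a Flink streaming job does not start until the environment is executed. To actually consume from the Kafka topic, add a final call such as:

// Starts the streaming job; without this the topology is built but never run.
env.execute("TestKafka")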


Answer 2:

The problem seems to be that SBT and Maven profiles do not play well together.

The Flink POMs refer to the Scala version (2.10, 2.11, ...) through variables, some of which are defined in Maven build profiles. SBT does not evaluate these profiles, so the variables stay unresolved and the packaging does not work correctly.

There is an issue and a pending pull request to fix this: https://issues.apache.org/jira/browse/FLINK-2408
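Concretely, this is why the build.sbt in Answer 1 excludes the transitive Kafka artifact by its literal, unresolved name. A sketch of the dependency as sbt effectively sees it in the published POM (inferred from that exclusion):

// The Maven variable is left unexpanded in the POM, so sbt asks for an
// artifact literally named kafka_${scala.binary.version}, which cannot resolve.
"org.apache.kafka" % "kafka_${scala.binary.version}" % "0.8.2.1"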



Answer 3:

import java.util.Properties

import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer08
import org.apache.flink.streaming.util.serialization.SimpleStringSchema

object FlinkKafkaStreaming {
  def main(args: Array[String]) {
    val env = StreamExecutionEnvironment.getExecutionEnvironment
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", "localhost:9092")
    // only required for Kafka 0.8
    properties.setProperty("zookeeper.connect", "localhost:2181")
    properties.setProperty("group.id", "flink-kafka")
    val stream = env.addSource(
      new FlinkKafkaConsumer08[String]("your_topic_name", new SimpleStringSchema(), properties))
    stream.setParallelism(1).writeAsText("your_local_dir_path")
    env.execute("XDFlinkKafkaStreaming")
  }
}

In order to test, you may do as follows:

  1. Run the Flink demo first;
  2. Run the Kafka producer (a minimal sketch of one follows below).
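The answer does not include producer code; here is a minimal sketch, assuming the Java producer from kafka-clients 0.8.2 and the same topic and broker names used in the job above:

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

object TestProducer {
  def main(args: Array[String]) {
    val props = new Properties()
    // Broker address and serializers for the 0.8.2 Java producer.
    props.put("bootstrap.servers", "localhost:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // Send a few test messages for the Flink job to pick up and write out.
    (1 to 10).foreach { i =>
      producer.send(new ProducerRecord[String, String]("your_topic_name", s"message-$i"))
    }
    producer.close()
  }
}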