I am trying to get parallelism on the Kafka source of my Flink job, but so far without success.
I created my Kafka topic with 4 partitions:
$ ./bin/kafka-topics.sh --describe --zookeeper X.X.X.X:2181 --topic mytopic
Topic:mytopic PartitionCount:4 ReplicationFactor:1 Configs:
Topic: mytopic Partition: 0 Leader: 0 Replicas: 0 Isr: 0
Topic: mytopic Partition: 1 Leader: 0 Replicas: 0 Isr: 0
Topic: mytopic Partition: 2 Leader: 0 Replicas: 0 Isr: 0
Topic: mytopic Partition: 3 Leader: 0 Replicas: 0 Isr: 0
My Scala code is as follows:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.api.java.utils.ParameterTool
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

// params is built from the program arguments (--server / --topic)
val params = ParameterTool.fromArgs(args)
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.setParallelism(4)
env.getConfig.setGlobalJobParameters(params)

// **** Kafka CONNECTION ****
val properties = new Properties()
properties.setProperty("bootstrap.servers", params.get("server"))
properties.setProperty("group.id", "test")

// **** Get KAFKA source ****
val stream: DataStream[String] = env.addSource(
  new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))
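For reference, a minimal sketch of one variant I would expect to work, setting the parallelism explicitly on the source operator rather than relying on the environment default (it reuses env, params and properties from above):

// Same source as above, but with the parallelism pinned on the operator
// itself; ideally this yields one consumer subtask per Kafka partition.
val stream: DataStream[String] = env
  .addSource(new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))
  .setParallelism(4)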
I run my job on YARN, requesting 4 Task Managers with 1 slot each (4 slots in total, which matches the parallelism of 4):
$ ./bin/flink run -m yarn-cluster -yn 4 -yjm 8192 -ynm test -ys 1 -ytm 8192 myjar.jar --server X.X.X.X:9092 --topic mytopic
I tried a bunch of things, but my source is still not parallelized.
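To make the problem visible, here is a small diagnostic sketch (my own naming; it assumes default operator chaining, so the map runs in the same subtask as the source). It tags each record with the index of the subtask that consumed it; if only subtask 0 ever appears in the output, the source is effectively running with parallelism 1:

import org.apache.flink.api.common.functions.RichMapFunction

// Tag every record with the subtask index that handled it.
stream
  .map(new RichMapFunction[String, String] {
    override def map(value: String): String =
      s"subtask-${getRuntimeContext.getIndexOfThisSubtask}: $value"
  })
  .print()

env.execute("kafka-parallelism-test")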
Having several Kafka partitions and at least as many slots / Task Managers should do it, right?