Flink on Yarn, parallel source with Kafka

Posted 2019-09-14 16:55

Question:

I am trying to get parallelism on the Kafka source in my Flink job, but so far I have not succeeded.

I created my Kafka topic with 4 partitions:

$ ./bin/kafka-topics.sh --describe --zookeeper X.X.X.X:2181 --topic mytopic
Topic:mytopic   PartitionCount:4    ReplicationFactor:1 Configs:
    Topic: mytopic  Partition: 0    Leader: 0   Replicas: 0 Isr: 0
    Topic: mytopic  Partition: 1    Leader: 0   Replicas: 0 Isr: 0
    Topic: mytopic  Partition: 2    Leader: 0   Replicas: 0 Isr: 0
    Topic: mytopic  Partition: 3    Leader: 0   Replicas: 0 Isr: 0
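
For reference, a topic like this can be created with a command along these lines (a sketch, assuming the same ZooKeeper address as in the describe command above and a single broker, hence replication factor 1):

$ ./bin/kafka-topics.sh --create --zookeeper X.X.X.X:2181 --replication-factor 1 --partitions 4 --topic mytopic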

My Scala code is as follows:

    import java.util.Properties

    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.java.utils.ParameterTool
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010

    // Program arguments passed after the jar on the CLI (--server, --topic)
    val params = ParameterTool.fromArgs(args)

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(4)
    env.getConfig.setGlobalJobParameters(params)

    // **** Kafka CONNECTION ****
    val properties = new Properties()
    properties.setProperty("bootstrap.servers", params.get("server"))
    properties.setProperty("group.id", "test")

    // **** Get KAFKA source ****
    val stream: DataStream[String] = env.addSource(
      new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties))

I run my job on YARN:

$ ./bin/flink run -m yarn-cluster -yn 4 -yjm 8192 -ynm test -ys 1 -ytm 8192 myjar.jar --server X.X.X.X:9092 --topic mytopic

I tried a bunch of things, but my source is still not parallelized:

Having several Kafka partitions and at least as many slots / Task Managers (here 4 Task Managers with 1 slot each, i.e. 4 slots in total) should be enough, right?
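
Among other things, I tried pinning the parallelism explicitly on the source operator (a sketch reusing `env`, `params` and `properties` from my code above):

    // Same consumer as before, but with the source operator's parallelism set explicitly.
    // With 4 topic partitions, each of the 4 source subtasks should be assigned one partition.
    val consumer = new FlinkKafkaConsumer010[String](params.get("topic"), new SimpleStringSchema(), properties)
    val stream: DataStream[String] = env
      .addSource(consumer)
      .setParallelism(4)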

Answer 1:

I had the same issue. I would suggest checking two things.

  1. When implementing the producer, check whether you are producing the same key for every record flushed to Kafka. You should use a well-distributed key, or simply set it to null: Kafka's default partitioner hashes the key to pick a partition, so a constant key sends every record to the same partition and leaves the other source subtasks idle. That is, change

producer.send(new ProducerRecord<String,String>("topicName", "yourKey", "yourMessage"));

to

producer.send(new ProducerRecord<String,String>("topicName", null, "yourMessage"));
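
For illustration, a minimal self-contained producer along those lines might look like this in Scala (the broker address and topic name are placeholders taken from the question; adjust to your setup):

    import java.util.Properties
    import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}

    val props = new Properties()
    props.put("bootstrap.servers", "X.X.X.X:9092")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

    val producer = new KafkaProducer[String, String](props)
    // A null key makes the default partitioner spread records across all partitions,
    // instead of hashing one constant key onto a single partition.
    producer.send(new ProducerRecord[String, String]("mytopic", null, "yourMessage"))
    producer.close()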

  2. Check whether your Kafka producer library version is compatible with your Kafka consumer library version.
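
For example, with sbt the Flink job's dependencies could be pinned to mutually compatible versions along these lines (a sketch; the version numbers are illustrative, not taken from the question):

    // Hypothetical build.sbt excerpt: adjust versions to your Flink and Kafka installation.
    libraryDependencies ++= Seq(
      "org.apache.flink" %% "flink-streaming-scala"      % "1.8.2" % "provided",
      "org.apache.flink" %% "flink-connector-kafka-0.10" % "1.8.2",
      "org.apache.kafka" %  "kafka-clients"              % "0.10.2.2"
    )

Note that the 0.10 connector already pulls in a matching kafka-clients transitively, so an explicit pin is only needed when another dependency forces a different client version onto the classpath.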