My objective is to setup a high throughput cluster using Kafka as source & Flink as the stream processing engine. Here's what I have done.
I have setup a 2-node cluster the following configuration on the master and the slave.
Master flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 256
taskmanager.heap.mb: 512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
Slave flink-conf.yaml
jobmanager.rpc.address: <MASTER_IP_ADDR> #localhost
jobmanager.rpc.port: 6123
jobmanager.heap.mb: 512 #256
taskmanager.heap.mb: 1024 #512
taskmanager.numberOfTaskSlots: 50
parallelism.default: 100
The slaves file on the Master node looks like this:
<SLAVE_IP_ADDR>
localhost
The flink setup on both nodes is in a folder which has the same name. I start up the cluster on the master by running
bin/start-cluster-streaming.sh
This starts up the task manager on the slave node.
My input source is Kafka. Here is the snippet.
final StreamExecutionEnvironment env =
StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> stream =
env.addSource(
new KafkaSource<String>(kafkaUrl,kafkaTopic, new SimpleStringSchema()));
stream.addSink(stringSinkFunction);
env.execute("Kafka stream");
Here is my Sink function
public class MySink implements SinkFunction<String> {
private static final long serialVersionUID = 1L;
public void invoke(String arg0) throws Exception {
processMessage(arg0);
System.out.println("Processed Message");
}
}
Here are the Flink Dependencies in my pom.xml.
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-core</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>0.9.0</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-connector-kafka</artifactId>
<version>0.9.0</version>
</dependency>
Then I run the packaged jar with this command on the master
bin/flink run flink-test-jar-with-dependencies.jar
However when I insert messages into the Kafka topic I am able to account for all messages coming in from my Kafka topic (via debug messages in the invoke method of my SinkFunction
implementation) on the Master node alone.
In the Job manager UI I am able to see 2 Task managers as below:
Also The dashboard looks like so : Questions:
- Why are the slave nodes not getting the tasks?
- Am I missing some configuration?