I have a Flink cluster on VirtualBox consisting of three nodes: 1 master and 2 slaves. I customized the WordCount example and built a fat jar to run it on the remote Flink cluster in VirtualBox, but I got an error.
Note: I imported the dependencies into the project manually (using IntelliJ IDEA) and did not use Maven as the dependency provider. I tested my code on my local machine and it worked fine.
More details follow.
Here is my Java code:
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.DataSet;
import org.apache.flink.api.java.ExecutionEnvironment;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.util.Collector;

public class WordCount {

    // *************************************************************************
    // PROGRAM
    // *************************************************************************

    public static void main(String[] args) throws Exception {
        final ParameterTool params = ParameterTool.fromArgs(args);
        final int port;
        final String ip;
        DataSet<String> text;
        try {
            ip = params.get("ip");
            port = params.getInt("port");
            // connect to the remote JobManager with parallelism 2
            final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
            text = env.readTextFile(params.get("input"));
        } catch (Exception e) {
            System.err.println("No ip, port, or input specified. Please run " +
                    "'WordCount --ip <ip> --port <port> --input <input>'");
            return;
        }
        DataSet<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                        // group by the tuple field "0" and sum up tuple field "1"
                        .groupBy(0)
                        .sum(1);
        System.out.println("Printing result to stdout. Use --output to specify output path.");
        counts.print();
    }

    // *************************************************************************
    // USER FUNCTIONS
    // *************************************************************************

    /**
     * Implements the string tokenizer that splits sentences into words as a user-defined
     * FlatMapFunction. The function takes a line (String) and splits it into
     * multiple pairs in the form of "(word,1)" ({@code Tuple2<String, Integer>}).
     */
    public static final class Tokenizer implements FlatMapFunction<String, Tuple2<String, Integer>> {
        @Override
        public void flatMap(String value, Collector<Tuple2<String, Integer>> out) {
            // normalize and split the line
            String[] tokens = value.toLowerCase().split("\\W+");
            // emit the pairs
            for (String token : tokens) {
                if (token.length() > 0) {
                    out.collect(new Tuple2<String, Integer>(token, 1));
                }
            }
        }
    }
}
I created the ExecutionEnvironment object with:
final ExecutionEnvironment env = ExecutionEnvironment.createRemoteEnvironment(ip, port, 2);
and I ran the code with the following command on the host machine (which runs VirtualBox and is connected to the cluster nodes):
java -cp FlinkWordCountClusetr.jar WordCount --ip 192.168.101.10 --port 6123 --input /usr/local/flink/LICENSE
but I got the following error (abridged):
Exception in thread "main" org.apache.flink.client.program.ProgramInvocationException: Could not start the ActorSystem needed to talk to the JobManager.
Caused by: com.typesafe.config.ConfigException$Missing: No configuration setting found for key 'akka.remote.log-received-messages'
How can I fix that?
After trying many settings, it turned out that the Maven dependencies did not match the Flink build installed on the remote cluster. The Maven dependencies were Flink 1.3.2 built on Scala 2.10, while the Flink installed on the remote cluster was 1.3.2 built on Scala 2.11. A minor difference, but an important one!
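A mismatch like this is easy to miss because the Scala version only shows up as a suffix in the artifact name (for example flink-clients_2.10 versus flink-clients_2.11). The following is a minimal sketch of a sanity check, not part of Flink: the class ScalaSuffixCheck and its method scalaVersions are hypothetical names introduced here. It assumes the dependency jars follow the usual name_scalaVersion-version.jar convention and that they are on the classpath as separate jars (it cannot look inside a fat jar).

```java
import java.io.File;
import java.util.Map;
import java.util.TreeMap;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper: reports the Scala binary version encoded in jar file names.
final class ScalaSuffixCheck {

    // Matches the Scala suffix in names such as "flink-clients_2.11-1.3.2.jar".
    private static final Pattern SCALA_SUFFIX =
            Pattern.compile("_(2\\.\\d+)-[^/\\\\]*\\.jar$");

    /** Maps each suffixed jar on the given classpath string to its Scala binary version. */
    static Map<String, String> scalaVersions(String classpath) {
        Map<String, String> found = new TreeMap<>();
        for (String entry : classpath.split(Pattern.quote(File.pathSeparator))) {
            Matcher m = SCALA_SUFFIX.matcher(entry);
            if (m.find()) {
                found.put(new File(entry).getName(), m.group(1));
            }
        }
        return found;
    }

    public static void main(String[] args) {
        // Expected Scala version of the cluster's Flink build; "2.11" is an assumed default.
        String expected = args.length > 0 ? args[0] : "2.11";
        scalaVersions(System.getProperty("java.class.path")).forEach((jar, version) -> {
            String verdict = version.equals(expected)
                    ? "ok"
                    : "MISMATCH (expected " + expected + ")";
            System.out.println(jar + " -> Scala " + version + " : " + verdict);
        });
    }
}
```

Run it with the project's dependency jars on the classpath and pass the Scala version of the cluster's Flink build as the first argument; any jar reported as a mismatch should be replaced with the artifact built for the cluster's Scala version.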