I have a flat file with the following structure:
key1|"value-001"
key2|"value-002"
key2|"value-003"
key3|"value-004"
key2|"value-005"
key1|"value-006"
key3|"value-007"
I need to map this data file to key-value pairs, where the value is the list of all values for one key, such as:
key1:["value-001","value-006"]
key2:["value-002","value-003","value-005"]
key3:["value-004","value-007"]
I need to do this from Java code. As I understood from the Spark Programming Guide, this operation should be implemented with sc.flatMapValues(..), sc.flatMap(..), or sc.groupByKey(..), but I don't know which one. How do I do this?
I would recommend reduceByKey :) This list imitates your input:
List<String> input = Arrays.asList(
        "key1|value-001",
        "key2|value-002",
        "key2|value-003",
        "key3|value-004",
        "key2|value-005",
        "key1|value-006",
        "key3|value-007");
Converting to an RDD (in practice you would of course just read in your file with sc.textFile()):
JavaRDD<String> rdd = javaSparkContext.parallelize(input);
We now have an RDD of strings. The following maps each line to a key-value pair (note that each value is wrapped in a list), and then reduceByKey combines all values for each key into a single list, yielding the result you want.
JavaPairRDD<String, List<String>> keyValuePairs = rdd.mapToPair(obj -> {
    // split() takes a regex, so the pipe must be escaped; a bare "|" would split on every character
    String[] split = obj.split("\\|");
    // wrap in a mutable ArrayList -- Arrays.asList() alone is fixed-size, so addAll() in the reduce would fail
    return new Tuple2<>(split[0], new ArrayList<>(Arrays.asList(split[1])));
});
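As an aside, the escaping really matters here: String.split() treats its argument as a regular expression, and an unescaped "|" is the regex alternation operator, which matches the empty string at every position. A quick stdlib-only check (no Spark needed) shows the difference:

```java
import java.util.Arrays;

public class SplitCheck {
    public static void main(String[] args) {
        // Unescaped "|" is regex alternation: it matches the empty string,
        // so the line is split into one token per character (on Java 8+).
        String[] bad = "key1|value-001".split("|");

        // Escaped "\\|" splits on the literal pipe character.
        String[] good = "key1|value-001".split("\\|");

        System.out.println(bad.length);            // far more than 2 tokens
        System.out.println(Arrays.toString(good)); // [key1, value-001]
    }
}
```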
JavaPairRDD<String, List<String>> result = keyValuePairs.reduceByKey((v1, v2) -> {
v1.addAll(v2);
return v1;
});
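If you want to sanity-check the expected grouping without spinning up Spark, the same split-and-merge can be sketched with plain Java streams (stdlib only; Collectors.groupingBy here stands in for the shuffle, and value order follows encounter order):

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;

public class GroupCheck {
    public static void main(String[] args) {
        List<String> input = Arrays.asList(
                "key1|value-001", "key2|value-002", "key2|value-003",
                "key3|value-004", "key2|value-005", "key1|value-006",
                "key3|value-007");

        // Split each line on the literal pipe and collect values per key,
        // preserving the order in which they appear in the input.
        Map<String, List<String>> result = input.stream()
                .map(line -> line.split("\\|"))
                .collect(Collectors.groupingBy(
                        parts -> parts[0],
                        Collectors.mapping(parts -> parts[1], Collectors.toList())));

        System.out.println(result.get("key2")); // [value-002, value-003, value-005]
    }
}
```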
EDIT: I feel I should mention that you could also use a groupByKey. However, you usually want to favor reduceByKey over groupByKey, because reduceByKey does a map-side reduce BEFORE shuffling the data around, whereas groupByKey shuffles everything around. In your particular case, you will probably end up shuffling the same amount of data around as with a groupByKey, since you want all values to be gathered, but using reduceByKey is just a better habit to be in :)
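To make the map-side reduce concrete, here is a plain-Java sketch (no Spark; the two partition lists are hypothetical) of what reduceByKey effectively does: each partition is combined locally first, so the shuffle only has to move one partial list per key per partition instead of every individual record:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class MapSideCombine {
    // Combine lines into per-key lists within a single (hypothetical) partition.
    static Map<String, List<String>> combinePartition(List<String> partition) {
        Map<String, List<String>> partial = new HashMap<>();
        for (String line : partition) {
            String[] parts = line.split("\\|");
            partial.computeIfAbsent(parts[0], k -> new ArrayList<>()).add(parts[1]);
        }
        return partial;
    }

    public static void main(String[] args) {
        // Two hypothetical partitions of the input file.
        List<String> part1 = Arrays.asList("key1|value-001", "key2|value-002", "key2|value-003");
        List<String> part2 = Arrays.asList("key3|value-004", "key2|value-005",
                                           "key1|value-006", "key3|value-007");

        // Map-side combine: each partition is reduced locally BEFORE the "shuffle".
        Map<String, List<String>> partial1 = combinePartition(part1);
        Map<String, List<String>> partial2 = combinePartition(part2);

        // The "shuffle" then only merges the partial lists per key.
        Map<String, List<String>> merged = new HashMap<>(partial1);
        partial2.forEach((k, v) ->
                merged.merge(k, v, (a, b) -> { a.addAll(b); return a; }));

        System.out.println(merged.get("key2")); // [value-002, value-003, value-005]
    }
}
```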