How can you write to multiple outputs, dependent on the key, using Spark in a single job?
Related: Write to multiple outputs by key Scalding Hadoop, one MapReduce Job
E.g.
sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
.writeAsMultiple(prefix, compressionCodecOption)
would ensure cat prefix/1 is:
a
b
and cat prefix/2 would be:
c
EDIT: I've recently added a new answer that includes full imports, pimp and compression codec, see https://stackoverflow.com/a/46118044/1586965, which may be helpful in addition to the earlier answers.
I had a similar use case. I resolved it in Java by writing two custom classes implementing MultipleTextOutputFormat and RecordWriter.
My input was a JavaPairRDD<String, List<String>> and I wanted to store it in a file named by its key, with all the lines contained in its value.
Most of my RecordWriter implementation is exactly the same as in FileOutputFormat. The only difference is the few lines that write each line of my input List<String> to the file: the first argument of the write function is set to null in order to avoid writing the key on each line.
To finish, I only need one call on the pair RDD to write my files (a simplified sketch of the idea follows).
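The original Java classes are not reproduced above. As a rough illustration only (a simplified Scala sketch with assumed types, not the answer's actual code), the RecordWriter half might look like this:

    import java.io.DataOutputStream
    import org.apache.hadoop.mapred.{RecordWriter, Reporter}

    // Simplified sketch (not the answer's Java code): write every element of the
    // List value on its own line and never write the key, which mirrors passing
    // null as the first argument of write() described above.
    class ListValueRecordWriter(out: DataOutputStream)
        extends RecordWriter[String, java.util.List[String]] {

      override def write(key: String, value: java.util.List[String]): Unit = {
        val it = value.iterator()
        while (it.hasNext) out.writeBytes(it.next() + "\n")   // values only, no key
      }

      override def close(reporter: Reporter): Unit = out.close()
    }

The final call the answer refers to would then typically be saveAsHadoopFile on the pair RDD, passing the custom output format class.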
Good news for Python users: if you have multiple columns and you want to save all the other, non-partitioned columns in CSV format, using the "text" method from Nick Chammas' suggestion will fail.
The error message is "AnalysisException: u'Text data source supports only a single column, and you have 2 columns.;'"
In Spark 2.0.0 (my test environment is HDP's Spark 2.0.0) the "com.databricks.spark.csv" package is now integrated, and it allows us to save a text file partitioned by a single column; see the example below.
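Sketched here in Scala (the PySpark call chain is identical), with a hypothetical DataFrame df and partition column:

    // Sketch only: Spark 2.0+ ships the csv data source, so the DataFrame can be
    // written partitioned by one column while the remaining columns go into CSV.
    df.write
      .partitionBy("key")            // hypothetical partition column
      .option("header", "true")      // optional: write a header row per file
      .csv("/output/csv-by-key")     // placeholder output path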
In my Spark 1.6.1 environment the code didn't throw any error, but only one file was generated; the output was not partitioned into two folders.
Hope this can help.
saveAsTextFile() and saveAsHadoopFile(...) are implemented on top of the RDD data, specifically by the method PairRDD.saveAsHadoopDataset, which takes the data from the PairRDD where it's executed. I see two possible options: if your data is relatively small in size, you could save some implementation time by grouping over the RDD, creating a new RDD from each collection and using that RDD to write the data. Something like this:
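(A rough reconstruction, not the original snippet; it assumes rdd: RDD[(String, String)] and an output path prefix.)

    // Rough sketch: collect the grouped data to the driver, then write one small
    // RDD per key. Only viable when the grouped data fits on the driver.
    val byKey = rdd.groupByKey().collect()      // Array[(key, Iterable[values])]
    byKey.foreach { case (k, v) =>
      sc.makeRDD(v.toSeq).saveAsTextFile(s"$prefix/$k")   // one directory per key
    }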
Note that it will not work for large datasets because the materialization of the iterator at v.toSeq might not fit in memory.
The other option I see, and actually the one I'd recommend in this case, is: roll your own, by directly calling the Hadoop/HDFS API.
Here's a discussion I started while researching this question: How to create RDDs from another RDD?
I had a similar use case where I split the input file on Hadoop HDFS into multiple files based on a key (one file per key), using Scala with Spark. I grouped the records by key, and the values for each key were written to a separate file.
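The answer's original code is not included above; a minimal sketch of the described approach (the input path, output path and tab-separated key field are all assumptions):

    import org.apache.hadoop.conf.Configuration
    import org.apache.hadoop.fs.{FileSystem, Path}

    // Sketch: group the lines by key, then write each group to its own HDFS file.
    val grouped = sc.textFile("/input/data.txt")
      .map(line => (line.split("\t")(0), line))   // assume the key is the first field
      .groupByKey()

    grouped.foreachPartition { part =>
      val fs = FileSystem.get(new Configuration())
      part.foreach { case (key, lines) =>
        val out = fs.create(new Path(s"/output/$key"))   // one file per key
        lines.foreach(l => out.writeBytes(l + "\n"))
        out.close()
      }
    }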
If you use Spark 1.4+, this has become much, much easier thanks to the DataFrame API. (DataFrames were introduced in Spark 1.3, but partitionBy(), which we need, was introduced in 1.4.) If you're starting out with an RDD, you'll first need to convert it to a DataFrame:
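(The original snippet is not shown. This sketch assumes a SparkSession named spark and made-up column names; on Spark 1.x, the sqlContext implicits play the same role.)

    import spark.implicits._   // spark is an existing SparkSession

    // Sketch: name the tuple fields so that partitionBy can refer to a column.
    val df = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c")))
      .toDF("key", "value")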
In Python, the conversion is analogous; with the same hypothetical column names it would be rdd.toDF(["key", "value"]).
Once you have a DataFrame, writing to multiple outputs based on a particular key is simple. What's more -- and this is the beauty of the DataFrame API -- the code is pretty much the same across Python, Scala, Java and R:
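A sketch of the write itself, using the hypothetical column names from above (the output path is a placeholder):

    // Sketch: one sub-directory per distinct key. partitionBy drops the partition
    // column from the data, so only the remaining "value" column lands in the files.
    df.write
      .partitionBy("key")
      .text("/output/prefix")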
And you can easily use other output formats if you want:
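For example (still a sketch, with placeholder paths):

    // Same pattern with other built-in formats:
    df.write.partitionBy("key").json("/output/prefix-json")
    df.write.partitionBy("key").parquet("/output/prefix-parquet")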
In each of these examples, Spark will create a subdirectory for each of the keys that we've partitioned the DataFrame on:
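With the hypothetical "key" column above, the layout looks roughly like this (exact part-file names vary by Spark version):

    /output/prefix/
        key=1/
            part-...
        key=2/
            part-...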
I have a similar need and found a way. But it has one drawback (which is not a problem for my case): you need to re-partition your data with one partition per output file.
To partition in this way, you generally need to know beforehand how many files the job will output and to find a function that maps each key to a partition.
First let's create our MultipleTextOutputFormat-based class:
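(The class itself is not shown above; this is a hedged sketch of what such a subclass typically looks like.)

    import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat

    // Sketch: name each output file after its key and suppress the key in the
    // written lines, so each file contains only the values.
    class KeyBasedOutput[T >: Null, V <: AnyRef] extends MultipleTextOutputFormat[T, V] {
      override def generateFileNameForKeyValue(key: T, value: V, name: String): String =
        key.toString
      override protected def generateActualKey(key: T, value: V): T =
        null   // a null key makes the underlying writer emit only the value
    }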
With this class Spark will get a key from a partition (the first/last, I guess) and name the file with this key, so it's not good to mix multiple keys on the same partition.
For your example, you will require a custom partitioner. This will do the job:
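A hedged sketch of such a partitioner for integer keys (it assumes the largest key is known up front, as noted above):

    import org.apache.spark.Partitioner

    // Sketch: send each integer key to its own partition so that every output
    // file ends up holding exactly one key.
    class IdentityIntPartitioner(maxKey: Int) extends Partitioner {
      def numPartitions: Int = maxKey + 1
      def getPartition(key: Any): Int = key match {
        case i: Int if i <= maxKey => i
        case _ => throw new IllegalArgumentException(s"Unexpected key: $key")
      }
    }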
Now let's put everything together:
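(A sketch of the assembled job; the keys 1, 2 and 7 are chosen so that it matches the result described below, and the prefix path is a placeholder.)

    // Sketch: give each key its own partition, then save through the custom
    // output format so each key becomes one file named after it.
    val prefix = "/output/prefix"
    val rdd = sc.makeRDD(Seq((1, "a"), (1, "b"), (2, "c"), (7, "d"), (7, "e")))

    rdd
      .partitionBy(new IdentityIntPartitioner(maxKey = 7))
      .saveAsHadoopFile(prefix, classOf[Integer], classOf[String],
        classOf[KeyBasedOutput[Integer, String]])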
This will generate 3 files under prefix (named 1, 2 and 7), processing everything in one pass.
As you can see, you need some knowledge about your keys to be able to use this solution.
For me it was easier because I needed one output file for each key hash and the number of files was under my control, so I could use the stock HashPartitioner to do the trick.