What I want is basically to have each element of data consist of 10 lines. However, with the following code, each element is still one line. What mistake am I doing here?
val conf = new SparkConf().setAppName("MyApp")
conf.set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
conf.registerKryoClasses(Array[Class[_]](classOf[NLineInputFormat], classOf[LongWritable],
classOf[Text]))
val sc = new SparkContext(conf)
val c = new Configuration(sc.hadoopConfiguration)
c.set("lineinputformat.linespermap", 10);
val data = sc.newAPIHadoopFile(fname, classOf[NLineInputFormat], classOf[LongWritable],
classOf[Text], c)
NLineInputFormat
by design just doesn't perform operation you expect it to:
NLineInputFormat which splits N lines of input as one split. (...) that splits the input file such that by default, one line is fed as a value to one map task.
As you can see it modifies how splits (partitions in the Spark nomenclature) are computed, not how records are determined.
If description is not clear we can illustrate that with a following example:
def nline(n: Int, path: String) = {
val sc = SparkContext.getOrCreate
val conf = new Configuration(sc.hadoopConfiguration)
conf.setInt("mapreduce.input.lineinputformat.linespermap", n);
sc.newAPIHadoopFile(path,
classOf[NLineInputFormat], classOf[LongWritable], classOf[Text], conf
)
}
require(nline(1, "README.md").glom.map(_.size).first == 1)
require(nline(2, "README.md").glom.map(_.size).first == 2)
require(nline(3, "README.md").glom.map(_.size).first == 3)
As show above each partition (possibly excluding the last one) contains exactly n lines.
While you can try to retrofit this to fit your case it won't be recommended for small values of the linespermap
parameter.