I am reading a fixed-position file. The content of each line ends up stored in a string. I would like to convert that string into a DataFrame
so I can process it further. Kindly help me with this. Below are my input data and the expected result:
Input data:
+---------+----------------------+
|PRGREFNBR|value                 |
+---------+----------------------+
|01       |11 apple     TRUE 0.56|
|02       |12 pear      FALSE1.34|
|03       |13 raspberry TRUE 2.43|
|04       |14 plum      TRUE  .31|
|05       |15 cherry    TRUE 1.4 |
+---------+----------------------+
data position: "3,10,5,4"
expected result with default header in data frame:
+-----+-----+----------+-----+-----+
|SeqNo|col_0|     col_1|col_2|col_3|
+-----+-----+----------+-----+-----+
|  01 |  11 |apple     |TRUE | 0.56|
|  02 |  12 |pear      |FALSE| 1.34|
|  03 |  13 |raspberry |TRUE | 2.43|
|  04 |  14 |plum      |TRUE | 1.31|
|  05 |  15 |cherry    |TRUE | 1.4 |
+-----+-----+----------+-----+-----+
Given the fixed-position file (say input.txt):
11 apple     TRUE 0.56
12 pear      FALSE1.34
13 raspberry TRUE 2.43
14 plum      TRUE 1.31
15 cherry    TRUE 1.4
and the length of every field in the input file as (say lengths):
3,10,5,4
you could create a DataFrame as follows:
// Needed for the Dataset encoders and the $"..." column syntax
// (already in scope in spark-shell)
import spark.implicits._
// Read the text file as is
// and filter out empty lines
val lines = spark.read.textFile("input.txt").filter(!_.isEmpty)
// define a helper function to do the split per fixed lengths
// Home exercise: should be part of a case class that describes the schema
def parseLinePerFixedLengths(line: String, lengths: Seq[Int]): Seq[String] = {
  // carry the not-yet-consumed remainder of the line and the fields collected so far
  lengths.indices.foldLeft((line, Array.empty[String])) { case ((rem, fields), idx) =>
    val len = lengths(idx)
    val fld = rem.take(len)
    (rem.drop(len), fields :+ fld)
  }._2
}
// Split the lines using parseLinePerFixedLengths method
val lengths = Seq(3,10,5,4)
val fields = lines.
  map(parseLinePerFixedLengths(_, lengths)).
  withColumnRenamed("value", "fields") // <-- it'd be unnecessary if a case class were used
scala> fields.show(truncate = false)
+------------------------------+
|fields                        |
+------------------------------+
|[11 , apple     , TRUE , 0.56]|
|[12 , pear      , FALSE, 1.34]|
|[13 , raspberry , TRUE , 2.43]|
|[14 , plum      , TRUE , 1.31]|
|[15 , cherry    , TRUE , 1.4 ]|
+------------------------------+
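As a quick check outside Spark, the same split can also be expressed with cumulative offsets rather than by consuming the line field by field. This is only a sketch: sliceByOffsets is a hypothetical name that is not part of the code above, and the sample line is taken from input.txt.

```scala
// Hypothetical alternative to parseLinePerFixedLengths (plain Scala, no Spark needed):
// derive (start, end) offsets from the field lengths and slice the line with them.
def sliceByOffsets(line: String, lengths: Seq[Int]): Seq[String] = {
  // scanLeft turns the lengths 3,10,5,4 into the offsets 0,3,13,18,22
  val offsets = lengths.scanLeft(0)(_ + _)
  // pair every offset with the next one; slice tolerates lines that are too short
  offsets.zip(offsets.tail).map { case (start, end) => line.slice(start, end) }
}

val sampleLengths = Seq(3, 10, 5, 4)
val raw = sliceByOffsets("12 pear      FALSE1.34", sampleLengths)
// raw == Seq("12 ", "pear      ", "FALSE", "1.34")

// The padding is kept on purpose; trim it if you do not need it
val trimmed = raw.map(_.trim)
// trimmed == Seq("12", "pear", "FALSE", "1.34")
```

Either variant keeps the padding spaces, which is why the values in the fields column show trailing blanks.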
The fields column above is probably what you already had, so let's unroll (destructure) the nested sequence of fields into columns:
val answer = lengths.indices.foldLeft(fields) { case (result, idx) =>
  result.withColumn(s"col_$idx", $"fields".getItem(idx))
}
// drop the unnecessary/interim column
scala> answer.drop("fields").show
+-----+----------+-----+-----+
|col_0|     col_1|col_2|col_3|
+-----+----------+-----+-----+
|  11 |apple     |TRUE | 0.56|
|  12 |pear      |FALSE| 1.34|
|  13 |raspberry |TRUE | 2.43|
|  14 |plum      |TRUE | 1.31|
|  15 |cherry    |TRUE | 1.4 |
+-----+----------+-----+-----+
Done!
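As for the "home exercise" mentioned in the code comments, here is one possible sketch of a case class that describes the schema. The names Fruit and parseFruit and the column types are assumptions made for illustration only; they come from neither the question nor the answer above. The parsing itself is plain Scala, so it can be tried without a Spark session.

```scala
import scala.util.Try

// Hypothetical schema for the sample rows; field names and types are assumptions
final case class Fruit(id: Int, name: String, flag: Boolean, price: Double)

// Field lengths as given in the question
val fruitLengths = Seq(3, 10, 5, 4)

// Split a line by the fixed lengths, trim the padding and convert every field.
// Returns None for a line that cannot be converted instead of throwing.
def parseFruit(line: String): Option[Fruit] = {
  val offsets = fruitLengths.scanLeft(0)(_ + _)
  val fields = offsets.zip(offsets.tail).map { case (s, e) => line.slice(s, e).trim }
  fields match {
    case Seq(id, name, flag, price) =>
      Try(Fruit(id.toInt, name, flag.toBoolean, price.toDouble)).toOption
    case _ => None
  }
}

val ok = parseFruit("13 raspberry TRUE 2.43")
// ok == Some(Fruit(13, "raspberry", true, 2.43))

val bad = parseFruit("xx apple     TRUE 0.56")
// bad == None, because "xx" is not a valid Int
```

Mapping a function like this over the Dataset of lines should give typed columns named after the case class fields, which is why the withColumnRenamed step would no longer be needed.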