How to create a DataFrame from a fixed-length text file

Posted 2020-04-28 01:01

Question:

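I am reading a fixed-position file. The file's content ends up stored as a string (the value column below). I would like to split that string into a DataFrame with one column per field so I can process it further. Kindly help me with this. Below are my input data and the expected result: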

Input data:

+---------+----------------------+
|PRGREFNBR|value                 |
+---------+----------------------+
|01       |11 apple     TRUE 0.56|
|02       |12 pear      FALSE1.34|
|03       |13 raspberry TRUE 2.43|
|04       |14 plum      TRUE 1.31|
|05       |15 cherry    TRUE 1.4 |
+---------+----------------------+

Data positions (the length of each field): "3,10,5,4"
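Since "3,10,5,4" are field widths rather than start positions, each field's character range comes from running sums of the widths. A minimal plain-Scala sketch (no Spark involved; `FieldOffsets`, `parseLengths`, and `ranges` are hypothetical names):

```scala
object FieldOffsets {
  // Parse the comma-separated field lengths, e.g. "3,10,5,4" -> Seq(3, 10, 5, 4)
  def parseLengths(spec: String): Seq[Int] =
    spec.split(",").map(_.trim.toInt).toSeq

  // Convert lengths to [from, until) character ranges via running sums:
  // Seq(3, 10, 5, 4) -> offsets 0, 3, 13, 18, 22 -> (0,3), (3,13), (13,18), (18,22)
  def ranges(lengths: Seq[Int]): Seq[(Int, Int)] = {
    val offsets = lengths.scanLeft(0)(_ + _)
    offsets.zip(offsets.tail)
  }
}
```

The widths add up to 22, which matches the width of every string in the value column, so comparing the two is a cheap sanity check before splitting.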

Expected result, with default column headers in the DataFrame:

+-----+-----+----------+-----+-----+
|SeqNo|col_0|     col_1|col_2|col_3|
+-----+-----+----------+-----+-----+
|   01|  11 |apple     |TRUE | 0.56|
|   02|  12 |pear      |FALSE| 1.34|
|   03|  13 |raspberry |TRUE | 2.43|
|   04|  14 |plum      |TRUE | 1.31|
|   05|  15 |cherry    |TRUE | 1.4 |
+-----+-----+----------+-----+-----+
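Per input row, the expected result keeps PRGREFNBR as SeqNo and appends the untrimmed fixed-width slices of value under the default names col_0 to col_3. A sketch of that per-row mapping in plain Scala, assuming the value strings are padded exactly as shown (`ExpectedRow`, `header`, and `row` are hypothetical names, not a Spark API):

```scala
object ExpectedRow {
  // Default header: SeqNo followed by col_0 .. col_(n-1), one per field length
  def header(lengths: Seq[Int]): Seq[String] =
    "SeqNo" +: lengths.indices.map(i => s"col_$i")

  // One output row: the PRGREFNBR value kept as SeqNo, then the raw
  // (untrimmed) fixed-width slices of the value string
  def row(prgRefNbr: String, value: String, lengths: Seq[Int]): Seq[String] = {
    val offsets = lengths.scanLeft(0)(_ + _)
    prgRefNbr +: offsets.zip(offsets.tail).map { case (from, until) => value.slice(from, until) }
  }
}
```

`slice` never throws on a short line; missing trailing fields simply come back as empty strings.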

Answer 1:

Given the fixed-position file (say input.txt):

11 apple     TRUE 0.56
12 pear      FALSE1.34
13 raspberry TRUE 2.43
14 plum      TRUE 1.31
15 cherry    TRUE 1.4 

and the length of every field in the input file as (say lengths):

3,10,5,4

you could create a DataFrame as follows:

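// spark-shell already provides `spark` and imports its implicits; outside the
// shell, import them from your SparkSession (Dataset.map and $"..." need them)
import spark.implicits._

// Read the text file as is
// and filter out empty lines
val lines = spark.read.textFile("input.txt").filter(!_.isEmpty)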

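// Define a helper function that splits a line per the fixed lengths
// Exercise for the reader: this should be part of a case class that describes the schema
def parseLinePerFixedLengths(line: String, lengths: Seq[Int]): Seq[String] = {
  // Fold over the field indices, carrying (remaining text, fields collected so far)
  lengths.indices.foldLeft((line, Array.empty[String])) { case ((rem, fields), idx) =>
    val len = lengths(idx)
    val fld = rem.take(len)        // the next field; shorter if the line ends early
    (rem.drop(len), fields :+ fld) // continue with the rest of the line
  }._2.toSeq                       // keep only the collected fields
}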

// Split the lines using parseLinePerFixedLengths method
val lengths = Seq(3,10,5,4)
val fields = lines.
  map(parseLinePerFixedLengths(_, lengths)).
  withColumnRenamed("value", "fields") // <-- it'd be unnecessary if a case class were used
scala> fields.show(truncate = false)
+------------------------------+
|fields                        |
+------------------------------+
|[11 , apple     , TRUE , 0.56]|
|[12 , pear      , FALSE, 1.34]|
|[13 , raspberry , TRUE , 2.43]|
|[14 , plum      , TRUE , 1.31]|
|[15 , cherry    , TRUE , 1.4 ]|
+------------------------------+

That's what you may have had already, so let's unroll (destructure) the nested sequence of fields into separate columns:

val answer = lengths.indices.foldLeft(fields) { case (result, idx) =>
  result.withColumn(s"col_$idx", $"fields".getItem(idx))
}
// drop the unnecessary/interim column
scala> answer.drop("fields").show
+-----+----------+-----+-----+
|col_0|     col_1|col_2|col_3|
+-----+----------+-----+-----+
|  11 |apple     |TRUE | 0.56|
|  12 |pear      |FALSE| 1.34|
|  13 |raspberry |TRUE | 2.43|
|  14 |plum      |TRUE | 1.31|
|  15 |cherry    |TRUE | 1.4 |
+-----+----------+-----+-----+

Done!
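Following the comment in the answer that the splitting "should be part of a case class that describes the schema", here is a hedged sketch of such a case class. The name `FruitRecord`, its field names, and the Int/String/Boolean/Double types are assumptions read off the sample rows; unlike the answer, it also trims the padding and keeps the question's SeqNo.

```scala
// Hypothetical record type for the sample rows; field names and types
// are assumptions inferred from the sample data, not from the question
final case class FruitRecord(seqNo: String, id: Int, name: String, flag: Boolean, price: Double)

object FruitRecord {
  val lengths: Seq[Int] = Seq(3, 10, 5, 4)

  // Slice the fixed-width value, trim the padding, and convert each field
  def parse(seqNo: String, value: String): FruitRecord = {
    val offsets = lengths.scanLeft(0)(_ + _)
    val fields = offsets.zip(offsets.tail).map { case (from, until) => value.slice(from, until).trim }
    // toBoolean accepts "TRUE"/"FALSE" case-insensitively
    FruitRecord(seqNo.trim, fields(0).toInt, fields(1), fields(2).toBoolean, fields(3).toDouble)
  }
}
```

With `import spark.implicits._` in scope and the case class defined at top level (so Spark can derive an encoder), something like `df.as[(String, String)].map { case (n, v) => FruitRecord.parse(n, v) }` should turn a DataFrame shaped like the question's input (string columns PRGREFNBR and value, in that order) into a typed `Dataset[FruitRecord]`; that Spark step is untested here. A malformed field makes `toInt`, `toBoolean`, or `toDouble` throw, so real data may need `scala.util.Try` around the conversions.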