How to handle multi-line rows in Spark?

Published 2020-02-14 02:16

Question:

I have a dataframe that contains some multi-line observations:

+--------------------+----------------+
|                col1|            col2|
+--------------------+----------------+
|something1          |somethingelse1  |
|something2          |somethingelse2  |
|something3          |somethingelse3  |
|something4          |somethingelse4  |
|multiline
 row                |      somethings|
|something           |somethingall    |
+--------------------+----------------+

I want to save this dataframe in CSV (or txt) format. I am using the following:

df.write
  .format("csv")
  .save("s3://../adf/")

But when I check the output file, it separates the observations across multiple lines. What I want is for the 'multiline' observations to stay on the same row in the txt/csv file. I tried saving it as a text file:

df.as[(String, String)]
  .rdd
  .saveAsTextFile("s3://../adf")

but the same output was observed.

I can imagine one way would be to replace \n with something else before saving, and then apply the reverse substitution after loading back. But is there a way to save it in the desired form without applying any kind of transformation to the data?
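
For illustration, that escape-on-write idea might look like the sketch below; the literal \n token, the column names, and the default _c0/_c1 names on read-back are assumptions, not something from the original question:

import org.apache.spark.sql.functions.{col, regexp_replace}

// Hypothetical escape step: turn real newlines into a literal "\n" token before saving
val escaped = df
  .withColumn("col1", regexp_replace(col("col1"), "\n", "\\\\n"))
  .withColumn("col2", regexp_replace(col("col2"), "\n", "\\\\n"))
escaped.write.format("csv").save("s3://../adf/")

// Reverse the substitution after loading back (headerless CSV reads get _c0, _c1, ...)
val restored = spark.read.csv("s3://../adf/")
  .withColumn("_c0", regexp_replace(col("_c0"), "\\\\n", "\n"))
  .withColumn("_c1", regexp_replace(col("_c1"), "\\\\n", "\n"))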

Answer 1:

Assuming the multi-line data is properly quoted, you can parse multi-line CSV data using the univocity parser and the multiLine setting:

sparkSession.read
  .option("parserLib", "univocity")
  .option("multiLine", "true")
  .csv(file)

Note that this requires reading the entire file onto a single executor, so it may not work if your data is too large. The standard text file reader splits the file by lines before doing any other parsing, which prevents you from working with records that contain newlines unless there is a different record delimiter you can use. If there isn't, you may need to implement a custom TextInputFormat to handle multi-line records.
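
To illustrate the quoted round trip without transforming the data, here is a minimal sketch; it assumes Spark 2.2+ (where multiLine is supported by the built-in CSV reader) and uses placeholder paths and sample values:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().getOrCreate()
import spark.implicits._

// A value with an embedded newline, like the question's "multiline\n row"
val df = Seq(
  ("something1", "somethingelse1"),
  ("multiline\n row", "somethings")
).toDF("col1", "col2")

// The CSV writer quotes fields containing newlines, so no manual escaping is needed
df.write.mode("overwrite").csv("/tmp/adf")

// multiLine lets the parser read records that span physical lines
val back = spark.read.option("multiLine", "true").csv("/tmp/adf")
back.show(truncate = false)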



Answer 2:

By default, Spark's saveAsTextFile starts a new record whenever it encounters \n, and the same applies to CSV. When reading CSV you can specify the field delimiter with option("delimiter", "\t").

In my opinion, the best way to read multi-line input is through the Hadoop API: you can specify your own record delimiter and process the data accordingly.

Something like this:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.io.{LongWritable, Text}
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat
import org.apache.spark.rdd.RDD

// Tell the input format to split records on a custom delimiter instead of \n
val conf = new Configuration(spark.sparkContext.hadoopConfiguration)
conf.set("textinputformat.record.delimiter", "<your delimiter>")

val data: RDD[(LongWritable, Text)] = spark.sparkContext.newAPIHadoopFile(
  "<filepath>",
  classOf[TextInputFormat],
  classOf[LongWritable],
  classOf[Text],
  conf)

Here, each Text value in data is one record: the string between two occurrences of your delimiter.
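
For example, a hypothetical follow-up step that builds on the snippet above (dropping the byte-offset key is the usual pattern; the println is just for inspection):

// Keep only the record text, discarding the LongWritable byte offset
val records: RDD[String] = data.map { case (_, text) => text.toString }
records.take(5).foreach(println)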