Spark SQL removing white spaces

Posted 2020-02-16 01:25

Question:

I have a simple Spark program that reads a JSON file and emits a CSV file. In the JSON data the values contain leading and trailing white space, but when I emit the CSV the leading and trailing white space is gone. Is there a way to retain the spaces? I tried options such as ignoreTrailingWhiteSpace and ignoreLeadingWhiteSpace, but with no luck.

input.json

{"key" : "k1", "value1": "Good String", "value2": "Good String"}
{"key" : "k1", "value1": "With Spaces      ", "value2": "With Spaces      "}
{"key" : "k1", "value1": "with tab\t", "value2": "with tab\t"}

output.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces,With Spaces
,k1,with tab,with tab

expected.csv

_corrupt_record,key,value1,value2
,k1,Good String,Good String
,k1,With Spaces      ,With Spaces      
,k1,with tab\t,with tab\t

My code:

public static void main(String[] args) {
    SparkSession sparkSession = SparkSession
            .builder()
            .appName(TestSpark.class.getName())
            .master("local[1]").getOrCreate();

    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel("ERROR");
    SQLContext sqlCtx = sparkSession.sqlContext();
    System.out.println("Spark context established");

    List<StructField> kvFields = new ArrayList<>();
    kvFields.add(DataTypes.createStructField("_corrupt_record", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("key", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value1", DataTypes.StringType, true));
    kvFields.add(DataTypes.createStructField("value2", DataTypes.StringType, true));
    StructType employeeSchema = DataTypes.createStructType(kvFields);

    Dataset<Row> dataset =
            sparkSession.read()
                    .option("inferSchema", false)
                    .format("json")
                    .schema(employeeSchema)
                    .load("D:\\dev\\workspace\\java\\simple-kafka\\key_value.json");

    dataset.createOrReplaceTempView("sourceView");
    sqlCtx.sql("select * from sourceView")
            .write()
            .option("header", true)
            .format("csv")
            .save("D:\\dev\\workspace\\java\\simple-kafka\\output\\" + UUID.randomUUID().toString());
    sparkSession.close();
}

Update

Added POM dependencies

<dependencies>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-core_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming_2.10</artifactId>
        <version>2.1.0</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-log4j12</artifactId>
        <version>1.7.22</version>
    </dependency>
</dependencies>

Answer 1:

The CSV writer trims leading and trailing spaces by default. You can turn this off with:

   sqlCtx.sql("select * from sourceView").write.
       option("header", true).
       option("ignoreLeadingWhiteSpace",false). // you need this
       option("ignoreTrailingWhiteSpace",false). // and this
       format("csv").save("/my/file/location")

This works for me. If it doesn't work for you, can you post what you tried and which Spark version you are using? If I remember right, this feature was introduced only last year.



Answer 2:

For Apache Spark 2.2+ you can simply use the "ignoreLeadingWhiteSpace" and "ignoreTrailingWhiteSpace" options (see details in @Roberto Congiu's answer).

I guess it should be the default behaviour for lower Apache Spark versions, though I'm not sure.

For Apache Spark 1.3+ you can use the "univocity" parserLib to specify it explicitly:

df.write
  .option("parserLib","univocity")
  .option("ignoreLeadingWhiteSpace","false")
  .option("ignoreTrailingWhiteSpace","false")
  .format("csv")
  .save("/my/file/location") // placeholder path; nothing is written until save() is called

Old "incorrect" answer: it shows how to get rid of leading and trailing spaces and tabs in the whole data frame (in all columns).

Here is a Scala solution:

Source DF:

scala> val df = spark.read.json("file:///temp/a.json")
df: org.apache.spark.sql.DataFrame = [key: string, value1: string ... 1 more field]

scala> df.show
+---+-----------------+-----------------+
|key|           value1|           value2|
+---+-----------------+-----------------+
| k1|      Good String|      Good String|
| k1|With Spaces      |With Spaces      |
| k1|        with tab   |        with tab       |
+---+-----------------+-----------------+

Solution:

import org.apache.spark.sql.functions._

val df2 = df.select(df.columns.map(c => regexp_replace(col(c),"(^\\s+|\\s+$)","").alias(c)):_*)

Result:

scala> df2.show
+---+-----------+-----------+
|key|     value1|     value2|
+---+-----------+-----------+
| k1|Good String|Good String|
| k1|With Spaces|With Spaces|
| k1|   with tab|   with tab|
+---+-----------+-----------+

PS: it should be very similar in Java Spark.
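
For anyone on the Java side, the effect of the pattern used in the Scala snippet can be checked without Spark. This is only a minimal sketch in plain Java: the class name EdgeWhitespaceDemo and the helper stripEdges are made up for illustration, and it uses java.util.regex instead of Spark's regexp_replace, on the assumption that both interpret the pattern the same way. It applies the pattern to the sample values from the question and removes only the whitespace at the start and end of each value, leaving inner spaces alone.

```java
import java.util.regex.Pattern;

public class EdgeWhitespaceDemo {
    // Same pattern as in the Scala snippet: a run of whitespace at the
    // very start of the value, or a run of whitespace at the very end.
    private static final Pattern EDGE_WHITESPACE = Pattern.compile("(^\\s+|\\s+$)");

    // Hypothetical helper: removes leading and trailing whitespace only.
    public static String stripEdges(String value) {
        return EDGE_WHITESPACE.matcher(value).replaceAll("");
    }

    public static void main(String[] args) {
        // The three values from input.json plus one with whitespace on both sides.
        String[] samples = {"Good String", "With Spaces      ", "with tab\t", "  both sides  "};
        for (String sample : samples) {
            // Brackets make any remaining edge whitespace visible.
            System.out.println("[" + stripEdges(sample) + "]");
        }
    }
}
```

Running main prints [Good String], [With Spaces], [with tab] and [both sides], one per line. This is the opposite of what the question asks for, so it is useful mainly for confirming what the "incorrect" answer does.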



Answer 3:

// hope these two options can solve your question
spark.read.json(inputPath).write
    .option("ignoreLeadingWhiteSpace",false)
    .option("ignoreTrailingWhiteSpace", false)
    .csv(outputPath)

You can check the links below for more info:

https://issues.apache.org/jira/browse/SPARK-18579

https://github.com/apache/spark/pull/17310

Thanks