Converting a Dataset to a JSON Array in Spark using Scala

Published 2020-04-21 01:51

Question:

I am new to Spark and unable to figure out a solution to the following problem.

I have a JSON file to parse; I then compute a couple of metrics and write the data back out in JSON format.

The following is the code I am using:

import org.apache.spark.sql._
import org.apache.log4j.{Level, Logger}
import org.apache.spark.sql.functions._

object quick2 {

  def main(args: Array[String]): Unit = {
    Logger.getLogger("org").setLevel(Level.ERROR)
    val spark = SparkSession
      .builder
      .appName("quick1")
      .master("local[*]")
      .getOrCreate()

    val rawData = spark.read.json("/home/umesh/Documents/Demo2/src/main/resources/sampleQuick.json")

    // Distinct (mal_name, cust_id) pairs, ordered by cust_id, serialized to JSON strings
    val mat1 = rawData.select(rawData("mal_name"), rawData("cust_id")).distinct().orderBy("cust_id").toJSON.cache()
    // Distinct (file_md5, mal_name) pairs, ordered by file_md5, serialized to JSON strings
    val mat2 = rawData.select(rawData("file_md5"), rawData("mal_name")).distinct().orderBy(asc("file_md5")).toJSON.cache()

    // coalesce(1) forces a single output file per directory
    mat1.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat1/")
    mat2.coalesce(1).toJavaRDD.saveAsTextFile("/home/umesh/Documents/Demo2/src/test/mat2/")
  }
}

The above code writes valid JSON. However, the metrics can contain duplicate keys, for example:

md5   mal_name
1       a
1       b
2       c
3       d
3       e

So with the above code every object gets written on its own line, like this:

{"file_md5":"1","mal_name":"a"}
{"file_md5":"1","mal_name":"b"}
{"file_md5":"2","mal_name":"c"}
{"file_md5":"3","mal_name":"d"}

and so on.

But I want to combine the values that share a key:

so the output should be

{"file_md5":"1","mal_name":["a","b"]}
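The desired transformation is essentially a group-by on `file_md5` that collects the `mal_name` values for each key into an array. A minimal sketch of that grouping on plain Scala collections (using the sample rows from the table above, no Spark involved):

```scala
// Sample (md5, mal_name) pairs from the table above
val rows = Seq(("1", "a"), ("1", "b"), ("2", "c"), ("3", "d"), ("3", "e"))

// Group by md5, then keep only the names for each key;
// groupBy preserves the original order of elements within each group
val grouped: Map[String, Seq[String]] =
  rows.groupBy(_._1).map { case (md5, pairs) => md5 -> pairs.map(_._2) }

// grouped("1") == Seq("a", "b")
```

In Spark the same grouping is expressed with `groupBy` plus an aggregate such as `collect_list` or `collect_set`, as the answer below shows.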

Can somebody please suggest what I should do here, or whether there is a better way to approach this problem?

Thanks!

Answer 1:

  1. You can use collect_list or collect_set on the mal_name column, as per your need
  2. You can save a DataFrame/Dataset directly as a JSON file

import org.apache.spark.sql.functions.collect_set
import spark.implicits._

rawData.groupBy($"file_md5")
  .agg(collect_set($"mal_name").alias("mal_name"))
  .write
  .format("json")
  .save("json/file/location/to/save")
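
For context on the choice in step 1: `collect_set` deduplicates values (with no ordering guarantee), while `collect_list` keeps every occurrence. A rough analogue on plain Scala collections (no Spark required):

```scala
// mal_name values for one md5, including a duplicate
val names = Seq("a", "b", "a")

// collect_list analogue: keeps every occurrence, in order
val asList = names

// collect_set analogue: removes duplicates (Scala's distinct keeps
// first-seen order; Spark's collect_set makes no ordering guarantee)
val asSet = names.distinct
```

Since the question's sample data is already deduplicated with `distinct()`, either aggregate yields the same values here.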


Answer 2:

As suggested by @mrsrinivas, I changed my code as below:

val mat2 = rawData.select(rawData("file_md5"), rawData("mal_name")).distinct().orderBy(asc("file_md5")).cache()
val labeledDf = mat2.toDF("file_md5", "mal_name")
labeledDf.groupBy($"file_md5")
  .agg(collect_list($"mal_name"))
  .coalesce(1)
  .write
  .format("json")
  .save("/home/umesh/Documents/Demo2/src/test/run8/")

Keeping this question open for further suggestions, if any.