Best way to have nested tuples or nested columns in Spark

Posted 2019-08-22 16:22

Question:

I'm having an issue grouping by a nested column.

My application's Scala version is 2.11.7, and these are my sbt dependencies:

libraryDependencies ++= {
  val akkaVersion = "2.4.10"
  val sparkVersion = "2.1.1"

  Seq(
    "com.typesafe.akka" %% "akka-actor"                           % akkaVersion,
    "com.typesafe"      %  "config"                               % "1.3.0" ,
    "org.apache.spark"  %%  "spark-core"                          % sparkVersion,
    "org.apache.spark"  %%  "spark-sql"                           % sparkVersion,
    "com.typesafe.akka" %% "akka-slf4j"                           % akkaVersion,
    "org.apache.spark"  %% "spark-streaming"                      % sparkVersion
  )
}

This is my sample data (one row):

124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|

This is my mapper:

case class SampleMap(
                   id: Long, //1
                   a_id_1: Long, //2
                   b_id_2: Long, //3
                   date_time: String, //4
                   subscriber_type: Int, //5
                   x_type: Int, //6
                   sub_id_2: String, //7
                   account_type: Int, //11
                   master_sub_id: String, //12
                   application_id: Int, //13
                   sup_type_id: Int, //14
                   unit_type_id: Int, //15
                   usage_amount: Long, //16
                   type_of_charge: String, //17
                   identity_id: Int, //18
                   group_id: String, //19
                   charge_code: String, //20
                   content_type: Int, //21
                   fund_usage_type: Int, //24
                   msc_id: String, //28
                   circle_id: Int, //29
                   sp_id: Int, //30
                   balance: List[(Int, Double, Double)], //31
                   z_info: List[(Int, Double, Double)] //33

                 )

I have written the following code to filter and map the data:

 private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SampleMap] = {

    import SparkFactory.spark.implicits._
    sparkRdd
      .map(_.split("\\|",-1))
      .filter(_.length==33)       // split with -1 keeps the trailing empty field, so a valid row has 33 fields
      .map(
      data =>
        SampleMap(

          {if(data(0).nonEmpty) data(0).toLong else 0 },
          {if(data(1).nonEmpty) data(1).toLong else 0 },
          {if(data(2).nonEmpty) data(2).toLong else 0 },
          data(3),
          {if(data(4).nonEmpty) data(4).toInt else 0 },
          {if(data(5).nonEmpty) data(5).toInt else 0 },
          data(6),
          {if(data(10).nonEmpty) data(10).toInt else 0 },
          data(11),
          {if(data(12).nonEmpty) data(12).toInt else 0 },
          {if(data(13).nonEmpty) data(13).toInt else 0 },
          {if(data(14).nonEmpty) data(14).toInt else 0 },
          {if(data(15).nonEmpty) data(15).toLong else 0 },
          data(16),
          {if(data(17).nonEmpty) data(17).toInt else 0 },
          data(18),
          data(19),
          {if(data(20).nonEmpty) data(20).toInt else 0 },
          {if(data(23).nonEmpty) data(23).toInt else 0 },
          data(27),
          {if(data(28).nonEmpty) data(28).toInt else 0 },
          {if(data(29).nonEmpty) data(29).toInt else 0 },

          data(30)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data =>  data.length > 2 && data(2).nonEmpty &&  data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))   // (bal_id, change_balance, accumulated)
            .toList,

          data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data =>  data.length > 2 && data(2).nonEmpty &&  data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))   // (bal_id, change_balance, accumulated)
            .toList


        )
    )
  }
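For context, I call the mapper roughly like this (the file path here is just a placeholder for wherever the raw data actually lives):

import SparkFactory.spark

val rawLines = spark.read.textFile("/path/to/sample_data.psv")   // Dataset[String], one record per line
val formattedRDD = mappingSparkLoadedSMSData(rawLines)           // Dataset[SampleMap]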

And then I'm creating a temp view and trying to query it like this:

formattedRDD.createOrReplaceTempView("temp_table")  // formattedRDD is the val holding the mapped Dataset from above

spark.sql(
      "select balance from temp_table group by balance"
    ).collectAsList()

When you look at balance: List[(Int, Double, Double)], //31

the first element of each tuple is bal_id (Int), the second is change_balance (Double), and the third is accumulated (Double), and each row can carry more than one of these sets.

Now I want to group by bal_id and get the sum of change_balance, but I can't do that with the query above (of course not, since balance is a single list value per row, so grouping by it only groups rows whose whole lists are identical).

I had the idea of separating balance (balance: List[(Int, Double, Double)], //31) out into a different dataset/table and then mapping and grouping there, but to do that we would need to add an auto_increment_id or some other unique row identifier to both datasets/tables so they can be related back to each other (note that id can be duplicated).
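Something along these lines is what I was picturing (just a rough sketch of that idea, reusing the formattedRDD from above; monotonically_increasing_id and explode are the standard Spark SQL functions, and the names withRowId and balanceTable are only illustrative):

import org.apache.spark.sql.functions.{explode, monotonically_increasing_id, sum}
import SparkFactory.spark.implicits._

// give every parent row a synthetic unique key, since id itself can be duplicated
val withRowId = formattedRDD.withColumn("row_id", monotonically_increasing_id())

// a separate "balance table": one row per (bal_id, change_balance, accumulated) entry
val balanceTable = withRowId
  .select($"row_id", explode($"balance").as("bal"))
  .select(
    $"row_id",
    $"bal._1".as("bal_id"),
    $"bal._2".as("change_balance"),
    $"bal._3".as("accumulated")
  )

balanceTable.groupBy("bal_id").agg(sum("change_balance")).show()

But I'm not sure this is the right way to model it, which is why I'm asking.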

I'm really confused by this. Can anyone please help me? Thanks in advance.

Answer 1:

If you separate the balance column into three different columns, it will be easy for you to groupBy on bal_id and sum change_balance.
You can create these three separate columns in your initial parsing stage.
Here's a solution based on what I understood from your question:

You need to include the three new column names in your case class:

case class SampleMap(
                      id: Long, //1
                      a_id_1: Long, //2
                      b_id_2: Long, //3
                      date_time: String, //4
                      subscriber_type: Int, //5
                      x_type: Int, //6
                      sub_id_2: String, //7
                      account_type: Int, //11
                      master_sub_id: String, //12
                      application_id: Int, //13
                      sup_type_id: Int, //14
                      unit_type_id: Int, //15
                      usage_amount: Long, //16
                      type_of_charge: String, //17
                      identity_id: Int, //18
                      group_id: String, //19
                      charge_code: String, //20
                      content_type: Int, //21
                      fund_usage_type: Int, //24
                      msc_id: String, //28
                      circle_id: Int, //29
                      sp_id: Int, //30
                      balance: List[(Int, Double, Double)], //31
                      bal_id: Int,              //added by Ramesh
                      change_balance: Double,   //added by Ramesh
                      accumulated: Double,      //added by Ramesh
                      z_info: List[(Int, Double, Double)] //33
                    )

You have to extract those three values into separate columns while creating the dataframe/dataset. The following is an improved version of your code:

import scala.util.Try                    // for the Try(...) getOrElse fallbacks below
import SparkFactory.spark.implicits._    // provides the Encoders needed by .map on a Dataset

val formattedRDD = sparkRdd.map(_.split("\\|",-1))
      .filter(_.length==33)       // split with -1 keeps the trailing empty field, so a valid row has 33 fields
      .map( data => {
        val balance = Try(data(30)
          .drop(1)
          .dropRight(1)
          .split("\\]\\[")
          .map(_.split('~'))
          .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
          .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))   // (bal_id, change_balance, accumulated)
          .toList) getOrElse List((0, 0.0, 0.0))

        SampleMap(
          Try(data(0).toLong) getOrElse 0,
          Try(data(1).toLong) getOrElse 0,
          Try(data(2).toLong) getOrElse 0,
          Try(data(3).toString) getOrElse "",
          Try(data(4).toInt) getOrElse 0,
          Try(data(5).toInt) getOrElse 0,
          Try(data(6).toString) getOrElse "",
          Try(data(10).toInt) getOrElse 0,    // account_type, from data(10) as in the original mapper
          Try(data(11).toString) getOrElse "",
          Try(data(12).toInt) getOrElse 0,
          Try(data(13).toInt) getOrElse 0,
          Try(data(14).toInt) getOrElse 0,
          Try(data(15).toLong) getOrElse 0,
          Try(data(16).toString) getOrElse "",
          Try(data(17).toInt) getOrElse 0,
          Try(data(18).toString) getOrElse "",
          Try(data(19).toString) getOrElse "",
          Try(data(20).toInt) getOrElse 0,
          Try(data(23).toInt) getOrElse 0,
          Try(data(27).toString) getOrElse "",
          Try(data(28).toInt) getOrElse 0,
          Try(data(29).toInt) getOrElse 0,
          balance,                                       // the parsed balance list (field //31, i.e. data(30))
          balance.headOption.map(_._1).getOrElse(0),     // bal_id of the first entry
          balance.headOption.map(_._2).getOrElse(0.0),   // change_balance of the first entry
          balance.headOption.map(_._3).getOrElse(0.0),   // accumulated of the first entry

          Try(data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(data => data.length > 2 && data(2).nonEmpty && data(2).toDouble != 0)
            .map(data => (data(0).toInt, data(1).toDouble, data(2).toDouble))   // (bal_id, change_balance, accumulated)
            .toList) getOrElse List.empty
        )
      }
    )

Now all you need to do is call an aggregation:

import org.apache.spark.sql.functions.sum
formattedRDD.groupBy("bal_id").agg(sum("change_balance")).show
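Note that this sums only the first entry of each row's balance list (only the first tuple was lifted into the new columns). If you actually need the sum over every entry of every list, one possible variant is to flatten the lists first, something like this (a sketch, not tested against your data):

import SparkFactory.spark.implicits._
import org.apache.spark.sql.functions.sum

formattedRDD
  .flatMap(_.balance)                                   // one row per (bal_id, change_balance, accumulated) tuple
  .toDF("bal_id", "change_balance", "accumulated")
  .groupBy("bal_id")
  .agg(sum("change_balance"))
  .show()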

I hope this is the solution you needed.