I'm having an issue with grouping by a nested column.
My application's Scala version is 2.11.7, and these are my sbt dependencies:
libraryDependencies ++= {
  val akkaVersion = "2.4.10"
  val sparkVersion = "2.1.1"
  Seq(
    "com.typesafe.akka" %% "akka-actor" % akkaVersion,
    "com.typesafe" % "config" % "1.3.0",
    "org.apache.spark" %% "spark-core" % sparkVersion,
    "org.apache.spark" %% "spark-sql" % sparkVersion,
    "com.typesafe.akka" %% "akka-slf4j" % akkaVersion,
    "org.apache.spark" %% "spark-streaming" % sparkVersion
  )
}
This is my sample data (one row):
124567893|254887452|52448796|2017-02-22 00:00:02|1|4|0014551233548|N|0|0|2||2|44|4||1|1|||2|-1||1|USD|||1457784114521||7|[1~26.927900~0.390200][4~0.000000~0.000000][8~0.000000~0.000000][9~0.000000~0.000000][11~0.000000~0.000000][12~0.000000~0.000000][13~0.000000~0.000000][71~0.000000~0.000000][91~0.000000~0.000000][111~0.000000~0.000000][131~0.000000~0.000000][251~0.000000~0.000000][311~0.000000~0.000000][331~0.000000~0.000000][451~0.000000~0.000000][3~0.000000~0.000000]|[323~4517.702200~0.390200][384~5310.000000~0.000000][443~4296.000000~0.000000][463~0.000000~0.000000][1024~10.535400~0.390200][1343~57.980000~0.000000][783~0.000000~0.000000][303~0.000000~0.000000][403~10.535400~0.390200][523~13790.000000~0.000000][1143~0.000000~0.000000][763~0.000000~0.000000]|
This is my mapper:
case class SMSMap(
  id: Long, //1
  a_id_1: Long, //2
  b_id_2: Long, //3
  date_time: String, //4
  subscriber_type: Int, //5
  x_type: Int, //6
  sub_id_2: String, //7
  account_type: Int, //11
  master_sub_id: String, //12
  application_id: Int, //13
  sup_type_id: Int, //14
  unit_type_id: Int, //15
  usage_amount: Long, //16
  type_of_charge: String, //17
  identity_id: Int, //18
  group_id: String, //19
  charge_code: String, //20
  content_type: Int, //21
  fund_usage_type: Int, //24
  msc_id: String, //28
  circle_id: Int, //29
  sp_id: Int, //30
  balance: List[(Int, Double, Double)], //31
  z_info: List[(Int, Double, Double)] //33
)
I have written the following code to filter and map the data:
private def mappingSparkLoadedSMSData(sparkRdd: Dataset[String]): Dataset[SMSMap] = {
  import SparkFactory.spark.implicits._
  sparkRdd
    .map(_.split("\\|", -1))
    .filter(_.length == 33) // the trailing '|' plus split(..., -1) leaves an empty last element, giving 33 fields
    .map(
      data =>
        SMSMap(
          { if (data(0).nonEmpty) data(0).toLong else 0 },
          { if (data(1).nonEmpty) data(1).toLong else 0 },
          { if (data(2).nonEmpty) data(2).toLong else 0 },
          data(3),
          { if (data(4).nonEmpty) data(4).toInt else 0 },
          { if (data(5).nonEmpty) data(5).toInt else 0 },
          data(6),
          { if (data(10).nonEmpty) data(10).toInt else 0 },
          data(11),
          { if (data(12).nonEmpty) data(12).toInt else 0 },
          { if (data(13).nonEmpty) data(13).toInt else 0 },
          { if (data(14).nonEmpty) data(14).toInt else 0 },
          { if (data(15).nonEmpty) data(15).toLong else 0 },
          data(16),
          { if (data(17).nonEmpty) data(17).toInt else 0 },
          data(18),
          data(19),
          { if (data(20).nonEmpty) data(20).toInt else 0 },
          { if (data(23).nonEmpty) data(23).toInt else 0 },
          data(27),
          { if (data(28).nonEmpty) data(28).toInt else 0 },
          { if (data(29).nonEmpty) data(29).toInt else 0 },
          // parse "[id~change~accumulated][...]" into (bal_id, change_balance, accumulated) tuples,
          // keeping only entries whose third value (accumulated) is non-zero
          data(30)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(cols => cols.length > 2 && cols(2).nonEmpty && cols(2).toDouble != 0)
            .map(cols => (cols(0).toInt, cols(1).toDouble, cols(2).toDouble))
            .toList,
          data(31)
            .drop(1)
            .dropRight(1)
            .split("\\]\\[")
            .map(_.split('~'))
            .filter(cols => cols.length > 2 && cols(2).nonEmpty && cols(2).toDouble != 0)
            .map(cols => (cols(0).toInt, cols(1).toDouble, cols(2).toDouble))
            .toList
        )
    )
}
Then I create a temp view and try to query it like this:
formattedRDD.createOrReplaceTempView("temp_table") // formattedRDD is a val holding the mapped Dataset from above
spark.sql(
  "select balance from temp_table group by balance"
).collectAsList()
When you look at balance: List[(Int, Double, Double)], //31
the first element of each tuple is bal_id (Int), the second is change_balance (Double) and the third is accumulated (Double), and one row can contain more than one such tuple.
Now I want to group by bal_id and get the sum of change_balance, but I can't do that (of course I can't, because the whole list sits in a single column value per row).
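Conceptually, what I want is something like this flatten-then-aggregate step (just a sketch to illustrate the intent, not tested; the column names bal_id and total_change_balance are my own choices):
import org.apache.spark.sql.functions._
import SparkFactory.spark.implicits._

// Flatten each (bal_id, change_balance, accumulated) tuple into its own row,
// then aggregate per bal_id.
val perBalance = formattedRDD
  .flatMap(row => row.balance.map { case (balId, change, _) => (balId, change) })
  .toDF("bal_id", "change_balance")

perBalance
  .groupBy("bal_id")
  .agg(sum("change_balance").as("total_change_balance"))
  .show()
I assume the same thing could also be expressed in SQL on the temp view with lateral view explode(balance), since the tuples show up as a struct with fields _1, _2, _3, but I'm not sure what the right way is.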
I had the idea of separating the balance column (balance: List[(Int, Double, Double)], //31) into a different dataset/table and then mapping and grouping it there, but to separate it we would need to add an auto_increment_id or some other unique row identifier to both datasets/tables so they can be linked back together (note that id can be duplicated), roughly like the sketch below.
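This is a rough sketch of that separation idea using monotonically_increasing_id and explode (not tested; row_key and the other column names are my own choices):
import org.apache.spark.sql.functions._
import SparkFactory.spark.implicits._

// Give every parent row a synthetic key, since `id` can be duplicated.
val withKey = formattedRDD.toDF().withColumn("row_key", monotonically_increasing_id())

// Parent table without the nested column
val mainTable = withKey.drop("balance")

// Child table: one row per balance tuple, linked back to the parent via row_key.
// The tuple fields of a List[(Int, Double, Double)] are exposed as _1, _2, _3.
val balanceTable = withKey
  .select($"row_key", explode($"balance").as("bal"))
  .select(
    $"row_key",
    $"bal._1".as("bal_id"),
    $"bal._2".as("change_balance"),
    $"bal._3".as("accumulated")
  )

// Grouping and summing would then work on the child table,
// and mainTable could be joined back on row_key when needed.
balanceTable
  .groupBy("bal_id")
  .agg(sum("change_balance").as("total_change_balance"))
  .show()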
I'm really confused by this. Can anyone please help me? Thanks in advance.