Finding average value in Spark Scala gives blank results

Posted 2019-09-27 17:00

Question:

I have an input.txt file. The data looks as below.

1   1383260400000   0   0.08136262351125882             
1   1383260400000   39  0.14186425470242922 0.1567870050390246  0.16093793691701822 0.052274848528573205    11.028366381681026
1   1383261000000   0   0.13658782275823106         0.02730046487718618 
1   1383261000000   33                  0.026137424264286602
2241    1383324600000   0   0.16869936142032646             
2241    1383324600000   39  0.820500491400199   0.6518011299798726  1.658248219576473   3.4506242774863045  36.71096470849049
2241    1383324600000   49  0.16295028249496815

Assume the first column is the id and the other columns are col1, col2, col3, col4, col5, col6 and col7 respectively. I want to find the average of col7 for each id; basically, I want my results in the format id, avg of col7.

This is the code I have tried so far. I read my data from the text file, then created a schema.

import org.apache.spark.sql.types._

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))

Then I created a DataFrame.

val data = text.map(line => line.split("\\t")).map(arr => Row.fromSeq(Seq(
  arr(0).toInt,
  Try(arr(1).asInstanceOf[DoubleType]) getOrElse (0.0),
  Try(arr(2).toInt) getOrElse (0),
  Try(arr(3).toDouble) getOrElse (0.0),
  Try(arr(4).toDouble) getOrElse (0.0),
  Try(arr(5).toDouble) getOrElse (0.0),
  Try(arr(6).toDouble) getOrElse (0.0),
  Try(arr(7).asInstanceOf[DoubleType]) getOrElse (0.0)
)))
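Two steps are not shown in the question: reading the file into text and building df from the RDD of Rows. Presumably they look something like the sketch below, assuming a spark-shell sc and sqlContext are in scope:

// Hypothetical reconstruction of the omitted steps (names assumed from the prose):
val text = sc.textFile("input.txt")                // the read step, which runs before the map above
val df = sqlContext.createDataFrame(data, schema)  // build the DataFrame from the RDD of Rows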

Finally, I computed the averages and saved the result to a text file.

val res1 = df.groupBy("ID").agg(avg("col7"))

res1.rdd.saveAsTextFile("/stuaverage/spoutput12")

When I run this, I get several part files whose averages all come out as 0.0, e.g.

[1068,0.0]
[1198,0.0]
[1344,0.0]
[1404,0.0]
[1537,0.0]
[1675,0.0]
[1924,0.0]
[193,0.0]
[211,0.0]
[2200,0.0]
[2225,0.0]
[2663,0.0]
[2888,0.0]
[3152,0.0]
[3235,0.0]

The first column is correct, but the second column should contain a real average (even though col7 values are missing for some rows).

Please help.

Answer 1:

I would suggest you use the sqlContext API and the schema you have defined:

val df = sqlContext.read
  .format("com.databricks.spark.csv")
  .option("delimiter", "\\t")
  .schema(schema)
  .load("path to your text file") 

where the schema is

val schema = StructType(Seq(
  StructField("ID", IntegerType, true),
  StructField("col1", DoubleType, true),
  StructField("col2", IntegerType, true),
  StructField("col3", DoubleType, true),
  StructField("col4", DoubleType, true),
  StructField("col5", DoubleType, true),
  StructField("col6", DoubleType, true),
  StructField("col7", DoubleType, true)
))

After that, all you need to do is apply the avg function on the grouped DataFrame:

import org.apache.spark.sql.functions._
val res1 = df.groupBy("ID").agg(avg("col1"),avg("col2"),avg("col3"),avg("col4"),avg("col5"),avg("col6"),avg("col7"))

Finally, you can save to CSV directly from the DataFrame; you don't need to convert to an RDD:

res1.coalesce(1).write.csv("/stuaverage/spoutput12")
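As an optional tweak (my addition, not part of the original answer), you can alias the aggregate column so the CSV header comes out readable; alias and the header option are standard Spark SQL API:

// Optional sketch: name the aggregate column explicitly and write a header row.
val res2 = df.groupBy("ID").agg(avg("col7").alias("avg_col7"))
res2.coalesce(1).write.option("header", "true").csv("/stuaverage/spoutput12")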


Answer 2:

The problem is that you convert col7 the wrong way: you try to cast the string to DoubleType (a schema type, not a value type) instead of parsing it to a Scala Double with .toDouble. That cast always throws an exception, so col7 always falls back to 0.0. This works:

import scala.util.Try
import org.apache.spark.sql.Row

val rdd = sc.textFile("input.txt")
  .map(line => line.split("\\t"))
  .map((arr: Array[String]) => Row(
    arr(0).toInt,
    Try(arr(1).toDouble).getOrElse(0.0),
    Try(arr(2).toInt).getOrElse(0),
    Try(arr(3).toDouble).getOrElse(0.0),
    Try(arr(4).toDouble).getOrElse(0.0),
    Try(arr(5).toDouble).getOrElse(0.0),
    Try(arr(6).toDouble).getOrElse(0.0),
    Try(arr(7).toDouble).getOrElse(0.0)
  ))
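With the parsing fixed, the rest of the pipeline from the question works unchanged. A minimal sketch, assuming the schema and sqlContext from the question are in scope:

import org.apache.spark.sql.functions.avg

// Build the DataFrame from the corrected RDD of Rows and aggregate as before.
val df = sqlContext.createDataFrame(rdd, schema)
val res1 = df.groupBy("ID").agg(avg("col7"))
res1.show()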


Answer 3:

Try this somewhat more concise version (assuming you are working from the spark-shell). It should work.

val df = spark
  .read
  .option("header","false")
  .option("sep","\t")
  .option("inferSchema","true")
  .csv("...input...")
  .toDF("ID","col1","col2","col3","col4","col5","col6","col7")

val result = df.groupBy("ID").mean("col7")

result
  .write
  .option("header","true")
  .option("sep",";")
  .csv("...output...")