Encoder error while trying to map dataframe row to updated row

Posted 2019-01-01 11:28

Question:

When I try to do the same thing in my code, as shown below:

dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})

I have taken the above reference from here: Scala: How can I replace value in DataFrames using Scala. But I am getting an encoder error:

Unable to find encoder for type stored in a Dataset. Primitive types (Int, String, etc) and Product types (case classes) are supported by importing spark.implicits._ Support for serializing other types will be added in future releases.

Note: I am using Spark 2.0!

Answer 1:

There is nothing unexpected here. You're trying to use code which has been written with Spark 1.x and is no longer supported in Spark 2.0:

  • in 1.x DataFrame.map is ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
  • in 2.x Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
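The practical consequence is easy to see by materializing encoders directly; a small sketch for Spark 2.x (the val names are just illustrative):

import org.apache.spark.sql.{Encoder, Encoders}

// Encoders for primitives (and, via spark.implicits._, for tuples and
// case classes) are available out of the box:
val intEncoder: Encoder[Int]       = Encoders.scalaInt
val stringEncoder: Encoder[String] = Encoders.STRING

// There is, however, no implicit Encoder[Row] in scope -- a Row only
// carries its schema at runtime -- which is why dataframe.map(row => Row(...))
// fails with the error quoted in the question.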

To be honest it didn't make much sense in 1.x either. Independent of the version, you can simply use the DataFrame API:

import org.apache.spark.sql.functions.{when, lower}
import spark.implicits._   // needed for toDF and the $"..." column syntax

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
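With the sample data above, calling .show() on the result should print roughly the following (only the Tesla row changes):

+----+-----+-----+
|year| make|model|
+----+-----+-----+
|2012|    S|    S|
|1997| Ford| E350|
|2015|Chevy| Volt|
+----+-----+-----+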

If you really want to use map you should use a statically typed Dataset:

import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}

or at least return an object which will have an implicit encoder:

df.map {
  case Row(year: Int, make: String, model: String) => 
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}
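Either variant can be turned back into an untyped DataFrame with toDF; mapping to the case class keeps the column names, while mapping to a tuple yields _1, _2, _3 unless you rename them. A small sketch, reusing df and Record from above:

import org.apache.spark.sql.Row

// Case class result: column names come from the Record fields.
df.as[Record]
  .map(r => if (r.make.toLowerCase == "tesla") r.copy(make = "S") else r)
  .toDF()                               // columns: year, make, model

// Tuple result: columns would be _1, _2, _3, so rename them explicitly.
df.map { case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
  }
  .toDF("year", "make", "model")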

Finally, if for some completely crazy reason you really want to map over Dataset[Row], you have to provide the required encoder:

import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
} (encoder)
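As the comment above notes, the schema does not have to be spelled out by hand; it is enough to reuse the one the DataFrame already carries (the val name is just illustrative):

// Equivalent encoder, built straight from the existing schema:
val reusedEncoder = RowEncoder(df.schema)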


Answer 2:

For the scenario where the DataFrame schema is known in advance, the answer given by @zero323 is the solution,

but for a scenario with a dynamic schema, or when passing multiple DataFrames to a generic function, the following code worked for us while migrating from 1.6.1 to 2.2.0:

import org.apache.spark.sql.Row
import spark.implicits._   // needed for toDF on the Seq

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0),make,row(2))
})

This code executes on both versions of Spark.

Disadvantage: the optimizations provided by Spark on the DataFrame/Dataset API won't be applied.
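If you need a DataFrame again after the map, you can reattach the original schema to the resulting RDD[Row]; a minimal sketch, assuming the spark session and the df and data values from the snippet above:

// Rebuild a DataFrame from the mapped RDD[Row] using the original schema.
// The map step itself still runs outside the Catalyst optimizer.
val result = spark.createDataFrame(data, df.schema)
result.show()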