I am trying to do the same thing in my code, as shown below:
dataframe.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})
I took the above approach from this reference:
Scala: How can I replace value in Dataframs using scala
But I am getting an encoder error:
Unable to find encoder for type stored in a Dataset. Primitive types
(Int, String, etc) and Product types (case classes) are supported by
importing spark.implicits._ Support for serializing other types will
be added in future releases.
Note: I am using Spark 2.0!
There is nothing unexpected here. You're trying to use code which was written for Spark 1.x and is no longer supported in Spark 2.0:
- in 1.x, DataFrame.map is ((Row) ⇒ T)(ClassTag[T]) ⇒ RDD[T]
- in 2.x, Dataset[Row].map is ((Row) ⇒ T)(Encoder[T]) ⇒ Dataset[T]
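In other words, map now requires an implicit Encoder for the result type, and spark.implicits._ only provides encoders for primitives, Products (case classes, tuples) and a handful of other types, never for Row itself. A minimal sketch of what does and does not compile, using the asker's dataframe:

import spark.implicits._

// Compiles: String has an implicit encoder
dataframe.map(row => row.getAs[String](1))

// Does not compile: there is no implicit Encoder[Row]
// dataframe.map(row => Row(row(0), "S", row(2)))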
To be honest, it didn't make much sense in 1.x either. Independent of the version, you can simply use the DataFrame API:
import spark.implicits._
import org.apache.spark.sql.functions.{when, lower}

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

df.withColumn("make", when(lower($"make") === "tesla", "S").otherwise($"make"))
If you really want to use map, you should use a statically typed Dataset:
import spark.implicits._

case class Record(year: Int, make: String, model: String)

df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}
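This works because Record is a Product, so its encoder is derived automatically. The result is a Dataset[Record]; if you need an untyped DataFrame again downstream, toDF() restores one with the original column names:

// Back to a DataFrame with columns year, make, model:
val updated = df.as[Record].map {
  case tesla if tesla.make.toLowerCase == "tesla" => tesla.copy(make = "S")
  case rec => rec
}.toDF()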
or at least return an object for which an implicit encoder exists:
import org.apache.spark.sql.Row

df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}
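Tuples are Products too, so (Int, String, String) gets an encoder for free. The resulting Dataset uses the default column names _1, _2 and _3; pass the originals to toDF if you want them back:

df.map {
  case Row(year: Int, make: String, model: String) =>
    (year, if (make.toLowerCase == "tesla") "S" else make, model)
}.toDF("year", "make", "model")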
Finally, if for some completely crazy reason you really want to map over a Dataset[Row], you have to provide the required encoder yourself:
import org.apache.spark.sql.catalyst.encoders.RowEncoder
import org.apache.spark.sql.types._
import org.apache.spark.sql.Row

// Yup, it would be possible to reuse df.schema here
val schema = StructType(Seq(
  StructField("year", IntegerType),
  StructField("make", StringType),
  StructField("model", StringType)
))

val encoder = RowEncoder(schema)

df.map {
  case Row(year, make: String, model) if make.toLowerCase == "tesla" =>
    Row(year, "S", model)
  case row => row
}(encoder)
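As the comment in the snippet says, when the output schema matches the input you can simply reuse it instead of spelling it out, which also keeps the code generic:

// Same encoder, derived from the input DataFrame's schema
val encoder = RowEncoder(df.schema)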
For the scenario where the DataFrame schema is known in advance, the answer given by @zero323 is the solution, but for scenarios with a dynamic schema, or when passing multiple DataFrames to a generic function, the following code worked for us while migrating from 1.6.1 to 2.2.0:
import org.apache.spark.sql.Row

val df = Seq(
  (2012, "Tesla", "S"), (1997, "Ford", "E350"),
  (2015, "Chevy", "Volt")
).toDF("year", "make", "model")

val data = df.rdd.map(row => {
  val row1 = row.getAs[String](1)
  val make = if (row1.toLowerCase == "tesla") "S" else row1
  Row(row(0), make, row(2))
})
This code executes on both versions of Spark, because dropping to the RDD API sidesteps encoders entirely. Disadvantage: the optimizations Spark provides for the DataFrame/Dataset API won't be applied.
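Since data is a plain RDD[Row], you will usually want a DataFrame back; reusing the input schema keeps this generic as well (a sketch using the Spark 2.x SparkSession; in 1.6 the equivalent createDataFrame lives on SQLContext):

// Rebuild a DataFrame from the transformed RDD, reusing the input schema
val result = spark.createDataFrame(data, df.schema)
result.show()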