How to change column types in Spark SQL's DataFrame?

Posted 2019-01-03 11:36

Suppose I'm doing something like:

val df = sqlContext.load("com.databricks.spark.csv", Map("path" -> "cars.csv", "header" -> "true"))
df.printSchema()

root
 |-- year: string (nullable = true)
 |-- make: string (nullable = true)
 |-- model: string (nullable = true)
 |-- comment: string (nullable = true)
 |-- blank: string (nullable = true)

df.show()
year make  model comment              blank
2012 Tesla S     No comment                
1997 Ford  E350  Go get one now th...  

but I really wanted the year as Int (and perhaps transform some other columns).

The best I could come up with is

df.withColumn("year2", 'year.cast("Int")).select('year2 as 'year, 'make, 'model, 'comment, 'blank)
org.apache.spark.sql.DataFrame = [year: int, make: string, model: string, comment: string, blank: string]

which is a bit convoluted.

I'm coming from R, and I'm used to being able to write, e.g.

df2 <- df %>%
   mutate(year = year %>% as.integer, 
          make = make %>% toupper)

I'm likely missing something, since there should be a better way to do this in Spark/Scala...

16 answers
再贱就再见
Answer 2 · 2019-01-03 12:06

[EDIT: March 2016: thanks for the votes! Though really, this is not the best answer; I think the solutions based on withColumn, withColumnRenamed and cast put forward by msemelman, Martin Senne and others are simpler and cleaner.]

I think your approach is ok. Recall that a Spark DataFrame is an (immutable) RDD of Rows, so we never really replace a column; we just create a new DataFrame each time with a new schema.

Assuming you have an original df with the following schema:

scala> df.printSchema
root
 |-- Year: string (nullable = true)
 |-- Month: string (nullable = true)
 |-- DayofMonth: string (nullable = true)
 |-- DayOfWeek: string (nullable = true)
 |-- DepDelay: string (nullable = true)
 |-- Distance: string (nullable = true)
 |-- CRSDepTime: string (nullable = true)

And some UDFs defined on one or several columns:

import org.apache.spark.sql.functions._

val toInt    = udf[Int, String]( _.toInt)
val toDouble = udf[Double, String]( _.toDouble)
val toHour   = udf((t: String) => "%04d".format(t.toInt).take(2).toInt )  // e.g. "730" -> "0730" -> 7
val days_since_nearest_holidays = udf(
  // dummy example computation; plug in your own logic here
  (year: String, month: String, dayOfMonth: String) => year.toInt + 27 + month.toInt - 12
)

Changing column types or even building a new DataFrame from another can be written like this:

val featureDf = df
  .withColumn("departureDelay", toDouble(df("DepDelay")))
  .withColumn("departureHour",  toHour(df("CRSDepTime")))
  .withColumn("dayOfWeek",      toInt(df("DayOfWeek")))
  .withColumn("dayOfMonth",     toInt(df("DayofMonth")))
  .withColumn("month",          toInt(df("Month")))
  .withColumn("distance",       toDouble(df("Distance")))
  .withColumn("nearestHoliday", days_since_nearest_holidays(
                df("Year"), df("Month"), df("DayofMonth"))
              )
  .select("departureDelay", "departureHour", "dayOfWeek", "dayOfMonth",
          "month", "distance", "nearestHoliday")

which yields:

scala> featureDf.printSchema
root
 |-- departureDelay: double (nullable = true)
 |-- departureHour: integer (nullable = true)
 |-- dayOfWeek: integer (nullable = true)
 |-- dayOfMonth: integer (nullable = true)
 |-- month: integer (nullable = true)
 |-- distance: double (nullable = true)
 |-- nearestHoliday: integer (nullable = true)

This is pretty close to your own solution. Simply keeping the type changes and other transformations as separate udf vals makes the code more readable and reusable.

Luminary・发光体
Answer 3 · 2019-01-03 12:06

Since the cast operation is available on Spark Columns (and since I personally do not favour UDFs as proposed by @Svend at this point), how about:

df.select( df("year").cast(IntegerType).as("year"), ... )

to cast to the requested type? As a neat side effect, values that are not castable / "convertible" in that sense will become null.
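
A minimal sketch of that null behaviour (the sample values are made up; assumes a SparkSession named spark is in scope for the implicits):

import org.apache.spark.sql.types.IntegerType
import spark.implicits._

// "not-a-year" cannot be parsed as an integer, so the cast produces null for that row
val sample = Seq("2012", "1997", "not-a-year").toDF("year")
sample.select(sample("year").cast(IntegerType).as("year")).show()
// +----+
// |year|
// +----+
// |2012|
// |1997|
// |null|
// +----+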

In case you need this as a helper method, use:

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.types.DataType

object DFHelper {
  def castColumnTo(df: DataFrame, cn: String, tpe: DataType): DataFrame = {
    df.withColumn(cn, df(cn).cast(tpe))
  }
}

which is used like:

import DFHelper._
import org.apache.spark.sql.types.IntegerType
val df2 = castColumnTo(df, "year", IntegerType)
贼婆χ
Answer 4 · 2019-01-03 12:06

This method will drop the old columns and create new columns with the same values and a new datatype. My original datatypes when the DataFrame was created were:

root
 |-- id: integer (nullable = true)
 |-- flag1: string (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag3: string (nullable = true)

After this I ran the following code to change the datatypes:

df=df.withColumnRenamed(<old column name>,<dummy column>) // This was done for both flag1 and flag3
df=df.withColumn(<old column name>,df.col(<dummy column>).cast(<datatype>)).drop(<dummy column>)

After this, my result came out to be:

root
 |-- id: integer (nullable = true)
 |-- flag2: string (nullable = true)
 |-- name: string (nullable = true)
 |-- flag1: boolean (nullable = true)
 |-- flag3: boolean (nullable = true)
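
A concrete sketch of the same steps for flag1 (the temporary column name flag1_tmp and the val name are made up for illustration; the target type is boolean, as in the resulting schema above, and flag3 follows the same pattern):

import org.apache.spark.sql.functions.col
import org.apache.spark.sql.types.BooleanType

// rename to a temporary column, re-create the column with the new type, then drop the temporary column
val withFlag1 = df
  .withColumnRenamed("flag1", "flag1_tmp")
  .withColumn("flag1", col("flag1_tmp").cast(BooleanType))
  .drop("flag1_tmp")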
时光不老,我们不散
Answer 5 · 2019-01-03 12:09

This is only really relevant if you're having issues saving to a JDBC target such as SQL Server, but it's really helpful for the syntax and type errors you will otherwise run into.

import org.apache.spark.sql.jdbc.{JdbcDialect, JdbcDialects, JdbcType}
import org.apache.spark.sql.types._
val SQLServerDialect = new JdbcDialect {
  override def canHandle(url: String): Boolean = url.startsWith("jdbc:jtds:sqlserver") || url.contains("sqlserver")

  override def getJDBCType(dt: DataType): Option[JdbcType] = dt match {
    case StringType => Some(JdbcType("VARCHAR(5000)", java.sql.Types.VARCHAR))
    case BooleanType => Some(JdbcType("BIT(1)", java.sql.Types.BIT))
    case IntegerType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case LongType => Some(JdbcType("BIGINT", java.sql.Types.BIGINT))
    case DoubleType => Some(JdbcType("DOUBLE PRECISION", java.sql.Types.DOUBLE))
    case FloatType => Some(JdbcType("REAL", java.sql.Types.REAL))
    case ShortType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case ByteType => Some(JdbcType("INTEGER", java.sql.Types.INTEGER))
    case BinaryType => Some(JdbcType("BINARY", java.sql.Types.BINARY))
    case TimestampType => Some(JdbcType("DATE", java.sql.Types.DATE))
    case DateType => Some(JdbcType("DATE", java.sql.Types.DATE))
    //      case DecimalType.Fixed(precision, scale) => Some(JdbcType("NUMBER(" + precision + "," + scale + ")", java.sql.Types.NUMERIC))
    case t: DecimalType => Some(JdbcType(s"DECIMAL(${t.precision},${t.scale})", java.sql.Types.DECIMAL))
    case _ => throw new IllegalArgumentException(s"Don't know how to save ${dt.json} to JDBC")
  }
}

JdbcDialects.registerDialect(SQLServerDialect)
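
Once registered, the dialect is picked up automatically when writing through Spark's JDBC data source. A sketch of that step (the connection URL, table name and credentials below are placeholders):

val props = new java.util.Properties()
props.setProperty("user", "username")
props.setProperty("password", "password")

df.write
  .mode("append")
  .jdbc("jdbc:sqlserver://host:1433;databaseName=mydb", "dbo.my_table", props)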
三岁会撩人
Answer 6 · 2019-01-03 12:09

Generate a simple dataset containing five values and convert the numeric id column to string type:

import org.apache.spark.sql.functions.col
val df = spark.range(5).select(col("id").cast("string"))
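
Printing the schema shows that the id column produced by range (a long) is now a string:

df.printSchema()
// root
//  |-- id: string (nullable = false)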
仙女界的扛把子
Answer 7 · 2019-01-03 12:10

You can use the code below.

df.withColumn("year", df("year").cast(IntegerType))

This will convert the year column to an IntegerType column.
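
Equivalently, you can pass the type name as a string and skip the IntegerType import:

df.withColumn("year", df("year").cast("int"))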
