Scala Spark. Create object with default value by D

Posted 2019-08-22 07:58

Question:

I have a list of org.apache.spark.sql.types.DataType objects, say
val tps = List(FloatType, LongType, FloatType, DoubleType), which I obtain from a DataFrame like this:

val tps = dataFrame.schema
      .filter(f => f.dataType.isInstanceOf[NumericType])
      .map(f => f.dataType)

and for every type in this list I need to create a value of the corresponding type with its default value:
List(0.0f, 0L, 0.0f, 0.0). How can I do that?

I tried doing

tps.map(t => t.getClass.newInstance())

but it didn't work: the constructors are private (can not access a member of class org.apache.spark.sql.types.LongType$ with modifiers "private"), and in any case this statement creates instances of the DataType descriptors themselves, while I need values of the corresponding Scala types.

I'm fairly new to Scala; can someone help?

Answer 1:

For testing purposes I have something like this:

import java.sql.{Date, Timestamp}
import org.apache.spark.sql.Row
import org.apache.spark.sql.catalyst.expressions.GenericRowWithSchema
import org.apache.spark.sql.types._

object RowSampleMaker {

  var makerRunNumber = 1

  def apply(schema: StructType): Row = new GenericRowWithSchema(schema.map(field => {
      makerRunNumber += 1
      field.dataType match {
        case ShortType => makerRunNumber.toShort
        case IntegerType => makerRunNumber
        case LongType => makerRunNumber.toLong
        case FloatType => makerRunNumber.toFloat
        case DecimalType() => d(makerRunNumber) // d: helper that builds a Decimal value (not shown here)
        case DateType => new Date(System.currentTimeMillis)
        case TimestampType => new Timestamp(System.currentTimeMillis)
        case StringType => s"arbitrary-$makerRunNumber"
        case BooleanType => false
        case StructType(fields) => apply(StructType(fields))
        case t => throw new Exception(s"Maker doesn't support generating $t")
      }
    }).toArray, schema)

  implicit class RowManipulation(row: Row) {

    def update(fieldName: String, value: Any): Row = new GenericRowWithSchema(
      row.toSeq.updated(row.fieldIndex(fieldName), value).toArray,
      row.schema
    )
  }
}

You can add more types, and replace the incrementing values with 0, or add another method, say .zero, that returns all neutral values. The update method on the implicit class is there because I generally override a couple of the values for testing.

You'd call RowSampleMaker(schema).update("field1", value1).update("field2", value2) for each row you want to generate, and then create a DataFrame from those rows.
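A minimal end-to-end sketch of that pattern (the two-field schema, the field names, and the SparkSession named `spark` are assumptions for illustration, not part of the original answer):

```scala
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SparkSession}
import RowSampleMaker.RowManipulation // brings the implicit update method into scope

// Hypothetical schema; field names are illustrative only
val schema = StructType(Seq(
  StructField("field1", StringType),
  StructField("field2", LongType)
))

// Build a few sample rows, overriding the generated values where needed
val rows: Seq[Row] = (1 to 3).map { _ =>
  RowSampleMaker(schema)
    .update("field1", "fixed-value")
    .update("field2", 42L)
}

// Turn them into a DataFrame (assumes an active SparkSession named `spark`)
val df = spark.createDataFrame(
  spark.sparkContext.parallelize(rows),
  schema
)
```

This keeps test data generation in one place, with per-test overrides applied fluently through the implicit class.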



Answer 2:

I followed the hints from @fd8s0's answer, and this is what I came up with:

  def mapToDefault(dataType: DataType): Number = {
    val defaultVal = 0.0
    dataType match {
      case ShortType   => defaultVal.toShort
      case IntegerType => defaultVal.toInt
      case LongType    => defaultVal.toLong
      case FloatType   => defaultVal.toFloat
      case DoubleType  => defaultVal
      case t           => null // non-numeric types get no numeric default
    }
  }

...

val defaultValues = dataFrame.schema
    .filter(f => f.dataType.isInstanceOf[NumericType])
    .map(column => mapToDefault(column.dataType))

The mapToDefault method thus creates an instance of the given DataType with its default value (for numeric types only, in my case).
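Putting it together, a small sketch of mapToDefault applied to a hypothetical schema (the field names here are made up; it assumes the mapToDefault definition above is in scope):

```scala
import org.apache.spark.sql.types._

// Hypothetical schema mixing numeric and non-numeric columns
val schema = StructType(Seq(
  StructField("price", DoubleType),
  StructField("count", LongType),
  StructField("name", StringType) // non-numeric, filtered out below
))

// Keep only numeric columns, then map each to its default value
val defaults = schema
  .filter(_.dataType.isInstanceOf[NumericType])
  .map(f => mapToDefault(f.dataType))
// defaults is a Seq[Number] containing a boxed Double and a boxed Long
```

Note that the Scala primitives returned from the match are boxed to java.lang.Number subclasses (java.lang.Double, java.lang.Long, etc.), which is why the Number return type compiles.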