I am trying to implement a custom UDT and be able to reference it from Spark SQL (as explained in the Spark SQL whitepaper, section 4.4.2).
The real example is to have a custom UDT backed by an off-heap data structure using Cap'n Proto, or similar.
For this posting, I have made up a contrived example. I know that I could just use Scala case classes and not have to do any work at all, but that isn't my goal.
For example, I have a `Person` type containing several attributes, and I want to be able to run `SELECT person.first_name FROM person`. Instead, I'm running into the error `Can't extract value from person#1`, and I'm not sure why.
Here is the full source (also available at https://github.com/andygrove/spark-sql-udt)
package com.theotherandygrove
import org.apache.spark.sql.types._
import org.apache.spark.sql.{Row, SQLContext}
import org.apache.spark.{SparkConf, SparkContext}
/**
 * Demo driver: builds a two-row DataFrame whose `person` column is typed with
 * [[MockPersonUDT]], registers it as the temp table `person`, and runs a SQL
 * query that reaches into the UDT column.
 */
object Example {

  /**
   * Runs the demo against a local Spark master and prints every result row.
   *
   * @param arg command-line arguments (unused)
   */
  def main(arg: Array[String]): Unit = {
    val conf = new SparkConf()
      .setAppName("Example")
      .setMaster("local[*]")
    val sc = new SparkContext(conf)

    // Stop the context on every exit path, including when the query below
    // throws, so the local Spark runtime is always shut down.
    try {
      val sqlContext = new SQLContext(sc)

      val schema = StructType(List(
        StructField("person_id", DataTypes.IntegerType, nullable = true),
        StructField("person", new MockPersonUDT, nullable = true)))

      // load initial RDD
      val rdd = sc.parallelize(List(
        MockPersonImpl(1),
        MockPersonImpl(2)
      ))

      // convert to RDD[Row]; person_id is populated from getAge
      val rowRdd = rdd.map(person => Row(person.getAge, person))

      // convert to DataFrame (RDD + Schema)
      val dataFrame = sqlContext.createDataFrame(rowRdd, schema)

      // register as a table
      dataFrame.registerTempTable("person")

      // Select a nested attribute of the UDT column. This is the query that
      // fails with "Can't extract value from person#1".
      // NOTE(review): the query asks for `first_name`, while
      // MockPersonUDT.sqlType names the field `firstName` - confirm which
      // spelling is intended.
      val results = sqlContext.sql("SELECT person.first_name FROM person WHERE person.age < 100")

      // foreach, not map: printing is a side effect and no result is needed
      results.collect().foreach(row => println(row))
    } finally {
      sc.stop()
    }
  }
}
/** Read-only view of a person record exposed through four accessors. */
trait MockPerson {

  /** The person's first name. */
  def getFirstName: String

  /** The person's last name. */
  def getLastName: String

  /** The person's age as a boxed integer. */
  def getAge: Integer

  /** The person's state. */
  def getState: String
}
/**
 * Spark SQL user-defined type that maps [[MockPerson]] values to and from
 * their stored form.
 *
 * NOTE(review): `sqlType` declares a four-field struct, but `serialize` emits
 * a single `Integer` (the age) and `deserialize` expects one back. Presumably
 * the stored value should have the shape `sqlType` declares - confirm against
 * the `UserDefinedType` contract of the Spark version in use before relying
 * on the declared fields.
 */
class MockPersonUDT extends UserDefinedType[MockPerson] {

  /** Declared storage schema: firstName, lastName, age, state (all non-nullable). */
  override def sqlType: DataType = StructType(List(
    StructField("firstName", StringType, nullable = false),
    StructField("lastName", StringType, nullable = false),
    StructField("age", IntegerType, nullable = false),
    StructField("state", StringType, nullable = false)
  ))

  /** The user-facing class this UDT represents. */
  override def userClass: Class[MockPerson] = classOf[MockPerson]

  /**
   * Converts a person to its stored form, which is the person's age.
   *
   * Accepts any [[MockPerson]], matching `userClass`, rather than only the
   * concrete `MockPersonImpl`: only `getAge` is needed and the trait has it.
   *
   * @param obj the value to serialize; must be a [[MockPerson]]
   * @return the person's age
   * @throws IllegalArgumentException if `obj` is not a [[MockPerson]]
   */
  override def serialize(obj: Any): Any = obj match {
    case person: MockPerson => person.getAge
    case other =>
      throw new IllegalArgumentException(
        s"MockPersonUDT cannot serialize value of unexpected type: $other")
  }

  /**
   * Rebuilds a person from its stored form.
   *
   * @param datum the stored value; expected to be a boxed `Integer`
   * @return a `MockPersonImpl` constructed from `datum`
   * @throws IllegalArgumentException if `datum` is neither an `Integer` nor null
   */
  override def deserialize(datum: Any): MockPerson = datum match {
    case n: Integer => MockPersonImpl(n)
    // A null datum previously passed through the cast untouched; keep that.
    case null => MockPersonImpl(null)
    case other =>
      throw new IllegalArgumentException(
        s"MockPersonUDT cannot deserialize value of unexpected type: $other")
  }
}
/**
 * [[MockPerson]] whose attributes are all derived from a single number `n`:
 * the names are "First"/"Last" suffixed with `n`, the age is `n` itself, and
 * the state is always "AK".
 *
 * @param n the number every attribute is derived from
 */
@SQLUserDefinedType(udt = classOf[MockPersonUDT])
@SerialVersionUID(123L)
case class MockPersonImpl(n: Integer) extends MockPerson with Serializable {

  /** "First" followed by `n`. */
  override def getFirstName: String = s"First$n"

  /** "Last" followed by `n`. */
  override def getLastName: String = s"Last$n"

  /** The wrapped number `n`. */
  override def getAge: Integer = n

  /** Always "AK". */
  override def getState: String = "AK"
}
If I simply run `SELECT person FROM person`, then the query works. I just can't reference the attributes in SQL, even though they are defined in the schema.