How to use User Defined Types in Spark 2.0?

Posted 2020-07-11 09:55

Question:

In Spark 2.0, the one example I've found of creating a UDT in Scala seems to no longer be applicable. The UserDefinedType class has been set as private, with the comment:

Note: This was previously a developer API in Spark 1.x. We are making this private in Spark 2.0 because we will very likely create a new version of this that works better with Datasets.

It might be the intent of UDTRegistration to be the new mechanism of declaring UDT, but it's also private.

So far, my research tells me that there is no way to declare your own UDTs in Spark 2.0; is this conclusion correct?

Answer 1:

You are right, at least for now: Spark 2.x no longer exposes a public UDT API like the one in Spark 1.x.

Ticket SPARK-14155 shows that the API was made private so that a new one could be designed. There is also a ticket, SPARK-7768, that has been open since Spark 1.5 and that we hope will be closed in Spark 2.2.

So UDTs are not a good option for now, but there are a few tricks for putting your custom objects into a Dataset. Here is one example.
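One commonly used trick of this kind (not necessarily the one this answer linked to) is to give Spark a Kryo-based encoder through `Encoders.kryo`, which stores each object as a single binary column. The sketch below is only an illustration: the `Temperature` class, the `KryoEncoderSketch` object, and the local-mode session settings are all hypothetical names chosen for the example.

```scala
import org.apache.spark.sql.{Encoder, Encoders, SparkSession}

// Hypothetical class: not a case class, so Spark cannot derive an encoder for it
class Temperature(val celsius: Double) {
  def fahrenheit: Double = celsius * 9.0 / 5.0 + 32.0
}

object KryoEncoderSketch {
  // Puts custom objects into a Dataset and returns the Celsius values
  // of the entries that are hotter than 100 degrees Fahrenheit.
  def hotCelsius(spark: SparkSession): Seq[Double] = {
    // Kryo-backed encoder: every Temperature becomes one binary value
    implicit val temperatureEncoder: Encoder[Temperature] = Encoders.kryo[Temperature]

    val ds = spark.createDataset(Seq(new Temperature(0.0), new Temperature(100.0)))

    // Typed filter: the lambda runs on deserialized Temperature objects
    ds.filter(_.fahrenheit > 100.0).collect().map(_.celsius).toSeq
  }

  def main(args: Array[String]): Unit = {
    // Local session used only for this illustration
    val spark = SparkSession.builder()
      .master("local[*]")
      .appName("kryo-encoder-sketch")
      .getOrCreate()
    try println(hotCelsius(spark).mkString(", "))
    finally spark.stop()
  }
}
```

The trade-off is that the column is opaque to Spark SQL: you can filter and map with typed lambdas, but you cannot select individual fields of the object by column name.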



Answer 2:

You can get UDTs to work with Spark using UDTRegistration but you have to use a private API to do it which may not be supported in the future. Use this approach with great caution and only when absolutely necessary. For some use-cases, unfortunately, there is no other option.

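Say you want to store and query a polymorphic record:

trait CustomPoly
case class FooPoly(id: Int) extends CustomPoly
case class BarPoly(value: String, secondValue: Long) extends CustomPoly

// Goal: polySeq is a Dataset of records with a `poly: CustomPoly` field
// (built at the end of this answer) that can be filtered by pattern matching.
polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()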

You can write a custom UDT that encodes everything to bytes (I'm using Java serialization here, but it's probably better to use Spark's Kryo context instead).

First define the UDT class:

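// NOTE: UserDefinedType is private[spark] in Spark 2.x, so this class
// has to be compiled inside of the org.apache.spark package.
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}
import org.apache.spark.sql.types.{BinaryType, DataType, UserDefinedType}

class CustomPolyUDT extends UserDefinedType[CustomPoly] {

  // Stored in Spark as an opaque binary column
  override def sqlType: DataType = BinaryType

  // Java-serialize the object into a byte array
  override def serialize(obj: CustomPoly): Any = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // Rebuild the object from the bytes produced by serialize
  override def deserialize(datum: Any): CustomPoly = {
    val bis = new ByteArrayInputStream(datum.asInstanceOf[Array[Byte]])
    val ois = new ObjectInputStream(bis)
    try ois.readObject().asInstanceOf[CustomPoly] finally ois.close()
  }

  override def userClass: Class[CustomPoly] = classOf[CustomPoly]
}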
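Before wiring this into Spark, it can help to check the byte encoding on its own. The following is a minimal sketch, assuming the same Java-serialization approach as the UDT; `PolyCodec` and its method names are hypothetical helpers introduced only for this illustration, and no Spark dependency is needed to run it.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream}

trait CustomPoly
case class FooPoly(id: Int) extends CustomPoly
case class BarPoly(value: String, secondValue: Long) extends CustomPoly

// Hypothetical helper mirroring the serialize/deserialize logic of the UDT
object PolyCodec {
  // Case classes are Serializable, so Java serialization works on them directly
  def toBytes(obj: CustomPoly): Array[Byte] = {
    val bos = new ByteArrayOutputStream()
    val oos = new ObjectOutputStream(bos)
    try oos.writeObject(obj) finally oos.close()
    bos.toByteArray
  }

  // The concrete subclass (FooPoly or BarPoly) is recovered from the stream
  def fromBytes(bytes: Array[Byte]): CustomPoly = {
    val ois = new ObjectInputStream(new ByteArrayInputStream(bytes))
    try ois.readObject().asInstanceOf[CustomPoly] finally ois.close()
  }
}
```

A round trip such as `PolyCodec.fromBytes(PolyCodec.toBytes(BarPoly("Blah", 123L)))` should give back an equal `BarPoly`, which is what lets the later pattern match on `FooPoly` work after the data has passed through Spark.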

Then register it:

// NOTE: The file you do this in has to be inside of the org.apache.spark package!
UDTRegistration.register(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)
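To make the package requirement concrete, here is a rough sketch of a small helper file placed under the `org.apache.spark` namespace so that it can reach the private `UDTRegistration` object. The sub-package `udtsketch` and the object `UdtRegistrar` are hypothetical names for this illustration; `UDTRegistration.exists` and `UDTRegistration.register` are the calls it relies on. Like the rest of this answer, it depends on a private API that may change.

```scala
// Hypothetical sub-package: anything under org.apache.spark can see private[spark] members
package org.apache.spark.udtsketch

import org.apache.spark.sql.types.UDTRegistration

object UdtRegistrar {
  /**
   * Registers udtClassName as the UDT for userClassName unless a mapping
   * already exists. Returns true only if a new registration was made.
   */
  def registerOnce(userClassName: String, udtClassName: String): Boolean = {
    if (UDTRegistration.exists(userClassName)) {
      false
    } else {
      UDTRegistration.register(userClassName, udtClassName)
      true
    }
  }
}
```

Application code in any package could then call `UdtRegistrar.registerOnce(classOf[CustomPoly].getName, classOf[CustomPolyUDT].getName)` once at startup, before the first Dataset containing a `CustomPoly` is created.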

Then you can use it!

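// Requires `import spark.implicits._` (spark being your SparkSession) for .toDS()
case class UsingPoly(id: Int, poly: CustomPoly)

// Bind the Dataset to a name so that the filter below can refer to it
val polySeq = Seq(
  UsingPoly(1, FooPoly(1)),
  UsingPoly(2, BarPoly("Blah", 123)),
  UsingPoly(3, FooPoly(1))
).toDS()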

polySeq.filter(_.poly match {
  case FooPoly(value) => value == 1
  case _ => false
}).show()

Check out my original post, which has an additional example: How to store custom objects in Dataset?

Edit: This post was down-voted for understandable reasons. I included a caveat-emptor at the top hopefully to prevent misunderstandings.