I have an RDD[HbaseRecord] that contains a custom complex type, Name. The two classes are defined as follows:
class HbaseRecord(
  val uuid: String,
  val timestamp: String,
  val name: Name
)

class Name(
  val firstName: String,
  val middleName: String,
  val lastName: String
)
At some point in my code I want to generate a DataFrame from this RDD so that I can save it as an Avro file. I tried the following:
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

// I get the objects from HBase here
val objectRDD: RDD[HbaseRecord] = ...

// I convert the RDD[HbaseRecord] into an RDD[Row]
val rowRDD: RDD[Row] = objectRDD.map { hbaseRecord =>
  val uuid: String = hbaseRecord.uuid
  val timestamp: String = hbaseRecord.timestamp
  val name: Name = hbaseRecord.name
  Row(uuid, timestamp, name)
}

// Here I define the schema, with "name" as a nested struct
val schema = new StructType()
  .add("uuid", StringType)
  .add("timestamp", StringType)
  .add("name", new StructType()
    .add("firstName", StringType)
    .add("middleName", StringType)
    .add("lastName", StringType))

// Now I try to create a DataFrame from the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)
However, I get the following error:
scala.MatchError: (of class java.lang.String)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:492)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)
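I tried removing the complex type from the Row, so that it holds only the two strings (Row[String, String]), and then there is no error. So I assume the problem is the complex type.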
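For reference, a rough sketch of that reduced variant, with only the two string columns. The names FlatRowSketch, flatSchema and toFlatRow are introduced here purely for illustration and are not part of my real code:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

object FlatRowSketch {
  // Schema without the nested "name" struct: two plain string columns.
  val flatSchema: StructType = new StructType()
    .add("uuid", StringType)
    .add("timestamp", StringType)

  // Illustrative helper: builds a Row that carries only the two strings.
  def toFlatRow(uuid: String, timestamp: String): Row = Row(uuid, timestamp)
}
```

Every value in such a Row is a String, which matches the StringType declared for each column in the schema.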
What am I doing wrong? Or what other approach could I follow to generate a DataFrame with a complex type?
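One direction that may be worth checking: a field declared as a StructType in the schema is normally populated with a nested Row, whereas the code above places the Name object itself into the outer Row. The sketch below assumes that mismatch is the cause. NestedRowSketch, nameSchema and toRow are hypothetical names used only for illustration, and the two classes are repeated so the snippet stands on its own:

```scala
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StringType, StructType}

// Same shapes as the classes shown in the question.
class Name(val firstName: String, val middleName: String, val lastName: String)
class HbaseRecord(val uuid: String, val timestamp: String, val name: Name)

object NestedRowSketch {
  // Schema of the nested "name" struct.
  val nameSchema: StructType = new StructType()
    .add("firstName", StringType)
    .add("middleName", StringType)
    .add("lastName", StringType)

  // Full schema, with "name" declared as a struct column.
  val schema: StructType = new StructType()
    .add("uuid", StringType)
    .add("timestamp", StringType)
    .add("name", nameSchema)

  // Hypothetical helper: the Name is converted into its own Row, so the
  // value in the third position mirrors the nested StructType.
  def toRow(r: HbaseRecord): Row =
    Row(r.uuid, r.timestamp, Row(r.name.firstName, r.name.middleName, r.name.lastName))
}
```

With this in place, the mapping step would become objectRDD.map(NestedRowSketch.toRow), and the result could be passed to sqlContext.createDataFrame together with NestedRowSketch.schema. Whether this resolves the MatchError on a given Spark version is something I have not verified.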