How to create Spark DataFrame from RDD[Row] when Row contains complex types

Published 2019-10-29 18:23

I have an RDD[HbaseRecord] that contains a custom complex type Name. The two classes are defined as follows:

class HbaseRecord(
      val uuid: String,
      val timestamp: String,
      val name: Name
)

class Name(
    val firstName: String,
    val middleName: String,
    val lastName: String
)

At some point in my code I want to generate a DataFrame from that RDD, so that I can save it as an Avro file. I tried the following:

//I get an Object from Hbase here
val objectRDD : RDD[HbaseRecord] = ... 

//I convert the RDD[HbaseRecord] into RDD[Row]
val rowRDD : RDD[Row] = objectRDD.map(
    hbaseRecord => {
      val uuid : String = hbaseRecord.uuid
      val timestamp : String = hbaseRecord.timestamp
      val name : Name = hbaseRecord.name

      Row(uuid, timestamp, name)
    })

//Here I define the schema
val schema = new StructType()
               .add("uuid", StringType)
               .add("timestamp", StringType)
               .add("name", new StructType()
                              .add("firstName", StringType)
                              .add("middleName", StringType)
                              .add("lastName", StringType))

//Now I try to create a Dataframe using the RDD[Row] and the schema
val dataFrame = sqlContext.createDataFrame(rowRDD, schema)

However, I get the following error:

scala.MatchError: (of class java.lang.String)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:255)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:260)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$StructConverter.toCatalystImpl(CatalystTypeConverters.scala:250)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$CatalystTypeConverter.toCatalyst(CatalystTypeConverters.scala:102)
    at org.apache.spark.sql.catalyst.CatalystTypeConverters$$anonfun$createToCatalystConverter$2.apply(CatalystTypeConverters.scala:401)
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:492)
    at org.apache.spark.sql.SQLContext$$anonfun$7.apply(SQLContext.scala:492)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:312)
    at scala.collection.Iterator$class.foreach(Iterator.scala:727)
    at scala.collection.AbstractIterator.foreach(Iterator.scala:1157)
    at scala.collection.generic.Growable$class.$plus$plus$eq(Growable.scala:48)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:103)
    at scala.collection.mutable.ArrayBuffer.$plus$plus$eq(ArrayBuffer.scala:47)
    at scala.collection.TraversableOnce$class.to(TraversableOnce.scala:273)
    at scala.collection.AbstractIterator.to(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toBuffer(TraversableOnce.scala:265)
    at scala.collection.AbstractIterator.toBuffer(Iterator.scala:1157)
    at scala.collection.TraversableOnce$class.toArray(TraversableOnce.scala:252)
    at scala.collection.AbstractIterator.toArray(Iterator.scala:1157)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$5.apply(SparkPlan.scala:212)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.SparkContext$$anonfun$runJob$5.apply(SparkContext.scala:1858)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:66)
    at org.apache.spark.scheduler.Task.run(Task.scala:89)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:213)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
    at java.lang.Thread.run(Thread.java:745)

I tried removing the complex type from the Row, making it a Row[String, String], and then there was no error. So I think the problem is the complex type.

What am I doing wrong? Or what other approach could I follow to generate a DataFrame with complex types?

Answer 1:

I would just use a simple case class for this instead of a class. The name column as you build it does not conform to the defined schema: Catalyst can convert a nested Row (or a Product such as a case class) into a StructType, but not an arbitrary custom class like Name, hence the MatchError. Convert the name column into a Row and it should work:

//Convert the nested Name object into a Row so that it matches
//the nested StructType in the schema
val rowRDD : RDD[Row] = objectRDD.map(
    hbaseRecord => {
      val uuid : String = hbaseRecord.uuid
      val timestamp : String = hbaseRecord.timestamp
      val name = Row(hbaseRecord.name.firstName,
                     hbaseRecord.name.middleName, hbaseRecord.name.lastName)
      Row(uuid, timestamp, name)
    })
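
For the case-class alternative mentioned above, here is a minimal sketch. It assumes the same Spark 1.x sqlContext and objectRDD as in the question; the HbaseRecordCC and NameCC names and the output path are made up for illustration:

import org.apache.spark.rdd.RDD

//Hypothetical case-class versions of the original classes; Spark
//infers the schema (including the nested struct) for case classes
//via reflection, so no hand-written StructType is needed
case class NameCC(firstName: String, middleName: String, lastName: String)
case class HbaseRecordCC(uuid: String, timestamp: String, name: NameCC)

val caseClassRDD: RDD[HbaseRecordCC] = objectRDD.map(r =>
  HbaseRecordCC(r.uuid, r.timestamp,
    NameCC(r.name.firstName, r.name.middleName, r.name.lastName)))

val dataFrame = sqlContext.createDataFrame(caseClassRDD)

//Saving as Avro should then work as intended in the question; this
//assumes the spark-avro package is on the classpath (e.g. via
//--packages com.databricks:spark-avro_2.10:2.0.1)
dataFrame.write.format("com.databricks.spark.avro").save("/tmp/hbase_records")

Either way, the key point is the same: every level of the schema has to be backed by something Catalyst understands, either a Row matching the corresponding StructType or a case class.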


Source: How to create Spark DataFrame from RDD[Row] when Row contains complex types