我哈瓦一个RDD[List[Int]]
我不知道的计数list[Int]
我想转换我Rdd[List[Int]]
到DataFrame
,应该怎么办?
这是我输入:
val l1=Array(1,2,3,4)
val l2=Array(1,2,3,4)
val Lz=Seq(l1,l2)
val rdd1=sc.parallelize(Lz,2)
这是我期待的结果:
+---+---+---+---+
| _1| _2| _3| _4|
+---+---+---+---+
| 1| 2| 3| 4|
| 1| 2| 3| 4|
+---+---+---+---+
可能有一些其他的更好的功能性的方式来做到这一点,但这个工程太:
def getSchema(myArray : Array[Int]): StructType = {
var schemaArray = scala.collection.mutable.ArrayBuffer[StructField]()
for((el,idx) <- myArray.view.zipWithIndex){
schemaArray += StructField("col"+idx , IntegerType, true)
}
StructType(schemaArray)
}
val l1=Array(1,2,3,4)
val l2=Array(1,2,3,4)
val Lz=Seq(l1,l2)
val rdd1=sc.parallelize(Lz,2).map(Row.fromSeq(_))
val schema = getSchema(l1) //Since both arrays will be of same type and size
val df = sqlContext.createDataFrame(rdd1, schema)
df.show()
+----+----+----+----+
|col0|col1|col2|col3|
+----+----+----+----+
| 1| 2| 3| 4|
| 1| 2| 3| 4|
+----+----+----+----+
你可以做到以下几点:
val l1=Array(1,2,3,4)
val l2=Array(1,2,3,4)
val Lz=Seq(l1,l2)
val df = sc.parallelize(Lz,2).map{
case Array(val1, val2, val3, val4) => (val1, val2, val3, val4)
}.toDF
df.show
// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// | 1| 2| 3| 4|
// | 1| 2| 3| 4|
// +---+---+---+---+
如果你有大量的列,您需要进行不同的,但你需要知道你的数据的方案,否则你将无法执行以下操作:
val sch = df.schema // I just took the schema from the old df but you can add one programmatically
val df2 = spark.createDataFrame(sc.parallelize(Lz,2).map{ Row.fromSeq(_) }, sch)
df2.show
// +---+---+---+---+
// | _1| _2| _3| _4|
// +---+---+---+---+
// | 1| 2| 3| 4|
// | 1| 2| 3| 4|
// +---+---+---+---+
除非你提供一个模式,你将无法做太多,除了具有阵列列:
val df3 = sc.parallelize(Lz,2).toDF
// df3: org.apache.spark.sql.DataFrame = [value: array<int>]
df3.show
// +------------+
// | value|
// +------------+
// |[1, 2, 3, 4]|
// |[1, 2, 3, 4]|
// +------------+
df3.printSchema
//root
// |-- value: array (nullable = true)
// | |-- element: integer (containsNull = false)