How to “negative select” columns in Spark's DataFrame

Posted 2019-01-23 13:50

Question:

I can't figure this out, but I guess it's simple. I have a Spark DataFrame df with columns "A", "B" and "C". Now let's say I have an Array containing the names of the columns of this df:

column_names = Array("A","B","C")

I'd like to do a df.select() in such a way that I can specify which columns not to select. Example: let's say I do not want to select column "B". I tried

df.select(column_names.filter(_!="B"))

but this does not work, as

org.apache.spark.sql.DataFrame cannot be applied to (Array[String])

The documentation suggests it should work with a Seq instead. However, trying

df.select(column_names.filter(_!="B").toSeq)

results in

org.apache.spark.sql.DataFrame cannot be applied to (Seq[String]).

What am I doing wrong?

Answer 1:

Since Spark 1.4 you can use the drop method:

Scala:

case class Point(x: Int, y: Int)
val df = sqlContext.createDataFrame(Point(0, 0) :: Point(1, 2) :: Nil)
df.drop("y")

Python:

df = sc.parallelize([(0, 0), (1, 2)]).toDF(["x", "y"])
df.drop("y")
## DataFrame[x: bigint]
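Since Spark 2.0, drop also accepts several column names at once, and names that are not in the schema are silently ignored. A minimal Scala sketch:

// Drop several columns in one call (Spark 2.0+); "z" is not in the
// schema and is ignored rather than raising an error.
df.drop("y", "z")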


Answer 2:

I had the same problem and solved it this way (oaffdf is a DataFrame):

val dropColNames = Seq("col7","col121")
val featColNames = oaffdf.columns.diff(dropColNames)
val featCols = featColNames.map(cn => org.apache.spark.sql.functions.col(cn))
val featsdf = oaffdf.select(featCols: _*)

https://forums.databricks.com/questions/2808/select-dataframe-columns-from-a-sequence-of-string.html
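For reference, the same idea fits on one line; a compact sketch against the same oaffdf:

import org.apache.spark.sql.functions.col

// diff removes the unwanted names, map(col) turns the rest into Columns,
// and : _* expands the resulting sequence into select's varargs.
val featsdf = oaffdf.select(oaffdf.columns.diff(Seq("col7", "col121")).map(col): _*)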



Answer 3:

OK, it's ugly, but this quick Spark shell session shows something that works:

scala> val myRDD = sc.parallelize(List.range(1,10))
myRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[17] at parallelize at <console>:21

scala> val myDF = myRDD.toDF("a")
myDF: org.apache.spark.sql.DataFrame = [a: int]

scala> val myOtherRDD = sc.parallelize(List.range(1,10))
myOtherRDD: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[20] at parallelize at <console>:21

scala> val myotherDF = myOtherRDD.toDF("b")
myotherDF: org.apache.spark.sql.DataFrame = [b: int]

scala> myDF.unionAll(myotherDF)
res2: org.apache.spark.sql.DataFrame = [a: int]

scala> myDF.join(myotherDF)
res3: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala> val twocol = myDF.join(myotherDF)
twocol: org.apache.spark.sql.DataFrame = [a: int, b: int]

scala> val cols = Array("a", "b")
cols: Array[String] = Array(a, b)

scala> val selectedCols = cols.filter(_!="b")
selectedCols: Array[String] = Array(a)

scala> twocol.select(selectedCols.head, selectedCols.tail: _*)
res4: org.apache.spark.sql.DataFrame = [a: int]

Providing varargs to a function that requires them is covered in other SO questions. The signature of select is there to ensure that your list of selected columns is never empty, which makes the conversion from the list of selected columns to varargs a bit more complex.
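If you do this often, the head/tail dance can be wrapped in a helper once; a sketch, where dropCols is a made-up name:

import org.apache.spark.sql.DataFrame

// Select every column except the excluded ones, using the
// select(head, tail: _*) signature discussed above. Assumes at
// least one column remains after filtering.
def dropCols(df: DataFrame, excluded: Set[String]): DataFrame = {
  val kept = df.columns.filterNot(excluded.contains)
  df.select(kept.head, kept.tail: _*)
}

dropCols(twocol, Set("b"))  // [a: int]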



Answer 4:

select does not accept a Seq[String] directly, so map the remaining names to Columns first:

import org.apache.spark.sql.functions.col

val columns = Seq("A", "B", "C")

df.select(columns.diff(Seq("B")).map(col): _*)


Answer 5:

In PySpark you can do

df.select(list(set(df.columns) - set(["B"])))

(note that going through set() does not preserve the original column order). Using more than one line, which keeps the order, you can also do

cols = df.columns
cols.remove("B")
df.select(cols)


Answer 6:

This will be possible through [SPARK-12139] REGEX Column Specification for Hive Queries:

https://issues.apache.org/jira/browse/SPARK-12139
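This was resolved in Spark 2.3. A hedged sketch: once spark.sql.parser.quotedRegexColumnNames is enabled, a backtick-quoted regex in a SQL select expands to every matching column, and the Hive-style pattern (B)?+.+ matches all columns except "B":

spark.conf.set("spark.sql.parser.quotedRegexColumnNames", "true")
df.createOrReplaceTempView("t")
// Expands to every column whose name matches the regex, i.e. all but B.
spark.sql("SELECT `(B)?+.+` FROM t")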



Answer 7:

selectWithout lets you specify which columns to omit. Note that it is not part of the core Spark API, so it has to come from an extension library or an implicit helper:

df.selectWithout("B")
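A minimal sketch of such an implicit helper (DataFrameOps is a made-up name):

import org.apache.spark.sql.DataFrame

// Hypothetical extension method: select every column except the given names.
implicit class DataFrameOps(df: DataFrame) {
  def selectWithout(colNames: String*): DataFrame =
    df.select(df.columns.diff(colNames).map(df.col): _*)
}

df.selectWithout("B")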