How to change case of whole pyspark dataframe to l

2019-07-22 03:07发布

I am trying to apply pyspark sql functions hash algorithm for every row in two dataframes to identify the differences. Hash algorithm is case sensitive .i.e. if column contains 'APPLE' and 'Apple' are considered as two different values, so I want to change the case for both dataframes to either upper or lower. I am able to achieve only for dataframe headers but not for dataframe values.Please help

#Code for Dataframe column headers
self.df_db1 =self.df_db1.toDF(*[c.lower() for c in self.df_db1.columns])

3条回答
神经病院院长
2楼-- · 2019-07-22 03:14

You can generate an expression using list comprehension:

from pyspark.sql import functions as psf
expression = [ psf.lower(psf.col(x)).alias(x) for x in df.columns ]

And then just call it over your existing dataframe

>>> df.show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  A|  B|  C|  D|
+---+---+---+---+

>>> df.select(*select_expression).show()
+---+---+---+---+
| c1| c2| c3| c4|
+---+---+---+---+
|  a|  b|  c|  d|
+---+---+---+---+
查看更多
趁早两清
3楼-- · 2019-07-22 03:17

Assuming df is your dataframe, this should do the work:

from pyspark.sql import functions as F
for col in df.columns:
    df = df.withColumn(col, F.lower(F.col(col)))
查看更多
Anthone
4楼-- · 2019-07-22 03:25

Both answers seems to be ok with one exception - if you have numeric column, it will be converted to string column. To avoid this, try:

import org.apache.spark.sql.types._
import org.apache.spark.sql.functions._
val fields = df.schema.fields
val stringFields = df.schema.fields.filter(f => f.dataType == StringType)
val nonStringFields = df.schema.fields.filter(f => f.dataType != StringType).map(f => f.name).map(f => col(f))

val stringFieldsTransformed = stringFields .map (f => f.name).map(f => upper(col(f)).as(f))
val df = sourceDF.select(stringFieldsTransformed ++ nonStringFields: _*)

Now types are correct also when you have non-string fields, i.e. numeric fields). If you know that each column is of String type, use one of the other answers - they are correct in that cases :)

Python code in PySpark:

from pyspark.sql.functions import *
from pyspark.sql.types import *
sourceDF = spark.createDataFrame([(1, "a")], ['n', 'n1'])
 fields = sourceDF.schema.fields
stringFields = filter(lambda f: isinstance(f.dataType, StringType), fields)
nonStringFields = map(lambda f: col(f.name), filter(lambda f: not isinstance(f.dataType, StringType), fields))
stringFieldsTransformed = map(lambda f: upper(col(f.name)), stringFields) 
allFields = [*stringFieldsTransformed, *nonStringFields]
df = sourceDF.select(allFields)
查看更多
登录 后发表回答