Looking at the new Spark DataFrame API, it is unclear whether it is possible to modify DataFrame columns.

How would I go about changing a value in row `x`, column `y` of a DataFrame? In pandas this would be `df.ix[x,y] = new_value`.

Edit: Consolidating what was said below, you can't modify the existing DataFrame as it is immutable, but you can return a new DataFrame with the desired modifications.
If you just want to replace a value in a column based on a condition, like `np.where`:

```python
from pyspark.sql import functions as F

update_func = (F.when(F.col('update_col') == replace_val, new_value)
               .otherwise(F.col('update_col')))
df = df.withColumn('new_column_name', update_func)
```
If you want to perform some operation on a column and create a new column that is added to the DataFrame:

```python
import pyspark.sql.functions as F
import pyspark.sql.types as T

def my_func(col):
    # do stuff to the column value here; upper-casing is just a placeholder example
    transformed_value = col.upper()
    return transformed_value

# if we assume that my_func returns a string
my_udf = F.UserDefinedFunction(my_func, T.StringType())
df = df.withColumn('new_column_name', my_udf('update_col'))
```
If you want the new column to have the same name as the old column, you could add the additional step:

```python
df = df.drop('update_col').withColumnRenamed('new_column_name', 'update_col')
```
DataFrames are based on RDDs. RDDs are immutable structures and do not allow updating elements in place. To change values, you will need to create a new DataFrame by transforming the original one, either using the SQL-like DSL or RDD operations like `map`.

A highly recommended slide deck: Introducing DataFrames in Spark for Large Scale Data Science.
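For instance, a minimal sketch of both routes (assuming a SparkSession named `spark` and an existing DataFrame `df` with integer columns `x` and `y`; all names here are hypothetical):

```python
from pyspark.sql import Row, functions as F

# DSL route: derive a new DataFrame with one column transformed
df2 = df.withColumn('y', F.col('y') * 2)

# RDD route: map each Row to a new Row, then rebuild a DataFrame
df3 = spark.createDataFrame(df.rdd.map(lambda row: Row(x=row.x, y=row.y * 2)))
```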
While you cannot modify a column as such, you may operate on a column and return a new DataFrame reflecting that change. For that you'd first create a `UserDefinedFunction` implementing the operation to apply and then selectively apply that function to the targeted column only. In Python:
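A minimal sketch of that approach (assuming `old_df` has a `StringType` column named `target_column` and `new_value` holds the replacement string):

```python
from pyspark.sql.functions import UserDefinedFunction
from pyspark.sql.types import StringType

name = 'target_column'

# a UDF that maps every input to the replacement value
udf = UserDefinedFunction(lambda x: new_value, StringType())

# apply the UDF to the targeted column only, keeping the other columns as-is
new_df = old_df.select(
    *[udf(column).alias(name) if column == name else column
      for column in old_df.columns])
```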
`new_df` now has the same schema as `old_df` (assuming that `old_df.target_column` was of type `StringType` as well), but all values in column `target_column` will be `new_value`.

Just as maasg says, you can create a new DataFrame from the result of a map applied to the old DataFrame. An example for a given DataFrame `df` with two rows:
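A minimal Scala sketch of that pattern (assuming an `sqlContext` and that `df` has an integer column followed by a string column; the transformations are stand-ins):

```scala
import org.apache.spark.sql.Row

// map each Row to a new Row, then rebuild a DataFrame with the old schema
val newDf = sqlContext.createDataFrame(
  df.rdd.map(row => Row(row.getInt(0) + 1, row.getString(1).toUpperCase)),
  df.schema)
```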
Note that if the types of the columns change, you need to give it a correct schema instead of `df.schema`. Check out the API of org.apache.spark.sql.Row for available methods: https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/Row.html

[Update] Or using UDFs in Scala:
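A minimal sketch, assuming the operation is casting a string column to a long (the column names are hypothetical):

```scala
import org.apache.spark.sql.functions.udf

// wrap the operation in a UDF and derive a new column from the old one
val toLong = udf((value: String) => value.toLong)
val modifiedDf = df.withColumn("modifiedColumnName", toLong(df("columnName")))
                   .drop("columnName")
```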
and if the column name needs to stay the same you can rename it back:
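Continuing with the hypothetical names from the sketch above:

```scala
val correctDf = modifiedDf.withColumnRenamed("modifiedColumnName", "columnName")
```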
Commonly, when updating a column, we want to map an old value to a new value. Here's a way to do that in PySpark without UDFs:
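A minimal sketch, assuming a column `update_col` in which `old_value` should become `new_value` (all three names are stand-ins):

```python
from pyspark.sql import functions as F

# update df['update_col'], mapping old_value --> new_value
df = df.withColumn('update_col',
                   F.when(F.col('update_col') == old_value, new_value)
                    .otherwise(F.col('update_col')))
```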