How do I add a persistent column of row IDs to a Spark DataFrame?

Posted 2019-01-08 17:30

Question:

This question is not new, however I am finding surprising behavior in Spark. I need to add a column of row IDs to a DataFrame. I used the DataFrame method monotonically_increasing_id(), and it does give me an additional column of unique row IDs (which, by the way, are NOT consecutive, but are unique).

The problem I'm having is that when I filter the DataFrame, the row IDs in the resulting DataFrame are re-assigned. The two DataFrames are shown below.

  • the first one is the initial DataFrame with row IDs added as follows:

    df.withColumn("rowId", monotonically_increasing_id()) 
    
  • the second DataFrame is the one obtained after filtering on the column P via df.filter(col("P")).

The problem is illustrated by the rowId for custId 169, which was 5 in the initial DataFrame; after filtering, that rowId (5) was re-assigned to custId 773 once custId 169 was filtered out! I don't know why this is the default behavior.

I would want the rowIds to be "sticky": if I remove rows from the DataFrame, I do not want their IDs "re-used"; I want them gone along with their rows. Is it possible to do that? I don't see any flag to request this behavior from the monotonically_increasing_id() method.

+------+------------+-----+-----+
|custId|    features|    P|rowId|
+------+------------+-----+-----+
|   806|[50,5074,...| true|    0|
|   832|[45,120,1...| true|    1|
|   216|[6691,272...| true|    2|
|   926|[120,1788...| true|    3|
|   875|[54,120,1...| true|    4|
|   169|[19406,21...|false|    5|
+------+------------+-----+-----+

after filtering on P:
+------+------------+----+-----+
|custId|    features|   P|rowId|
+------+------------+----+-----+
|   806|[50,5074,...|true|    0|
|   832|[45,120,1...|true|    1|
|   216|[6691,272...|true|    2|
|   926|[120,1788...|true|    3|
|   875|[54,120,1...|true|    4|
|   773|[3136,317...|true|    5|
+------+------------+----+-----+

Answer 1:

Spark 2.0

  • This issue has been resolved in Spark 2.0 with SPARK-14241.

  • Another similar issue has been resolved in Spark 2.1 with SPARK-14393.

Spark 1.x

The problem you experience is rather subtle, but it can be reduced to a simple fact: monotonically_increasing_id is an extremely ugly function. It is clearly not pure, and its value depends on something that is completely out of your control.

It doesn't take any parameters, so from an optimizer's perspective it doesn't matter when it is called, and it can be pushed after all other operations. Hence the behavior you see.

If you take a look at the code, you'll find that this is explicitly marked: the MonotonicallyIncreasingID expression extends Nondeterministic.

I don't think there is any elegant solution, but one way you can handle this is to add an artificial dependency on the filtered value, for example with a UDF like this:

from pyspark.sql.types import LongType
from pyspark.sql.functions import udf, monotonically_increasing_id

# Identity UDF; its only purpose is to create a dependency on the filter column
bound = udf(lambda _, v: v, LongType())

(df
  .withColumn("rn", monotonically_increasing_id())
  # Due to nondeterministic behavior it has to be a separate step
  .withColumn("rn", bound("P", "rn"))
  .where("P"))

In general it can be cleaner to add indices using zipWithIndex on an RDD and then convert it back to a DataFrame.
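For example, a rough PySpark sketch of that idea (the "index" column name and the intermediate variable names here are just illustrative):

from pyspark.sql.types import LongType, StructField, StructType

# Sketch: assign consecutive ids at the RDD level, then rebuild the DataFrame
# with an extra "index" column appended to the original schema.
indexed_schema = StructType(df.schema.fields + [StructField("index", LongType(), False)])
indexed_df = (df.rdd
    .zipWithIndex()                          # yields (Row, consecutive long id)
    .map(lambda pair: pair[0] + (pair[1],))  # Row is a tuple, so this appends the id
    .toDF(indexed_schema))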


* The UDF workaround shown above is no longer a valid solution (nor required) in Spark 2.x, where Python UDFs are subject to execution plan optimizations.



Answer 2:

I couldn't reproduce this. I'm using Spark 2.0 though, so maybe the behaviour has changed, or I'm not doing the same thing as you.

import org.apache.spark.sql.functions.{col, monotonically_increasing_id}

val df = Seq(("one", 1, true), ("two", 2, false), ("three", 3, true), ("four", 4, true))
  .toDF("name", "value", "flag")
  .withColumn("rowd", monotonically_increasing_id())

df.show

val df2 = df.filter(col("flag") === true)

df2.show

df: org.apache.spark.sql.DataFrame = [name: string, value: int ... 2 more fields]
+-----+-----+-----+----+
| name|value| flag|rowd|
+-----+-----+-----+----+
|  one|    1| true|   0|
|  two|    2|false|   1|
|three|    3| true|   2|
| four|    4| true|   3|
+-----+-----+-----+----+
df2: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [name: string, value: int ... 2 more fields]
+-----+-----+----+----+
| name|value|flag|rowd|
+-----+-----+----+----+
|  one|    1|true|   0|
|three|    3|true|   2|
| four|    4|true|   3|
+-----+-----+----+----+


Answer 3:

I was recently working on a similar problem. Although monotonically_increasing_id() is very fast, it is not reliable and will not give you consecutive row numbers, only increasing unique integers.

Creating a window partition and then using row_number().over(some_windows_partition) is extremely time consuming; for reference, that approach is sketched below.
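A minimal sketch of that window-based numbering in PySpark (the ordering column custId is just an illustrative assumption); because there is no partition key, every row is shuffled into a single partition for ordering, which is why it is so slow:

from pyspark.sql import Window
from pyspark.sql.functions import col, row_number

# A global row number needs a window over the whole DataFrame; with no
# partitionBy, Spark moves all rows into one partition to order them.
some_windows_partition = Window.orderBy(col("custId"))  # ordering column is illustrative
numbered = df.withColumn("row_num", row_number().over(some_windows_partition))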

The best solution so far is to zip the rows with an index and then convert the zipped RDD back to a DataFrame, with a new schema that includes the index column.

Try this:

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, LongType

# Append an "index" field to the original schema
new_schema = StructType(original_dataframe.schema.fields + [StructField("index", LongType(), False)])

# Pair every row with a consecutive index, then rebuild the DataFrame
zipped_rdd = original_dataframe.rdd.zipWithIndex()
indexed = zipped_rdd.map(lambda ri: row_with_index(*(list(ri[0]) + [ri[1]]))).toDF(new_schema)

Here original_dataframe is the DataFrame you need to add an index to, and row_with_index is a Row constructor that includes the new index column, which you can write as:

row_with_index = Row(
    "calendar_date",
    "year_week_number",
    "year_period_number",
    "realization",
    "index",
)

Here, calendar_date, year_week_number, year_period_number, and realization were the columns of my original dataframe. You can replace these with the names of your own columns; index is the new column name added for the row numbers.

This process is largely more efficient and smoother than the row_number().over(some_windows_partition) method.

Hope this helps.



Answer 4:

To get around the shifting evaluation of monotonically_increasing_id(), you could try writing the DataFrame to disk and re-reading it. Then the id column is simply a data field that is read back in, rather than dynamically calculated at some point in the pipeline. Although it's a pretty ugly solution, it worked when I did a quick test.
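A minimal sketch of that round trip, assuming a Parquet format and a placeholder path:

from pyspark.sql.functions import monotonically_increasing_id

# Materialize the ids by writing the DataFrame out, then reading it back;
# after the round trip "rowId" is ordinary stored data, not a recomputed expression.
(df.withColumn("rowId", monotonically_increasing_id())
   .write.mode("overwrite").parquet("/tmp/df_with_row_ids"))  # placeholder path

df_with_ids = spark.read.parquet("/tmp/df_with_row_ids")
filtered = df_with_ids.filter("P")  # rowId values now survive the filter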



Answer 5:

This worked for me: I created another identity column and used the window function row_number.

import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions.{col, lit, row_number}
import org.apache.spark.sql.expressions.Window

// The constant "Id" column puts every row into the same window partition
val df1: DataFrame = df.withColumn("Id", lit(1))

df1
  .select(
    ...,
    row_number()
      .over(Window.partitionBy("Id").orderBy(col("...").desc))
      .alias("Row_Nbr")
  )