Spark-Monotonically increasing id not working as e

Posted 2020-02-06 06:47

Question:

I have a dataframe df in Spark which looks something like this:

scala> df.show()
+--------+--------+
|columna1|columna2|
+--------+--------+
|     0.1|     0.4|
|     0.2|     0.5|
|     0.1|     0.3|
|     0.3|     0.6|
|     0.2|     0.7|
|     0.2|     0.8|
|     0.1|     0.7|
|     0.5|     0.5|
|     0.6|    0.98|
|     1.2|     1.1|
|     1.2|     1.2|
|     0.4|     0.7|
+--------+--------+

I tried to add an id column with the following code:

val df_id = df.withColumn("id",monotonicallyIncreasingId)

but the id column is not what I expect:

scala> df_id.show()
+--------+--------+----------+
|columna1|columna2|        id|
+--------+--------+----------+
|     0.1|     0.4|         0|
|     0.2|     0.5|         1|
|     0.1|     0.3|         2|
|     0.3|     0.6|         3|
|     0.2|     0.7|         4|
|     0.2|     0.8|         5|
|     0.1|     0.7|8589934592|
|     0.5|     0.5|8589934593|
|     0.6|    0.98|8589934594|
|     1.2|     1.1|8589934595|
|     1.2|     1.2|8589934596|
|     0.4|     0.7|8589934597|
+--------+--------+----------+

As you can see, the ids run correctly from 0 to 5, but the next id is 8589934592 instead of 6, and so on.

So what is wrong here? Why is the id column not properly indexed here?

Answer 1:

It works as expected. This function is not intended for generating consecutive values. Instead, it encodes the partition number and the record's index within that partition. From the documentation:

The generated ID is guaranteed to be monotonically increasing and unique, but not consecutive. The current implementation puts the partition ID in the upper 31 bits, and the record number within each partition in the lower 33 bits. The assumption is that the data frame has less than 1 billion partitions, and each partition has less than 8 billion records.

As an example, consider a DataFrame with two partitions, each with 3 records. This expression would return the following IDs:

0, 1, 2, 8589934592 (1L << 33), 8589934593, 8589934594.
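
The layout described above can be checked by hand with plain bit arithmetic. The following is a minimal sketch that needs no Spark at all; it assumes the documented layout (partition ID in the upper 31 bits, record number in the lower 33 bits), and the names DecodeId, partitionOf, recordOf and encode are hypothetical helpers for illustration, not part of Spark's API.

```scala
// Sketch: split an id into its two parts, assuming the documented layout
// (partition ID in the upper 31 bits, record number in the lower 33 bits).
// All names here are illustrative and are not part of Spark.
object DecodeId {
  private val RecordBits = 33
  private val RecordMask = (1L << RecordBits) - 1

  // Partition index stored in the upper bits.
  def partitionOf(id: Long): Long = id >>> RecordBits

  // Position of the record inside its partition (lower 33 bits).
  def recordOf(id: Long): Long = id & RecordMask

  // Inverse operation: rebuild the id from its two parts.
  def encode(partition: Long, record: Long): Long =
    (partition << RecordBits) | record

  def main(args: Array[String]): Unit = {
    // A few ids taken from the question's output.
    val ids = Seq(0L, 5L, 8589934592L, 8589934597L)
    ids.foreach { id =>
      println(s"id=$id partition=${partitionOf(id)} record=${recordOf(id)}")
    }
  }
}
```

Applied to the question's output, 8589934592 decodes to partition 1, record 0, and 8589934597 to partition 1, record 5. In other words, the twelve rows were split across two partitions of six rows each, and the jump in the id column marks the partition boundary.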

If you want consecutive numbers, use RDD.zipWithIndex.
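
One possible way to apply RDD.zipWithIndex to a DataFrame is sketched below. It assumes Spark 2.x or later (SparkSession, df.sparkSession) and a local master for the demo; the object name ConsecutiveIds and the helper withConsecutiveId are hypothetical and chosen only for this example.

```scala
import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.types.{LongType, StructField, StructType}

object ConsecutiveIds {
  // Hypothetical helper: appends a 0-based consecutive Long column to df.
  def withConsecutiveId(df: DataFrame, colName: String = "id"): DataFrame = {
    // zipWithIndex pairs every row with its position across all partitions.
    val indexed = df.rdd.zipWithIndex.map { case (row, idx) =>
      Row.fromSeq(row.toSeq :+ idx)
    }
    // Extend the original schema with the new index column.
    val schema = StructType(
      df.schema.fields :+ StructField(colName, LongType, nullable = false)
    )
    df.sparkSession.createDataFrame(indexed, schema)
  }

  def main(args: Array[String]): Unit = {
    // Assumption: a local session with two threads, just for the demo.
    val spark = SparkSession.builder()
      .master("local[2]")
      .appName("consecutive-ids")
      .getOrCreate()
    import spark.implicits._

    val df = Seq((0.1, 0.4), (0.2, 0.5), (0.1, 0.3), (0.3, 0.6))
      .toDF("columna1", "columna2")

    withConsecutiveId(df).show()
    spark.stop()
  }
}
```

The trade-off is that zipWithIndex has to know how many rows each partition holds, so it runs an extra Spark job when the RDD has more than one partition, and the round trip through RDD[Row] leaves the optimized DataFrame path. If uniqueness alone is enough, the built-in function is cheaper.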