Given a query, select * from ...
(that might be part of CTAS statement)
The goal is to add an additional column, ID
, where ID
is a unique integer.
select ... as ID,* from ...
P.s.
ID
does not have to be sequential (there could be gaps)
- The ID could be arbitrary (doesn't have to represent a specific order within the result set)
row_number logically solves the problem -
select row_number() over () as ID,* from ...
The problem is, that at least for now, global row_number (no partition by) is being implemented using a single reducer (hive) / task (spark).
hive
set mapred.reduce.tasks=1000;
set hivevar:buckets=10000;
hivevar:buckets
should be high enough relatively to the number of reducers (mapred.reduce.tasks
), so the rows will be evenly distributed between the reduces.
select 1 + x + (row_number() over (partition by x) - 1) * ${hivevar:buckets} as id
,t.*
from (select t.*
,abs(hash(rand())) % ${hivevar:buckets} as x
from t
) t
spark-sql
select 1 + x + (row_number() over (partition by x) - 1) * 10000 as id
,t.*
from (select t.*
,abs(hash(rand())) % 10000 as x
from t
) t
For both hive and spark-sql
The rand()
is used to generate a good distribution.
If You already have in your query a column / combination of columns with good distribution (might be unique, not a must) you might use it instead, e.g. -
select 1 + (abs(hash(col1,col)) % 10000)
+ (row_number() over (partition by abs(hash(col1,col)) % 10000) - 1) * 10000 as id
,t.*
from t
If you are using Spark-sql your best bet would be to use the inbuilt function
monotonically_increasing_id
which generates unique random id in a separate column.
And as you said you don't need it to be sequential so this should ideally suffice your requirement.
Check this solution from Manoj Kumar: https://github.com/manojkumarvohra/hive-hilo
- A stateful UDF is created which maintains a HI/LO counters to
increment the sequences.
- The HI value is maintained as distribute atomic long in zookeeper.
- The HI value is incremented & fetched for every n LO (default 200)
iterations.
- The UDF supports a single String argument which is the sequence name
used to maintain zNodes in zookeeper.
Usage:
FunctionName( sequenceName, lowvalue[optional], seedvalue[optional])
Check this out for a globally unique id service https://github.com/spinaki/distributed-unique-id
It has a docker image too which you can test quickly.