Well, I work it around using two temp tables:
drop table if exists administrator_tmp1;
drop table if exists administrator_tmp2;
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
--review_administrator
CREATE TABLE if not exists review_administrator(
admin_id bigint ,
admin_name string,
create_time string,
email string ,
password string,
status_description string,
token string ,
expire_time string ,
granter_user_id bigint ,
admin_time string ,
effect_start_date string ,
effect_end_date string
)
partitioned by (current_row_indicator string comment 'current, expired')
stored as parquet;
--tmp1 is used for saving origin data
CREATE TABLE if not exists administrator_tmp1(
admin_id bigint ,
admin_name string,
create_time string,
email string ,
password string ,
status_description string ,
token string ,
expire_time string ,
granter_user_id bigint ,
admin_time string ,
effect_start_date string ,
effect_end_date string
)
partitioned by (current_row_indicator string comment 'current, expired:')
stored as parquet;
--tmp2 saving the scd data
CREATE TABLE if not exists administrator_tmp2(
admin_id bigint ,
admin_name string,
create_time string,
email string ,
password string ,
status_description string ,
token string ,
expire_time string ,
granter_user_id bigint ,
admin_time string ,
effect_start_date string ,
effect_end_date string
)
partitioned by (current_row_indicator string comment 'current, expired')
stored as parquet;
--insert origin data into tmp1
INSERT OVERWRITE TABLE administrator_tmp1 PARTITION(current_row_indicator)
SELECT
user_id as admin_id,
name as admin_name,
time as create_time,
email as email,
password as password,
status as status_description,
token as token,
expire_time as expire_time,
admin_id as granter_user_id,
admin_time as admin_time,
'{{ ds }}' as effect_start_date,
'9999-12-31' as effect_end_date,
'current' as current_row_indicator
FROM
ks_db_origin.gifshow_administrator_origin
;
--insert scd data into tmp2
--for the data unchanged
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
t2.admin_id,
t2.admin_name,
t2.create_time,
t2.email,
t2.password,
t2.status_description,
t2.token,
t2.expire_time,
t2.granter_user_id,
t2.admin_time,
t2.effect_start_date,
t2.effect_end_date as effect_end_date,
t2.current_row_indicator
FROM
administrator_tmp1 t1
INNER JOIN
(
SELECT * FROM review_administrator
WHERE current_row_indicator = 'current'
) t2
ON
t1.admin_id = t2.admin_id
AND t1.admin_name = t2.admin_name
AND t1.create_time = t2.create_time
AND t1.email = t2.email
AND t1.password = t2.password
AND t1.status_description = t2.status_description
AND t1.token = t2.token
AND t1.expire_time = t2.expire_time
AND t1.granter_user_id = t2.granter_user_id
AND t1.admin_time = t2.admin_time
;
--for the data changed , update the effect_end_date
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
t2.admin_id,
t2.admin_name,
t2.create_time,
t2.email,
t2.password,
t2.status_description,
t2.token,
t2.expire_time,
t2.granter_user_id,
t2.admin_time,
t2.effect_start_date as effect_start_date,
'{{ yesterday_ds }}' as effect_end_date,
'expired' as current_row_indicator
FROM
administrator_tmp1 t1
INNER JOIN
(
SELECT * FROM review_administrator
WHERE current_row_indicator = 'current'
) t2
ON
t1.admin_id = t2.admin_id
WHERE NOT
(
t1.admin_name = t2.admin_name
AND t1.create_time = t2.create_time
AND t1.email = t2.email
AND t1.password = t2.password
AND t1.status_description = t2.status_description
AND t1.token = t2.token
AND t1.expire_time = t2.expire_time
AND t1.granter_user_id = t2.granter_user_id
AND t1.admin_time = t2.admin_time
)
;
--for the changed data and the new data
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
t1.admin_id,
t1.admin_name,
t1.create_time,
t1.email,
t1.password,
t1.status_description,
t1.token,
t1.expire_time,
t1.granter_user_id,
t1.admin_time,
t1.effect_start_date,
t1.effect_end_date,
t1.current_row_indicator
FROM
administrator_tmp1 t1
LEFT OUTER JOIN
(
SELECT * FROM review_administrator
WHERE current_row_indicator = 'current'
) t2
ON
t1.admin_id = t2.admin_id
AND t1.admin_name = t2.admin_name
AND t1.create_time = t2.create_time
AND t1.email = t2.email
AND t1.password = t2.password
AND t1.status_description = t2.status_description
AND t1.token = t2.token
AND t1.expire_time = t2.expire_time
AND t1.granter_user_id = t2.granter_user_id
AND t1.admin_time = t2.admin_time
WHERE t2.admin_id IS NULL
;
--for the data already marked by 'expired'
INSERT INTO TABLE administrator_tmp2 PARTITION(current_row_indicator)
SELECT
t1.admin_id,
t1.admin_name,
t1.create_time,
t1.email,
t1.password,
t1.status_description,
t1.token,
t1.expire_time,
t1.granter_user_id,
t1.admin_time,
t1.effect_start_date,
t1.effect_end_date,
t1.current_row_indicator
FROM
review_administrator t1
WHERE t1.current_row_indicator = 'expired'
;
--populate the dim table
INSERT OVERWRITE TABLE review_administrator PARTITION(current_row_indicator)
SELECT
t1.admin_id,
t1.admin_name,
t1.create_time,
t1.email,
t1.password,
t1.status_description,
t1.token,
t1.expire_time,
t1.granter_user_id,
t1.admin_time,
t1.effect_start_date,
t1.effect_end_date,
t1.current_row_indicator
FROM
administrator_tmp2 t1
;
--drop the two temp table
drop table administrator_tmp1;
drop table administrator_tmp2;
-- --example data
-- --2017-01-01
-- insert into table review_administrator PARTITION(current_row_indicator)
-- SELECT '1','a','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-01','9999-12-31','current'
-- FROM default.sample_07 limit 1;
-- --2017-01-02
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '1','a','2016-12-31','a01@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-02','9999-12-31','current'
-- FROM default.sample_07 limit 1;
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '2','b','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-02','9999-12-31','current'
-- FROM default.sample_07 limit 1;
-- --2017-01-03
-- --id 1 is changed
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '1','a','2016-12-31','a03@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-03','9999-12-31','current'
-- FROM default.sample_07 limit 1;
-- --id 2 is not changed at all
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '2','b','2016-12-31','a@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-03','9999-12-31','current'
-- FROM default.sample_07 limit 1;
-- --id 3 is a new record
-- insert into table administrator_tmp1 PARTITION(current_row_indicator)
-- SELECT '3','c','2016-12-31','c@ks.com','password','open','token1','2017-12-31',
-- 0,'2017-12-31','2017-01-03','9999-12-31','current'
-- FROM default.sample_07 limit 1;
-- --now dim table will show you the right SCD.
drop table if exists harsha.emp;
drop table if exists harsha.emp_tmp1;
drop table if exists harsha.emp_tmp2;
drop table if exists harsha.init_load;
show databases;
use harsha;
show tables;
create table harsha.emp (eid int,ename string,sal int,loc string,dept int,start_date timestamp,end_date timestamp,current_status string)
comment "emp scd implementation"
row format delimited
fields terminated by ','
lines terminated by '\n'
;
create table harsha.emp_tmp1 (eid int,ename string,sal int,loc string,dept int,start_date timestamp,end_date timestamp,current_status string)
comment "emp scd implementation"
row format delimited
fields terminated by ','
lines terminated by '\n'
;
create table harsha.emp_tmp2 (eid int,ename string,sal int,loc string,dept int,start_date timestamp,end_date timestamp,current_status string)
comment "emp scd implementation"
row format delimited
fields terminated by ','
lines terminated by '\n'
;
create table harsha.init_load (eid int,ename string,sal int,loc string,dept int)
row format delimited
fields terminated by ','
lines terminated by '\n'
;
show tables;
insert into table harsha.emp select 101 as eid,'aaaa' as ename,3400 as sal,'chicago' as loc,10 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from (select '123')x;
insert into table harsha.emp select 102 as eid,'abaa' as ename,6400 as sal,'ny' as loc,10 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from (select '123')x;
insert into table harsha.emp select 103 as eid,'abca' as ename,2300 as sal,'sfo' as loc,20 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from (select '123')x;
insert into table harsha.emp select 104 as eid,'afga' as ename,3000 as sal,'seattle' as loc,10 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from (select '123')x;
insert into table harsha.emp select 105 as eid,'ikaa' as ename,1400 as sal,'LA' as loc,30 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from (select '123')x;
insert into table harsha.emp select 106 as eid,'cccc' as ename,3499 as sal,'spokane' as loc,20 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from (select '123')x;
insert into table harsha.emp select 107 as eid,'toiz' as ename,4000 as sal,'WA.DC' as loc,40 as did,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from (select '123')x;
load data local inpath 'Documents/hadoop_scripts/t3.txt' into table harsha.emp;
load data local inpath 'Documents/hadoop_scripts/t4.txt' into table harsha.init_load;
insert into table harsha.emp_tmp1 select eid,ename,sal,loc,dept,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status
from harsha.init_load;
insert into table harsha.emp_tmp2
select a.eid,a.ename,a.sal,a.loc,a.dept,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'updated' as current_status from emp_tmp1 a
left outer join emp b on
a.eid=b.eid and
a.ename=b.ename and
a.sal=b.sal and
a.loc = b.loc and
a.dept = b.dept
where b.eid is null
union all
select a.eid,a.ename,a.sal,a.loc,a.dept,from_unixtime(unix_timestamp()) as start_date,from_unixtime(unix_timestamp('9999-12-31 23:59:59','yyyy-mm-dd hh:mm:ss')) as end_date,'current' as current_status from emp_tmp1 a
left outer join emp b on
a.eid = b.eid and
a.ename=b.ename and
a.sal=b.sal and
a.loc=b.loc and
a.dept=b.dept
where b.eid is not null
union all
select b.eid,b.ename,b.sal,b.loc,b.dept,b.start_date as start_date,from_unixtime(unix_timestamp()) as end_date,'expired' as current_status from emp b
inner join emp_tmp1 a on
a.eid=b.eid
where
a.ename <> b.ename or
a.sal <> b.sal or
a.loc <> b.loc or
a.dept <> b.dept
;
insert into table harsha.emp select eid,ename,sal,loc,dept,start_date,end_date,current_status from emp_tmp2;
records including expired:
select * from harsha.emp order by eid;
latest recods:
select a.* from emp a inner join (select eid ,max(start_date) as start_date from emp where current_status <> 'expired' group by eid) b on a.eid=b.eid and a.start_date=b.start_date;
I did use another approach when it come to managing data with SCDs
:
Never update data that does exist inside your historical file or table.
Make sure that new rows will be compared to the most recent generation, for instance the load logic will add control columns : loaded_on
, checksum
and if needed a sequence column that would be used if multiple loads does occur the same day then comparing new data to the most recent generation will use both control columns and a key column that does exist inside your data like a customer or product key.
Now, the magic does take place by computing the checksum
of all the column involved but the control columns, creating a unique finger print for each row. The finger print (checksum
) column then will be used to determine if any columns have changed compared to the most recent generation (most recent generation is based on the latest state of the data based on the key, loaded_on and sequence).
Now, you know if a row coming from your daily update is new because there is no previous generation or if a row coming from your daily update will require to create a new row (new generation) inside your historical file or table and last if a row coming from your daily update does not have any changes therefore no need to create a row because there is no difference compared to previous generation.
The type of logic needed can be build using Apache Spark
, in a single statement you can ask Spark
to concatenate any number of columns of any datatypes
then compute a hash
value that is used to finger print it.
All together now you can develop a utility based on spark
that will accept any data source and output a well organized, clean and slow dimensions aware historical file, table,... last, never update append only!