Find missing rows by timestamp + ID with sparklyr

Posted 2019-07-26 07:07

I am trying to find missing timestamps. There are many solutions for that problem on its own. However, I also want to find where timestamps are missing for each ID.

For example, the test dataset looks like this:

elemuid timestamp
1232    2018-02-10 23:00:00
1232    2018-02-10 23:01:00
1232    2018-02-10 22:58:00
1674    2018-02-10 22:40:00
1674    2018-02-10 22:39:00
1674    2018-02-10 22:37:00
1674    2018-02-10 22:35:00

The expected result is:

elemuid timestamp
1232    2018-02-10 22:59:00
1674    2018-02-10 22:38:00
1674    2018-02-10 22:36:00

The catch is that I can only use dplyr, because I also want to run this code in sparklyr. Any help would be much appreciated!

2 answers
姐就是有狂的资本
Post #2 · 2019-07-26 07:11

For simplicity, let's assume you've already followed the instructions from your previous question and computed the minimum and maximum epoch time in seconds (min_max).

The remaining steps are quite similar to the ones we followed before:

  • Generate a range of values, one per minute (60 seconds):

    epoch_range <- spark_session(sc) %>% 
      # range() excludes its end value, so add 1 to keep the last minute
      invoke("range", as.integer(min_max[1]), as.integer(min_max[2]) + 1L, 60L) %>%
      invoke("withColumnRenamed", "id", "timestamp")
    
  • Compute the distinct elemuid values:

    elemuids <- df %>% select(elemuid) %>% distinct() %>% spark_dataframe()
    

Next, generate a reference table as the Cartesian product of the range and the unique IDs:

ref <- epoch_range %>% 
  invoke("crossJoin", elemuids) %>% 
  sdf_register() %>%
  mutate(timestamp = from_unixtime(timestamp, "yyyy-MM-dd HH:mm:ss.SSS"))
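Locally, the same reference table is just a Cartesian product, which base R's expand.grid() builds directly. This minimal sketch uses hypothetical inputs (three minutes starting at the question's earliest timestamp, plus the two ids) and formats epoch seconds back into strings much as from_unixtime does in Spark; note that this format string omits the .SSS milliseconds used above.

```r
# Hypothetical inputs: three consecutive minutes (UTC assumed) and the two ids
start <- as.numeric(as.POSIXct("2018-02-10 22:35:00", tz = "UTC"))
epoch_range <- seq(start, by = 60, length.out = 3)
elemuids <- c(1232, 1674)

# Every (timestamp, elemuid) pair, analogous to crossJoin in Spark
ref <- expand.grid(timestamp = epoch_range, elemuid = elemuids)

# Convert epoch seconds back to readable strings, similar to from_unixtime
ref$timestamp <- format(as.POSIXct(ref$timestamp, origin = "1970-01-01", tz = "UTC"),
                        "%Y-%m-%d %H:%M:%S")
```

expand.grid() varies its first argument fastest, so the first three rows are the three minutes for id 1232.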

You might be tempted to use a more dplyr-ish method:

sdf_register(epoch_range) %>% mutate(dummy = 1) %>% 
  left_join(sdf_register(elemuids) %>% mutate(dummy = 1), by = "dummy") %>%
  select(-dummy)

This works fine if the product is small (in that case Spark should use a broadcast join). Otherwise it causes complete data skew: every row has the same dummy key, so a shuffle join sends all of them to a single partition. It is therefore not safe to use in general.
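A small local illustration of the point above, in base R with hypothetical toy inputs (no Spark involved): the constant-key join returns exactly the Cartesian product, but every row carries the same key value. In a shuffle join Spark hash-partitions rows by the join key, so a single key means a single partition does all the work.

```r
# Hypothetical toy stand-ins for epoch_range and elemuids
epoch_range <- data.frame(timestamp = seq(0, by = 60, length.out = 4))
elemuids <- data.frame(elemuid = c(1232, 1674))

# The "dplyr-ish" trick: give both sides the same constant key and join on it
with_dummy <- merge(transform(epoch_range, dummy = 1),
                    transform(elemuids, dummy = 1), by = "dummy")
with_dummy$dummy <- NULL

# An explicit cross join (merge with by = NULL) yields the same pairs
cross <- merge(epoch_range, elemuids, by = NULL)

# Number of distinct join-key values: only one, so hash partitioning by the
# key would place every row in the same partition
n_keys <- length(unique(transform(epoch_range, dummy = 1)$dummy))
```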

Finally, left (outer) join the data as before:

ref %>% left_join(df, by = c("timestamp", "elemuid"))

to fill in the gaps, or (as already explained in the answer provided by akrun) use an anti join to keep only the missing points:

ref %>% anti_join(df, by = c("timestamp", "elemuid"))
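One thing worth checking: because min_max is the global minimum and maximum, the anti join reports every minute in that whole window that a given elemuid lacks, not only the gaps between that id's own first and last timestamps. The base-R sketch below mirrors the pipeline on the question's sample data (timestamps assumed UTC) and then filters to each id's own span, which yields the three rows the question expects. The per-id filter is an addition, not part of the answer above.

```r
# Sample data from the question; the UTC time zone is an assumption
df <- data.frame(
  elemuid = c(1232, 1232, 1232, 1674, 1674, 1674, 1674),
  timestamp = as.POSIXct(c("2018-02-10 23:00:00", "2018-02-10 23:01:00",
                           "2018-02-10 22:58:00", "2018-02-10 22:40:00",
                           "2018-02-10 22:39:00", "2018-02-10 22:37:00",
                           "2018-02-10 22:35:00"), tz = "UTC")
)
df$epoch <- as.numeric(df$timestamp)

# Reference table: every minute in the global window crossed with every id
min_max <- range(df$epoch)
ref <- expand.grid(epoch = seq(min_max[1], min_max[2], by = 60),
                   elemuid = unique(df$elemuid))

# Anti join: keep reference rows with no exact (elemuid, epoch) match in df
in_df <- paste(ref$elemuid, ref$epoch) %in% paste(df$elemuid, df$epoch)
not_found <- ref[!in_df, ]

# Restrict to gaps inside each id's own first and last timestamp
first <- tapply(df$epoch, df$elemuid, min)
last <- tapply(df$epoch, df$elemuid, max)
id <- as.character(not_found$elemuid)
gaps <- not_found[not_found$epoch >= first[id] & not_found$epoch <= last[id], ]
gaps$timestamp <- format(as.POSIXct(gaps$epoch, origin = "1970-01-01", tz = "UTC"),
                         "%Y-%m-%d %H:%M:%S")
```

With the global window, not_found has 47 rows (27 minutes times 2 ids, minus the 7 observed rows); gaps keeps only 22:59 for 1232 and 22:36 and 22:38 for 1674.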
混吃等死
Post #3 · 2019-07-26 07:34

Here is one option with anti_join. Assuming the 'timestamp' column is not yet a datetime object, we first convert it to POSIXct:

library(tidyverse)
library(lubridate)  # provides ymd_hms()
df1 <- df1 %>%
  mutate(timestamp = ymd_hms(timestamp))

Then, grouped by 'elemuid', use complete to expand 'timestamp' in 1-minute steps between each group's minimum and maximum, and anti_join the result with the original dataset:

df1 %>%
    group_by(elemuid) %>% 
    complete(timestamp = seq(min(timestamp), max(timestamp), by = "1 min")) %>% 
    anti_join(df1, by = c("elemuid", "timestamp"))
# A tibble: 3 x 2
# Groups: elemuid [?]
#   elemuid timestamp          
#     <int> <dttm>             
#1    1232 2018-02-10 22:59:00
#2    1674 2018-02-10 22:36:00
#3    1674 2018-02-10 22:38:00