sparklyr - Including null values in an Apache Spark Join

Posted 2019-07-17 07:00

The question Including null values in an Apache Spark Join has answers for Scala, PySpark and SparkR, but not for sparklyr. I've been unable to figure out how to have inner_join in sparklyr treat null values in a join column as equal. Does anyone know how this can be done in sparklyr?
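
For example, here is a minimal sketch of what I'm seeing (a local connection; table and column names are just placeholders):

library(dplyr)
library(sparklyr)

sc <- spark_connect(master = "local")

lhs <- copy_to(sc, tibble(id = c(NA, "foo", "bar"), val_x = 1:3), "lhs")
rhs <- copy_to(sc, tibble(id = c(NA, "foo", "baz"), val_y = 4:6), "rhs")

# The generated SQL joins on `id` with `=`, and NULL = NULL is not TRUE in SQL,
# so only the "foo" row comes back; the NA / NA pair is silently dropped.
inner_join(lhs, rhs, by = "id")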

1 Answer

倾城 Initia · 2019-07-17 07:55

You can invoke an implicit cross join:

library(dplyr)
library(sparklyr)

#' Return a Cartesian product of Spark tables
#'
#' @param df1 tbl_spark
#' @param df2 tbl_spark
#' @param explicit logical. If TRUE use an explicit crossJoin, otherwise
#'   call join without a join expression
#' @param suffix character suffixes to be used on duplicate names
cross_join <- function(df1, df2,
    explicit = FALSE, suffix = c("_x", "_y")) {

  # Disambiguate columns that exist in both tables
  common_cols <- intersect(colnames(df1), colnames(df2))

  if (length(common_cols) > 0) {
    df1 <- df1 %>% rename_at(common_cols, ~ paste0(., suffix[1]))
    df2 <- df2 %>% rename_at(common_cols, ~ paste0(., suffix[2]))
  }

  # Call the underlying Dataset API: an explicit crossJoin, or a join
  # without a join expression (implicit cross join)
  sparklyr::invoke(
    spark_dataframe(df1),
    if (explicit) "crossJoin" else "join",
    spark_dataframe(df2)) %>% sdf_register()
}

and filter the result with IS NOT DISTINCT FROM

# Enable Cross joins
sc %>% 
  spark_session() %>% 
  sparklyr::invoke("conf") %>%
  sparklyr::invoke("set", "spark.sql.crossJoin.enabled", "true")

df1 <- copy_to(sc, tibble(id1 = c(NA, "foo", "bar"), val = 1:3))
df2 <- copy_to(sc, tibble(id2 = c(NA, "foo", "baz"), val = 4:6))

df1 %>%
  cross_join(df2) %>% 
  filter(id1 %IS NOT DISTINCT FROM% id2)
# Source: spark<?> [?? x 4]
  id1   val_x id2   val_y
* <chr> <int> <chr> <int>
1 NA        1 NA        4
2 foo       2 foo       5

The optimized execution plan:

<jobj[62]>
  org.apache.spark.sql.catalyst.plans.logical.Join
  Join Inner, (id1#10 <=> id2#76)
:- Project [id1#10, val#11 AS val_x#129]
:  +- InMemoryRelation [id1#10, val#11], StorageLevel(disk, memory, deserialized, 1 replicas)
:        +- Scan ExistingRDD[id1#10,val#11]
+- Project [id2#76, val#77 AS val_y#132]
   +- InMemoryRelation [id2#76, val#77], StorageLevel(disk, memory, deserialized, 1 replicas)
         +- Scan ExistingRDD[id2#76,val#77]
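
For reference, the plan above can be pulled straight from the tbl_spark; a sketch using the same invoke mechanism (queryExecution and optimizedPlan are the standard Spark Dataset / QueryExecution accessors; the exact output will vary with the Spark version):

df1 %>%
  cross_join(df2) %>%
  filter(id1 %IS NOT DISTINCT FROM% id2) %>%
  spark_dataframe() %>%
  sparklyr::invoke("queryExecution") %>%
  sparklyr::invoke("optimizedPlan")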

The %<=>% operator (Spark's null-safe equality, <=>) should work the same way:

df1 %>%
  cross_join(df2) %>% 
  filter(id1 %<=>% id2)

Please note that:

  • An implicit cross join will fail unless it is either followed by a selection that promotes the result to a hash join / sort merge join, or cross joins are enabled (spark.sql.crossJoin.enabled, as above).
  • An explicit cross join shouldn't be used in this case, as it would take precedence over the subsequent selection.
  • It is possible to use a dplyr-style cross join:

    mutate(df1, `_const` = TRUE) %>%  
      inner_join(mutate(df2, `_const` = TRUE), by = c("_const")) %>% 
      select(-`_const`) %>% 
      filter(id1 %IS NOT DISTINCT FROM% id2)
    

    but I'd advise against that, as it is less robust (depending on the context, the optimizer might be unable to recognize that _const is constant); one way to check is to inspect the optimized plan, as sketched below.
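
    A sketch of that check, reusing the plan accessors shown above (output varies by Spark version); look for the condition inside the Join node of the optimized plan:

    mutate(df1, `_const` = TRUE) %>%
      inner_join(mutate(df2, `_const` = TRUE), by = c("_const")) %>%
      select(-`_const`) %>%
      filter(id1 %IS NOT DISTINCT FROM% id2) %>%
      spark_dataframe() %>%
      sparklyr::invoke("queryExecution") %>%
      sparklyr::invoke("optimizedPlan")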
