I am using sparklyr to manipulate some data.
Given the following tibble `a`:
# Example data: two ids with ten attributes each. The bounds sit 30 above
# and 30 below the per-id average (tibble() evaluates columns in order, so
# `average` is already available when the bounds are computed).
a <- tibble(
  id = rep(c(1, 10), each = 10),
  attribute1 = rep(
    c("This", "That", "These", "Those", "The",
      "Other", "Test", "End", "Start", "Beginning"),
    times = 2
  ),
  value = rep(seq(10, 100, by = 10), times = 2),
  average = rep(c(50, 100), each = 10),
  upper_bound = average + 30,
  lower_bound = average - 30
)
I would like to use `gather()` to reshape the data, like this:
# Desired long format: one row per (id, attribute1, gathered column).
b <- gather(a, key = "type_data", value = "value_data", -id, -attribute1)
However, `gather()` is not available for Spark tables in sparklyr. I have seen some people use `sdf_pivot()` to mimic `gather()` (e.g. "How to use sdf_pivot() in sparklyr and concatenate strings?"), but I can't see how to apply it in this case.
Does anyone have an idea?
Cheers!
Here's a function that mimics `gather()`
in sparklyr. It gathers the given columns while keeping all other columns intact, and it can easily be extended if required.
# Function
# Gather (unpivot) `gather_cols` of a Spark tbl into a `key` column holding
# the original column name and a `value` column holding its contents. Every
# other column is kept as an identifier. Each gathered column is selected
# separately and the slices are stacked with sdf_bind_rows(), so the
# gathered columns should share a compatible type.
#
# tbl:         a Spark tbl (or any dplyr tbl supported by sdf_bind_rows()).
# gather_cols: character vector of column names to gather.
# Returns a tbl with columns <other columns>, key, value.
sdf_gather <- function(tbl, gather_cols) {
  if (length(gather_cols) == 0) {
    stop("sdf_gather(): `gather_cols` must name at least one column.",
         call. = FALSE)
  }
  other_cols <- setdiff(colnames(tbl), gather_cols)
  clash <- intersect(other_cols, c("key", "value"))
  if (length(clash) > 0) {
    stop("sdf_gather(): kept column(s) would clash with the output: ",
         paste(clash, collapse = ", "), call. = FALSE)
  }
  slices <- lapply(gather_cols, function(col_nm) {
    tbl %>%
      # all_of() makes tidyselect treat these as strings to look up,
      # never as column symbols, and errors clearly on missing names.
      select(all_of(c(other_cols, col_nm))) %>%
      # Rename before adding `key`, so gathering a column that is itself
      # called "key" is not overwritten by the label.
      rename(value = all_of(col_nm)) %>%
      # !! inlines the local string as a literal instead of letting the
      # backend resolve `col_nm` as a possible remote column.
      mutate(key = !!col_nm)
  })
  slices %>%
    sdf_bind_rows() %>%
    select(all_of(c(other_cols, "key", "value")))
}
# Example: keep col_1 / col_2 as identifiers, gather col_3 / col_4
sdf_gather(
  select(spark_df, col_1, col_2, col_3, col_4),
  c("col_3", "col_4")
)
You can build an equivalent using `map` / `explode`:
# Build the selectExpr() arguments for the explode/map version of
# sdf_gather(): the untouched `id_cols` (backquoted), followed by one
# generator expression that unpivots `cols` into `key` / `value` columns.
# Returns a character vector of Spark SQL expressions.
sdf_gather_explode_exprs <- function(id_cols, cols,
                                     key = "key", value = "value") {
  # Backquote identifiers so names with spaces, reserved words or quotes
  # still parse; an embedded backquote is escaped by doubling it.
  quote_ident <- function(x) sprintf("`%s`", gsub("`", "``", x, fixed = TRUE))
  # Single-quote string literals, backslash-escaping `\` and `'`.
  quote_string <- function(x) {
    x <- gsub("\\\\", "\\\\\\\\", x)
    sprintf("'%s'", gsub("'", "\\\\'", x))
  }
  pairs <- paste0(quote_string(cols), ",", quote_ident(cols), collapse = ",")
  generator <- paste0(
    "explode(map(", pairs, ")) as (",
    quote_ident(key), ",", quote_ident(value), ")"
  )
  c(quote_ident(id_cols), generator)
}

# Gather the columns named in `...` into `key` / `value` columns using
# explode(map(...)). All other columns are kept. The gathered columns must
# share a common type, because they become the values of a single map.
sdf_gather <- function(data, key = "key", value = "value", ...) {
  cols <- unlist(list(...))
  # With no columns, explode(map()) would silently return zero rows.
  if (length(cols) == 0) {
    stop("sdf_gather(): supply at least one column to gather.", call. = FALSE)
  }
  all_cols <- colnames(data)
  unknown <- setdiff(cols, all_cols)
  if (length(unknown) > 0) {
    stop("sdf_gather(): unknown column(s): ",
         paste(unknown, collapse = ", "), call. = FALSE)
  }
  # explode with map (same as stack) needs a multi-column alias, which
  # dplyr's mutate() cannot express, so go through selectExpr directly.
  exprs <- sdf_gather_explode_exprs(setdiff(all_cols, cols), cols, key, value)
  data %>%
    spark_dataframe() %>%
    sparklyr::invoke("selectExpr", as.list(exprs)) %>%
    sdf_register()
}
or the Hive `stack`
function:
# Build the selectExpr() arguments for the Hive stack() version of
# sdf_gather(): the untouched `id_cols` (backquoted), followed by one
# generator expression that unpivots `cols` into `key` / `value` columns.
# Returns a character vector of Spark SQL expressions.
sdf_gather_stack_exprs <- function(id_cols, cols,
                                   key = "key", value = "value") {
  # Backquote identifiers so names with spaces, reserved words or quotes
  # still parse; an embedded backquote is escaped by doubling it.
  quote_ident <- function(x) sprintf("`%s`", gsub("`", "``", x, fixed = TRUE))
  # Single-quote string literals, backslash-escaping `\` and `'`.
  quote_string <- function(x) {
    x <- gsub("\\\\", "\\\\\\\\", x)
    sprintf("'%s'", gsub("'", "\\\\'", x))
  }
  pairs <- paste0(quote_string(cols), ",", quote_ident(cols), collapse = ",")
  # stack(n, k1, v1, ..., kn, vn) emits n rows per input row.
  generator <- paste0(
    "stack(", length(cols), ", ", pairs, ") as (",
    quote_ident(key), ",", quote_ident(value), ")"
  )
  c(quote_ident(id_cols), generator)
}

# Gather the columns named in `...` into `key` / `value` columns using the
# Hive stack() generator. All other columns are kept. The gathered columns
# should share a common type.
sdf_gather <- function(data, key = "key", value = "value", ...) {
  cols <- unlist(list(...))
  # stack(0, ...) is rejected by Spark with an unhelpful message.
  if (length(cols) == 0) {
    stop("sdf_gather(): supply at least one column to gather.", call. = FALSE)
  }
  all_cols <- colnames(data)
  unknown <- setdiff(cols, all_cols)
  if (length(unknown) > 0) {
    stop("sdf_gather(): unknown column(s): ",
         paste(unknown, collapse = ", "), call. = FALSE)
  }
  exprs <- sdf_gather_stack_exprs(setdiff(all_cols, cols), cols, key, value)
  data %>%
    spark_dataframe() %>%
    sparklyr::invoke("selectExpr", as.list(exprs)) %>%
    sdf_register()
}
Both should give the same result:
# Gather the four numeric columns, naming the outputs my_key / my_value.
long <- df %>%
  sdf_gather(
    key = "my_key",
    value = "my_value",
    "value", "average", "upper_bound", "lower_bound"
  )
long
# Source: table<sparklyr_tmp_7b8f5989ba4d> [?? x 4]
# Database: spark_connection
id attribute1 my_key my_value
<dbl> <chr> <chr> <dbl>
1 1 This value 10
2 1 This average 50
3 1 This upper_bound 80
4 1 This lower_bound 20
5 1 That value 20
6 1 That average 50
7 1 That upper_bound 80
8 1 That lower_bound 20
9 1 These value 30
10 1 These average 50
# ... with more rows
Both functions can also be modified to support non-standard evaluation.
Please note that both methods require the gathered columns to have homogeneous (matching) types.
Notes
The `explode`
version generates the following query:
SELECT id, attribute1,
explode(map(
'value', `value`,
'average', `average`,
'upper_bound', `upper_bound`,
'lower_bound', `lower_bound`)) as (my_key,my_value)
FROM df
and the optimized logical execution plan:
org.apache.spark.sql.catalyst.plans.logical.Generate
Generate explode(map(value, value#16, average, average#17, upper_bound, upper_bound#18, lower_bound, lower_bound#19)), [2, 3, 4, 5], false, [my_key#226, my_value#227]
+- InMemoryRelation [id#14, attribute1#15, value#16, average#17, upper_bound#18, lower_bound#19], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[id#14,attribute1#15,value#16,average#17,upper_bound#18,lower_bound#19]
while the `stack`
version generates
SELECT id, attribute1,
stack(4,
'value', `value`,
'average', `average`,
'upper_bound', `upper_bound`,
'lower_bound', `lower_bound`) as (my_key,my_value)
FROM df
and the optimized logical execution plan:
org.apache.spark.sql.catalyst.plans.logical.Generate
Generate stack(4, value, value#16, average, average#17, upper_bound, upper_bound#18, lower_bound, lower_bound#19), [2, 3, 4, 5], false, [my_key#323, my_value#324]
+- InMemoryRelation [id#14, attribute1#15, value#16, average#17, upper_bound#18, lower_bound#19], StorageLevel(disk, memory, deserialized, 1 replicas)
+- Scan ExistingRDD[id#14,attribute1#15,value#16,average#17,upper_bound#18,lower_bound#19]
Single-quoted values (e.g. 'value') in the generated SQL are string literals,
while backquoted values represent column references.
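Nope, no `sdf_pivot()`
answer here.
I am also waiting for a better one. In the meantime, this builds one (id, attribute1, type_data, value_data) slice per column and binds the slices together: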
library(sparklyr)
library(rlang)
library(dplyr)
# Given: copy the local tibble `a` into Spark (table name is taken from `a`)
sparkDf_a <- copy_to(sc, a)
# Build one slice of the long table from `df`.
#
# df:  a tbl with `id` and `attribute1` columns.
# key: string written to `type_data` on every row of the slice.
# val: bare column name whose contents become `value_data`.
# ...: accepted for backward compatibility; ignored.
# Returns an ungrouped tbl with columns id, attribute1, type_data, value_data.
helper_fn <- function(df, key, val, ...) {
  df %>%
    # Select the identifier columns explicitly rather than relying on
    # group_by() to re-add them, which left the result grouped.
    dplyr::select(id, attribute1, {{ val }}) %>%
    dplyr::mutate(
      # !! inlines the local string so a remote column named `key` can't win.
      type_data = !!key,
      value_data = {{ val }}
    ) %>%
    dplyr::select(-{{ val }})
}
# One slice per measurement column, stacked in this order.
b <- sdf_bind_rows(
  lapply(
    c("value", "average", "upper_bound", "lower_bound"),
    function(col_name) {
      helper_fn(
        df = sparkDf_a,
        key = col_name,
        # Inject the column symbol so helper_fn's enquo() captures it.
        val = !!rlang::sym(col_name)
      )
    }
  )
)
Result
b %>% collect()
# A tibble: 80 x 4
# Groups: id, attribute1 [20]
id attribute1 type_data value_data
<dbl> <chr> <chr> <dbl>
1 1 End upper_bound 80
2 1 Other lower_bound 20
3 1 Start lower_bound 20
4 1 Test average 50
5 1 Test upper_bound 80
6 1 That average 50
7 1 That lower_bound 20
8 1 Those value 40
9 10 Start lower_bound 70
10 10 That average 100
# ... with 70 more rows