I have a file in CSV format which contains a table with column "id", "timestamp", "action", "value" and "location". I want to apply a function to each row of the table and I've already written the code in R as follows:
user <- read.csv(file_path,sep = ";")
num <- nrow(user)
curLocation <- "1"
for(i in 1:num) {
row <- user[i,]
if(user$action != "power")
curLocation <- row$value
user[i,"location"] <- curLocation
}
The R script works fine and now I want to apply it SparkR. However, I couldn't access the ith row directly in SparkR and I couldn't find any function to manipulate every row in SparkR documentation.
Which method should I use in order to achieve the same effect as in the R script?
In addition, as advised by @chateaur, I tried to code using dapply function as follows:
curLocation <- "1"
schema <- structType(structField("Sequence","integer"), structField("ID","integer"), structField("Timestamp","timestamp"), structField("Action","string"), structField("Value","string"), structField("Location","string"))
setLocation <- function(row, curLoc) {
if(row$Action != "power|battery|level"){
curLoc <- row$Value
}
row$Location <- curLoc
}
bw <- dapply(user, function(row) { setLocation(row, curLocation)}, schema)
head(bw)
Then I got an error:
I looked up the warning message the condition has length > 1 and only the first element will be used and I found something https://stackoverflow.com/a/29969702/4942713. It made me wonder whether the row parameter in the dapply function represent an entire partition of my data frame instead of one single row? Maybe dapply function is not a desirable solution?
Later, I tried to modify the function as advised by @chateaur. Instead of using dapply, I used dapplyCollect which saves me the effort of specifying the schema. It works!
changeLocation <- function(partitionnedDf) {
nrows <- nrow(partitionnedDf)
curLocation <- "1"
for(i in 1:nrows){
row <- partitionnedDf[i,]
if(row$action != "power") {
curLocation <- row$value
}
partitionnedDf[i,"location"] <- curLocation
}
partitionnedDf
}
bw <- dapplyCollect(user, changeLocation)