4

I am relatively new to SparkR. I downloaded Spark 1.4 and set up RStudio to use the SparkR library. However, I want to know how I can apply a function to each value in a column of a distributed DataFrame. Can someone please help? For example:

This works perfectly in plain R:

myFunc <- function(x) { paste(x , "_hello")}
c <- c("a", "b", "c")
d <- lapply(c, myFunc)

How can I make this work for a distributed DataFrame? The intention is to append "_hello" to each value of the Name column of DF.

DF <- read.df(sqlContext, "TV_Flattened_2.csv", source = "com.databricks.spark.csv", header="true")
SparkR:::lapply(DF$Name, myFunc)

In the alpha version of SparkR, before the Spark 1.4 release, this ability seems to have existed. Why is it missing from the official Spark 1.4 release?

2
  • I have no knowledge of SparkR, but could it be that you need name(DF) rather than DF$Name? Commented Aug 12, 2015 at 10:13
  • Under the hood, the lapply function is still part of SparkR 1.4, but for the moment it is not a global function; I have no idea why. You should also look at the map function. Commented Aug 13, 2015 at 6:53

3 Answers

2

Using flatMap, you create an RDD from a DataFrame with the function applied on all items.

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

The disadvantage, however, is that this only does what you expect on the first column of the DataFrame; it skips all other columns. This is seen in the following example:

c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c,u=c(1,2,3)))
myFunc <- function(x) { paste(x , "_hello")}
d <- flatMap(df, myFunc)
e <- createDataFrame(sqlContext, d)

This gives exactly the same output as the first example, even though df started with an extra column.


1 Comment

It seems that flatMap is not exported in 2.1; I had to call SparkR:::flatMap(...) instead.
1

Spark 2.x now has a function called dapply, which allows you to run an R function on each partition of a SparkR DataFrame.

Code sample from docs:

# Convert waiting time from hours to seconds.
# Note that we can apply UDF to DataFrame.
schema <- structType(structField("eruptions", "double"), structField("waiting", "double"),
                     structField("waiting_secs", "double"))
df1 <- dapply(df, function(x) { x <- cbind(x, x$waiting * 60) }, schema)
head(collect(df1))
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
##4     2.283      62         3720
##5     4.533      85         5100
##6     2.883      55         3300

See here for more information: http://spark.apache.org/docs/latest/sparkr.html#run-a-given-function-on-a-large-dataset-using-dapply-or-dapplycollect

Just note that if you are using any external R libraries, you will need to have them installed on the worker nodes.
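Applied to the question's case, the same idea might look like the sketch below. It assumes SparkR 2.x and a single string column called Name; `append_hello` and `run_on_spark` are hypothetical helper names used only for illustration, not SparkR functions. The per-partition function is plain R, so it can be checked on an ordinary data.frame before being handed to dapply.

```r
# Per-partition function: dapply passes each partition in as a plain R
# data.frame and expects a data.frame matching the declared schema back.
# paste() is vectorised and inserts a space, so "a" becomes "a _hello";
# paste0() would give "a_hello" instead.
append_hello <- function(part) {
  part$Name <- paste(part$Name, "_hello")
  part
}

# Local check on an ordinary data.frame (no Spark needed).
local_part <- data.frame(Name = c("a", "b", "c"), stringsAsFactors = FALSE)
local_result <- append_hello(local_part)
print(local_result)

# Hypothetical wrapper showing the distributed call. It is only defined here,
# not executed; it assumes SparkR 2.x is installed and a session can start.
run_on_spark <- function() {
  library(SparkR)
  sparkR.session()
  DF <- createDataFrame(local_part)
  schema <- structType(structField("Name", "string"))
  out <- dapply(DF, append_hello, schema)
  head(collect(out))
}
```

Calling `run_on_spark()` on a machine with Spark available should run `append_hello` once per partition on the executors; the schema passed to dapply has to describe the data.frame that the function returns.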

2 Comments

I tried to get gapply to work, but it was so tricky; yours worked perfectly. Thanks!!
Regarding external libraries, you can load libraries in the function as well by adding library(x) at the beginning of the function.
0

I played with this quite a bit and don't have a clean solution for applying the function directly to column elements; frankly, I am not sure this is currently possible. Nonetheless, using the collect method we can do the following:

Note: I am using Windows and typing into PowerShell.

cd D:\Spark\spark-1.4.1-bin-hadoop2.6
./bin/sparkR
c <- c("a", "b", "c")
df <- createDataFrame(sqlContext, as.data.frame(c))
c1 <- collect(df)
myFunc <- function(x) { paste(x , "_hello")}
d <- lapply(c1, myFunc)
df2 <- createDataFrame(sqlContext, as.data.frame(d))
head(df2)

This produces what you'd print in R:

1 a _hello
2 b _hello
3 c _hello
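The local part of this approach can be reproduced without Spark. The sketch below uses an ordinary data.frame as a stand-in for the result of collect(df), which is an assumption made purely for illustration. It shows that lapply walks over the columns of the collected data.frame and that the vectorised paste handles every value in a column at once.

```r
# Stand-in for collect(df): a local data.frame with one character column "c".
c1 <- data.frame(c = c("a", "b", "c"), stringsAsFactors = FALSE)

myFunc <- function(x) { paste(x, "_hello") }

# lapply() iterates over the columns of a data.frame, so myFunc receives the
# whole column as a character vector and returns a vector of the same length.
d <- lapply(c1, myFunc)
str(d)

# Turn the named list back into a data.frame, as the answer does before
# passing it to createDataFrame().
df2_local <- as.data.frame(d, stringsAsFactors = FALSE)
print(df2_local)
```

Everything here runs in the driver's R process, which is why the approach only suits data small enough to collect.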

Here are useful resources:

https://spark.apache.org/docs/latest/api/R/index.html

https://spark.apache.org/docs/latest/sparkr.html

https://databricks.com/blog/2015/08/12/from-pandas-to-apache-sparks-dataframe.html

1 Comment

This approach is serial. Calling collect on the dataframe means this will not distribute the work among the executors.
