SparkR in Apache Spark | Operations & Algorithms in Spark and R

1. Objective

In this article, we cover the core concepts of SparkR, an R package that provides a lightweight frontend for using Apache Spark from R. First, we look at how to create SparkR DataFrames. Then we walk through some SparkR DataFrame operations. Finally, we list the MLlib algorithms that SparkR exposes.

SparkR in Apache Spark

2. What is SparkR?

SparkR is an R package that provides a lightweight frontend for using Apache Spark from R. Starting with Spark 1.4.x, it offers a distributed DataFrame implementation that supports operations such as selection, filtering and aggregation. Through MLlib, it also supports distributed machine learning.

3. SparkDataFrame in SparkR

A SparkDataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a table in a relational database or a data frame in R. We can construct a SparkDataFrame from a wide array of sources, for example structured data files, tables in Hive, external databases, or existing local R data frames.

4. Starting Up: SparkSession

SparkSession is the entry point into SparkR. It connects your R program to a Spark cluster. We can create a SparkSession by calling sparkR.session and passing in options such as the application name and any Spark packages the application depends on. Once the session exists, we work with SparkDataFrames through it.
If we are working from the sparkR shell, the SparkSession is already created for us, so we do not need to call sparkR.session.
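
As a minimal sketch of starting a session outside the sparkR shell, the call below passes a master URL, an application name and one Spark configuration value. All three values are illustrative assumptions and do not come from this article.

```r
library(SparkR)

# Start (or reuse) a SparkSession. Every argument value here is illustrative.
spark <- sparkR.session(
  master = "local[*]",                            # assumption: run Spark locally
  appName = "SparkRIntro",                        # hypothetical application name
  sparkConfig = list(spark.driver.memory = "2g")  # assumption: example setting
)
```

When the program is finished, sparkR.session.stop() shuts the session down.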

5. Creating SparkDataFrames in SparkR


Creating SparkDataFrames in Spark and R

By using a SparkSession, applications can create SparkDataFrames from a local R data frame, from a Hive table, or from other data sources.

i. From local dataframes

The simplest way to create a SparkDataFrame is to convert a local R data frame. We pass the local R data frame to as.DataFrame or createDataFrame. The example below creates a SparkDataFrame from the faithful dataset that ships with R.
For Example

df <- as.DataFrame(faithful)
# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74

ii. From Data Sources

Through the SparkDataFrame interface, SparkR supports operating on a variety of data sources. The general method for creating SparkDataFrames from data sources is read.df. This method takes the path of the file to load and the type of data source; the currently active SparkSession is used automatically. SparkR supports reading JSON, CSV and Parquet files natively.
Connectors for other file formats, such as Avro, are available as packages. We can add these packages either by specifying --packages with the spark-submit or sparkR commands, or by passing the sparkPackages parameter when initializing the SparkSession from an interactive R shell or from RStudio.
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
The example below shows how to use a data source with a JSON input file. Note that the file used here is not a typical JSON file: each line in the file must contain a separate, self-contained, valid JSON object.
For Example

people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin
# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)
# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))
In addition, the data sources API natively supports CSV formatted input files.
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")

The data sources API can also be used to save out SparkDataFrames into multiple file formats.
For example, we can save the SparkDataFrame from the previous example to a Parquet file using write.df.
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
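
To check the round trip, a saved Parquet file can be read back with read.df using the same source name. The sketch below is self-contained, so it builds a small stand-in for the people data instead of reading people.json. The sample rows, the temporary path and the local master are assumptions made for illustration.

```r
library(SparkR)
sparkR.session(master = "local[*]")  # assumption: local Spark for the sketch

# Hypothetical stand-in for the people data used above
people <- createDataFrame(
  data.frame(name = c("Andy", "Justin"), age = c(30, 19),
             stringsAsFactors = FALSE)
)

# Write to a temporary Parquet location, then read it back
parquetPath <- file.path(tempdir(), "people.parquet")
write.df(people, path = parquetPath, source = "parquet", mode = "overwrite")
peopleCopy <- read.df(parquetPath, source = "parquet")

# The schema is stored in the Parquet file, so no inference options are needed
printSchema(peopleCopy)
```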

iii. SparkDataFrames from Hive tables

We can also create SparkDataFrames from Hive tables. To do this, we need a SparkSession with Hive support, which can access tables in the Hive MetaStore. The only requirement is that Spark has been built with Hive support. By default, SparkR attempts to create a SparkSession with Hive support enabled (enableHiveSupport = TRUE).
For Example

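sparkR.session()
# The target table must exist before data is loaded into it
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")
# results is now a SparkDataFrame
head(results)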
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311

6. SparkDataFrame Operations


SparkDataFrames support a number of functions for structured data processing. Some basic examples follow:

i. Selecting rows, columns

For Example

# Create the SparkDataFrame
df <- as.DataFrame(faithful)
# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]
# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333
# You can also pass in column names as strings
head(select(df, "eruptions"))
# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48

ii. Grouping, Aggregation

SparkR data frames support a number of commonly used functions for aggregating data after grouping. For example, we can compute a histogram of the waiting time in the faithful dataset as shown below.
For Example
# We use the `n` operator to count the number of times each waiting time appears

head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2

# We can also sort the output from the aggregation to get the most common waiting times

waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1      78    15
##2      83    14
##3      81    13
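
Several aggregates can be computed in one summarize call. The sketch below adds the mean eruption time for each waiting time next to the count. The column name mean_eruption and the local master are assumptions chosen for illustration.

```r
library(SparkR)
sparkR.session(master = "local[*]")  # assumption: local Spark for the sketch

df <- as.DataFrame(faithful)

# One grouped pass that computes two aggregates per waiting time
waiting_stats <- summarize(
  groupBy(df, df$waiting),
  count = n(df$waiting),
  mean_eruption = avg(df$eruptions)  # hypothetical output column name
)

# Most common waiting times first
head(arrange(waiting_stats, desc(waiting_stats$count)))
```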

iii. Operating on Columns

SparkR also provides a number of functions that can be applied directly to columns, both for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
For Example
# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame

df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
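
Column expressions also work with withColumn, which returns a new SparkDataFrame instead of modifying the existing one, and they can be combined inside filter. This is a sketch under stated assumptions: the column name eruption_secs, the thresholds and the local master are illustrative only.

```r
library(SparkR)
sparkR.session(master = "local[*]")  # assumption: local Spark for the sketch

df <- as.DataFrame(faithful)

# withColumn leaves df untouched and returns a copy with the extra column
df2 <- withColumn(df, "eruption_secs", df$eruptions * 60)

# Combine two column conditions with & (thresholds are arbitrary examples)
long_events <- filter(df2, df2$waiting > 80 & df2$eruption_secs > 240)
head(select(long_events, "waiting", "eruption_secs"))
```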

7. Machine Learning Algorithms in Spark and R


Machine Learning Algorithms in SparkR

SparkR currently supports the following machine learning algorithms.

i. Classification

spark.logit: Logistic Regression
spark.mlp: Multilayer Perceptron (MLP)
spark.naiveBayes: Naive Bayes
spark.svmLinear: Linear Support Vector Machine

ii. Regression

spark.survreg: Accelerated Failure Time (AFT) Survival Model
spark.glm or glm: Generalized Linear Model (GLM)
spark.isoreg: Isotonic Regression

iii. Tree

spark.gbt: Gradient Boosted Trees for Regression and Classification
spark.randomForest: Random Forest for Regression and Classification

iv. Clustering

spark.bisectingKmeans: Bisecting k-means
spark.gaussianMixture: Gaussian Mixture Model (GMM)
spark.kmeans: K-Means
spark.lda: Latent Dirichlet Allocation (LDA)

v. Collaborative Filtering

spark.als: Alternating Least Squares (ALS)

vi. Frequent Pattern Mining

spark.fpGrowth: FP-growth

vii. Statistics

spark.kstest: Kolmogorov-Smirnov Test

Under the hood, SparkR uses MLlib to train the model. It supports a subset of the available R formula operators for model fitting, including '~', '.', ':', '+', and '-'.
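
As a small illustration of the formula interface, the sketch below fits a Gaussian GLM on the faithful dataset, with '~' separating the response from the predictor. The choice of dataset, the formula and the local master are assumptions for illustration, not part of the original example set.

```r
library(SparkR)
sparkR.session(master = "local[*]")  # assumption: local Spark for the sketch

df <- as.DataFrame(faithful)

# waiting is the response, eruptions the single predictor
model <- spark.glm(df, waiting ~ eruptions, family = "gaussian")
summary(model)

# predict appends a "prediction" column to the input SparkDataFrame
predictions <- predict(model, df)
head(select(predictions, "waiting", "prediction"))
```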

viii. Model persistence

The example below shows how to save and load an MLlib model with SparkR.
For Example

training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")

# Fit a generalized linear model of family "gaussian" with spark.glm

df_list <- randomSplit(training, c(7,3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")
# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)
# Check model summary
summary(gaussianGLM2)
# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

8. Conclusion

We have seen how to use Apache Spark from R through SparkR. We covered how to create SparkDataFrames, walked through the basic SparkDataFrame operations, and listed the MLlib algorithms that SparkR supports.
If you have any questions, feel free to ask in the comment section.
