SparkR in Apache Spark | Operations & Algorithms in Spark and R
1. Objective
In this article, we will cover the whole concept of SparkR. First, we will see how to create SparkR DataFrames. Then we will walk through common SparkR DataFrame operations. Finally, we will look at the MLlib algorithms exposed by Spark and R. In short, SparkR is an R package that provides a light-weight frontend for using Apache Spark from R.
2. What is SparkR?
SparkR is an R package that provides a light-weight frontend for using Apache Spark from R. Introduced with Spark 1.4.x, it offers a distributed DataFrame implementation that supports operations such as selection, filtering, and aggregation. It also supports distributed machine learning through MLlib.
3. SparkDataFrame in SparkR
A SparkDataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a table in a relational database or a data frame in R. We can construct a SparkDataFrame from a wide array of sources, such as structured data files, Hive tables, external databases, or existing local R data frames.
4. Starting Up: SparkSession
SparkSession is the entry point into SparkR; it connects your R program to a Spark cluster. We create a SparkSession by calling sparkR.session and can pass in options such as the application name, any Spark packages the application depends on, and so on. Once created, the SparkSession lets us work with SparkDataFrames.
If you are working from the SparkR shell, the SparkSession is already created for you, so you do not need to call sparkR.session yourself.
sparkR.session()
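For example, here is a minimal sketch of starting a session with a few explicit options (the application name and configuration values below are placeholders, not required settings):
# Start a SparkSession with an example application name and driver memory setting
sparkR.session(appName = "SparkR-example",
               master = "local[*]",
               sparkConfig = list(spark.driver.memory = "2g"))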
5. Creating SparkDataFrames in SparkR
With a SparkSession, applications can create SparkDataFrames from a local R data frame, from a Hive table, or from other data sources.
i. From local dataframes
The simplest way to create a SparkDataFrame is to convert a local R data frame into one. We can do this with either as.DataFrame or createDataFrame, passing in the local R data frame.
In the example below, we create a SparkDataFrame from the faithful dataset that ships with R.
For Example
df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74
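createDataFrame works in the same way; as a small sketch, here we convert the built-in mtcars data frame (any local R data frame would do):
# createDataFrame is an equivalent way to convert a local R data frame
carsDF <- createDataFrame(mtcars)
head(carsDF)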
ii. From Data Sources
Through the SparkDataFrame interface, SparkR supports operating on a variety of data sources. The general method for creating SparkDataFrames from data sources is read.df. It takes the path of the file to load and the type of data source, and the currently active SparkSession is used automatically. SparkR supports reading JSON, CSV, and Parquet files natively.
Packages for additional data sources can be added in two ways: by specifying them with spark-submit or the sparkR commands, or by initializing the SparkSession with the sparkPackages parameter, either in an interactive R shell or from RStudio.
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
The example below shows how to use data sources with a JSON input file. Note that the file used here is not a typical JSON file: each line in the file must contain a separate, valid JSON object.
For Example
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json",
                      "./examples/src/main/resources/people2.json"))

In addition, the data sources API natively supports CSV formatted input files.

df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
The data sources API can also be used to save out SparkDataFrames into multiple file formats.
For example, we can save the SparkDataFrame from the previous example to a Parquet file using write.df.
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
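The saved data can be read back with the same data sources API; here is a short sketch, assuming the people.parquet path written above:
# Load the Parquet output back into a SparkDataFrame
peopleParquet <- read.df("people.parquet", source = "parquet")
head(peopleParquet)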
iii. SparkDataFrames from Hive tables
We can also create SparkDataFrames from Hive tables. To do this, we need a SparkSession with Hive support, which gives us access to tables in the Hive MetaStore. Note that Spark must have been built with Hive support; SparkR attempts to create a SparkSession with Hive support enabled by default (enableHiveSupport = TRUE).
sparkR.session()
For Example
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)") sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src") # Queries can be expressed in HiveQL. results <- sql("FROM src SELECT key, value") # results is now a SparkDataFrame head(results) ##  key   value ## 1 238 val_238 ## 2  86  val_86 ## 3 311 val_311
6. SparkDataFrame Operations
SparkDataFrames support a number of functions for structured data processing. Some basic examples follow.
i. Selecting rows, columns
For Example
# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column names as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48
ii. Grouping, Aggregation
SparkDataFrames support a number of commonly used functions for aggregating data after grouping. For example, we can compute a histogram of the waiting times in the faithful dataset as shown below.
For Example
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##   waiting count
##1       78    15
##2       83    14
##3       81    13
iii. Operating on Columns
SparkR offers a number of functions that can be applied directly to columns for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
For Example
# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
7. Machine Learning Algorithms in Spark and R
The following machine learning algorithms are currently supported by Spark and R.
i. Classification
spark.logit: Logistic Regression
spark.mlp: Multilayer Perceptron (MLP)
spark.naiveBayes: Naive Bayes
spark.svmLinear: Linear Support Vector Machine
ii. Regression
spark.survreg: Accelerated Failure Time (AFT) Survival Model
spark.glm or glm: Generalized Linear Model (GLM)
spark.isoreg: Isotonic Regression
iii. Tree
spark.gbt: Gradient Boosted Trees for Regression and Classification
spark.randomForest: Random Forest for Regression and Classification
iv. Clustering
spark.bisectingKmeans: Bisecting k-means
spark.gaussianMixture: Gaussian Mixture Model (GMM)
spark.kmeans: K-Means
spark.lda: Latent Dirichlet Allocation (LDA)
v. Collaborative Filtering
spark.als: Alternating Least Squares (ALS)
vi. Frequent Pattern Mining
spark.fpGrowth: FP-growth
vii. Statistics
spark.kstest: Kolmogorov-Smirnov Test
Under the hood, SparkR uses MLlib to train the models. For model fitting, SparkR supports a subset of the available R formula operators, including ‘~’, ‘.’, ‘:’, ‘+’, and ‘-’.
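As an illustrative sketch (not part of the original example), a model can be fitted with a standard R formula; here we reuse the faithful SparkDataFrame df created earlier and its waiting and eruptions columns:
# Fit a k-means model on the faithful SparkDataFrame using an R formula
kmeansModel <- spark.kmeans(df, ~ waiting + eruptions, k = 2)
summary(kmeansModel)

# Assign each observation to a cluster
head(predict(kmeansModel, df))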
viii. Model persistence
The example below shows how to save and load an MLlib model with SparkR.
For Example
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")
# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

unlink(modelPath)
8. Conclusion
In this article, we have seen how SparkR lets us use Apache Spark from R. We covered creating SparkR DataFrames, performing common SparkDataFrame operations, and the MLlib algorithms exposed by Spark and R.
Finally, if you have any queries, feel free to ask in the comments section.