SparkR in Apache Spark | Operations & Algorithms in Spark and R
In this article, we will learn the whole concept of SparkR, an R package that provides a light-weight frontend to use Apache Spark from R. First, we will learn how to create SparkR DataFrames. Then we will look at some SparkR DataFrame operations. Finally, we will go through the MLlib machine learning algorithms that Spark exposes to R through SparkR.
2. What is SparkR?
SparkR is an R package that provides a light-weight frontend to use Apache Spark from R. Since Spark 1.4.x, it has offered a distributed DataFrame implementation that supports operations such as selection, filtering and aggregation, similar to R data frames and dplyr but on large datasets. It also supports distributed machine learning using MLlib.
3. SparkDataFrame in SparkR
A SparkDataFrame is a distributed collection of data organized into named columns. Conceptually, it is equivalent to a table in a relational database or a data frame in R. SparkDataFrames can be constructed from a wide array of sources, such as structured data files, tables in Hive, external databases, or existing local R data frames.
4. Starting Up: SparkSession
The entry point into SparkR is the SparkSession, which connects your R program to a Spark cluster. We create a SparkSession with sparkR.session, passing in options such as the application name and any Spark packages the application depends on. Once the session exists, we work with SparkDataFrames through it.
If we are working from the sparkR shell, the SparkSession should already be created for us, so we do not need to call sparkR.session.
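When running SparkR from a plain R script or from RStudio rather than from the sparkR shell, we create the session ourselves. The following is a minimal sketch, assuming a local Spark installation whose location is given by the SPARK_HOME environment variable (or a SparkR package already on the R library path). The application name "sparkr-article-demo" and the 1g driver-memory setting are illustrative values only, not requirements.

```r
# Make the SparkR package bundled with Spark visible to R
# (assumes SPARK_HOME points at a Spark installation, if it is set).
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) {
  .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
}
library(SparkR)

# Create (or get) the SparkSession: run locally on all cores, set the
# application name, and pass Spark properties as a named list.
sparkR.session(
  master = "local[*]",
  appName = "sparkr-article-demo",               # hypothetical application name
  sparkConfig = list(spark.driver.memory = "1g") # illustrative setting
)

# Read a configuration value back from the active session.
app_name <- sparkR.conf("spark.app.name")[[1]]
print(app_name)

# Stop the session once the script no longer needs Spark.
sparkR.session.stop()
```

sparkR.session returns the existing session if one is already active, so calling it again later in the same R process does not start a second one.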
5. Creating SparkDataFrames in SparkR
With a SparkSession, applications can create SparkDataFrames from a local R data frame, from a Hive table, or from other data sources.
i. From local dataframes
The simplest way to create a SparkDataFrame is to convert a local R data frame into one. We can do this with as.DataFrame or createDataFrame, passing in the local R data frame.
For example, the following creates a SparkDataFrame based on the faithful dataset that ships with R.
df <- as.DataFrame(faithful)

# Displays the first part of the SparkDataFrame
head(df)
##  eruptions waiting
##1     3.600      79
##2     1.800      54
##3     3.333      74
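createDataFrame works the same way for any local data.frame, not only built-in datasets. The sketch below assumes a local Spark installation (SPARK_HOME set, or SparkR on the library path); the three-row data frame local_df and its columns are hypothetical example data. It also shows count, which runs on Spark, and collect, which brings a SparkDataFrame back into R as a local data.frame.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

# A small local R data frame (hypothetical example data).
local_df <- data.frame(
  name = c("Ann", "Bob", "Cho"),
  score = c(91.5, 78.0, 85.25),
  stringsAsFactors = FALSE
)

# Convert it into a SparkDataFrame; the schema is inferred from the R column types.
sdf <- createDataFrame(local_df)
printSchema(sdf)  # name is inferred as string, score as double

# count() is computed by Spark; collect() returns the rows as a local data.frame.
n_rows <- count(sdf)
round_trip <- collect(sdf)
print(round_trip)
```

Because collect pulls every row into the R process, it is best reserved for small results such as this one.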
ii. From Data Sources
SparkR supports operating on a variety of data sources through the SparkDataFrame interface. The general method for creating SparkDataFrames from data sources is read.df. This method takes the path of the file to load and the type of data source, and the currently active SparkSession is used automatically. SparkR supports reading JSON, CSV and Parquet files natively; connectors for other popular formats, such as Avro, are available as Spark packages.
These packages can be added in two ways: by specifying --packages with the spark-submit or sparkR commands, or by initializing the SparkSession with the sparkPackages parameter when working in an interactive R shell or from RStudio.
sparkR.session(sparkPackages = "com.databricks:spark-avro_2.11:3.0.0")
Let us see how to use a data source with an example JSON input file. Note that the file used here is not a typical JSON file: each line in the file must contain a separate, self-contained, valid JSON object.
people <- read.df("./examples/src/main/resources/people.json", "json")
head(people)
##  age    name
##1  NA Michael
##2  30    Andy
##3  19  Justin

# SparkR automatically infers the schema from the JSON file
printSchema(people)
# root
#  |-- age: long (nullable = true)
#  |-- name: string (nullable = true)

# Similarly, multiple files can be read with read.json
people <- read.json(c("./examples/src/main/resources/people.json", "./examples/src/main/resources/people2.json"))

In addition, the data sources API natively supports CSV formatted input files.

# csvPath holds the path of the CSV file to load
df <- read.df(csvPath, "csv", header = "true", inferSchema = "true", na.strings = "NA")
The data sources API can also be used to save out SparkDataFrames into multiple file formats.
For example, we can save the SparkDataFrame from the previous example to a Parquet file using write.df.
write.df(people, path = "people.parquet", source = "parquet", mode = "overwrite")
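write.df and read.df can be combined into a round trip. A minimal sketch, assuming a local Spark installation; the output directories under tempdir() are hypothetical locations chosen so the example does not touch the working directory. Note that write.df writes a directory of part files rather than a single file.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

df <- as.DataFrame(faithful)

# Parquet stores the schema with the data, so no reader options are needed.
parquet_path <- file.path(tempdir(), "faithful_parquet")  # hypothetical output directory
write.df(df, path = parquet_path, source = "parquet", mode = "overwrite")
from_parquet <- read.df(parquet_path, source = "parquet")
printSchema(from_parquet)

# CSV does not store a schema: write a header row and ask the reader to infer types.
csv_path <- file.path(tempdir(), "faithful_csv")          # hypothetical output directory
write.df(df, path = csv_path, source = "csv", mode = "overwrite", header = "true")
from_csv <- read.df(csv_path, source = "csv", header = "true", inferSchema = "true")
printSchema(from_csv)

# Bring the row counts and column names back into R.
n_parquet <- count(from_parquet)
n_csv <- count(from_csv)
csv_columns <- columns(from_csv)
```

mode = "overwrite" makes the sketch safe to re-run; the default mode raises an error when the target path already exists.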
iii. SparkDataFrames from Hive tables
SparkDataFrames can also be created from Hive tables. To do this, we need a SparkSession with Hive support, which can access tables in the Hive MetaStore. Note that Spark must have been built with Hive support. By default, SparkR attempts to create a SparkSession with Hive support enabled (enableHiveSupport = TRUE).
sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING)")
sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")

# Queries can be expressed in HiveQL.
results <- sql("FROM src SELECT key, value")

# results is now a SparkDataFrame
head(results)
##  key   value
## 1 238 val_238
## 2  86  val_86
## 3 311 val_311
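sql() is not tied to Hive tables: any SparkDataFrame can be registered as a temporary view and queried with SQL. A minimal sketch, assuming only a local Spark installation (no Hive tables are involved); the view name faithful_view is hypothetical.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

df <- as.DataFrame(faithful)

# Register the SparkDataFrame under a name that SQL queries can refer to.
createOrReplaceTempView(df, "faithful_view")  # hypothetical view name

# The query result is again a SparkDataFrame.
long_waits <- sql("SELECT eruptions, waiting FROM faithful_view WHERE waiting > 90")
long_waits_local <- collect(long_waits)
head(long_waits_local)
```

A temporary view lives only as long as the SparkSession that created it, so it is a convenient way to use SQL on intermediate results without creating a table in the MetaStore.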
6. SparkDataFrame Operations
SparkDataFrames support a number of functions for structured data processing. Here are some basic examples:
i. Selecting rows, columns
# Create the SparkDataFrame
df <- as.DataFrame(faithful)

# Get basic information about the SparkDataFrame
df
## SparkDataFrame[eruptions:double, waiting:double]

# Select only the "eruptions" column
head(select(df, df$eruptions))
##  eruptions
##1     3.600
##2     1.800
##3     3.333

# You can also pass in column names as strings
head(select(df, "eruptions"))

# Filter the SparkDataFrame to only retain rows with wait times shorter than 50 mins
head(filter(df, df$waiting < 50))
##  eruptions waiting
##1     1.750      47
##2     1.750      47
##3     1.867      48
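filter() also accepts its condition as a SQL expression string, and the operations compose, so select() can be applied to the result of filter(). A minimal sketch on the same faithful data, assuming a local Spark installation; the counts are collected into R so they can be compared with base R.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

df <- as.DataFrame(faithful)

# A Column expression and a SQL string express the same condition.
short_waits <- filter(df, df$waiting < 50)
short_waits_sql <- filter(df, "waiting < 50")

# Chain select() after filter(): keep only the eruption lengths of the short waits.
short_eruptions <- select(short_waits, "eruptions")

n_short <- count(short_waits)
n_short_sql <- count(short_waits_sql)
eruptions_local <- collect(short_eruptions)
```

Nothing is computed until count() or collect() is called; select() and filter() only build up the query plan.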
ii. Grouping, Aggregation
SparkR data frames support a number of commonly used functions to aggregate data after grouping. For example, we can compute a histogram of the waiting time in the faithful dataset as shown below.
# We use the `n` operator to count the number of times each waiting time appears
head(summarize(groupBy(df, df$waiting), count = n(df$waiting)))
##  waiting count
##1      70     4
##2      67     1
##3      69     2
# We can also sort the output from the aggregation to get the most common waiting times
waiting_counts <- summarize(groupBy(df, df$waiting), count = n(df$waiting))
head(arrange(waiting_counts, desc(waiting_counts$count)))
##  waiting count
##1      78    15
##2      83    14
##3      81    13
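summarize() accepts several named aggregate expressions, so one grouping can produce a count and an average together. A minimal sketch, assuming a local Spark installation; avg_eruptions is a hypothetical column name chosen for this example.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

df <- as.DataFrame(faithful)

# For each distinct waiting time: how many eruptions, and their mean length.
by_waiting <- summarize(
  groupBy(df, df$waiting),
  count = n(df$waiting),
  avg_eruptions = avg(df$eruptions)  # hypothetical output column name
)

# The aggregated result is small, so sort it and bring it back into R.
top_waits <- collect(arrange(by_waiting, desc(by_waiting$count)))
head(top_waits)
```

Collecting only after aggregation keeps the data transferred to R proportional to the number of groups rather than the number of rows.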
iii. Operating on Columns
SparkR also provides a number of functions that can be applied directly to columns, both for data processing and during aggregation. The example below shows the use of basic arithmetic functions.
# Convert waiting time from minutes to seconds.
# Note that we can assign this to a new column in the same SparkDataFrame
df$waiting_secs <- df$waiting * 60
head(df)
##  eruptions waiting waiting_secs
##1     3.600      79         4740
##2     1.800      54         3240
##3     3.333      74         4440
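The df$waiting_secs assignment above is shorthand for adding a column. The same thing can be written with withColumn(), and mutate() adds several columns in one call. A minimal sketch, assuming a local Spark installation; eruptions_secs and ratio are hypothetical column names.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

df <- as.DataFrame(faithful)

# withColumn() returns a new SparkDataFrame with one added column.
df2 <- withColumn(df, "waiting_secs", df$waiting * 60)

# mutate() adds several columns in one call (hypothetical column names).
df3 <- mutate(df2,
              eruptions_secs = df2$eruptions * 60,
              ratio = df2$eruptions / df2$waiting)

res <- collect(df3)
head(res)
```

Both functions leave the input SparkDataFrame unchanged and return a new one, which fits naturally into a chain of transformations.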
7. Machine Learning Algorithms in Spark and R
The following machine learning algorithms are currently supported by SparkR:
i. Classification
spark.logit: Logistic Regression
spark.mlp: Multilayer Perceptron (MLP)
spark.naiveBayes: Naive Bayes
spark.svmLinear: Linear Support Vector Machine
ii. Regression
spark.survreg: Accelerated Failure Time (AFT) Survival Model
spark.glm or glm: Generalized Linear Model (GLM)
spark.isoreg: Isotonic Regression
iii. Tree
spark.gbt: Gradient Boosted Trees for Regression and Classification
spark.randomForest: Random Forest for Regression and Classification
iv. Clustering
spark.bisectingKmeans: Bisecting k-means
spark.gaussianMixture: Gaussian Mixture Model (GMM)
spark.lda: Latent Dirichlet Allocation (LDA)
v. Collaborative Filtering
spark.als: Alternating Least Squares (ALS)
vi. Frequent Pattern Mining
spark.fpGrowth: FP-growth
vii. Statistics
spark.kstest: Kolmogorov-Smirnov Test
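These wrappers take a SparkDataFrame plus R-style arguments and return a model or result object. As one small example, here is spark.kstest, sketched under the assumption of a local Spark installation: it runs a one-sample, two-sided Kolmogorov-Smirnov test of a column against a standard normal distribution. The sample is generated locally with rnorm, so the test statistic can be compared with base R's ks.test; the column name x is hypothetical.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

# Deterministic sample from a standard normal distribution, generated in R.
set.seed(42)
sample_df <- data.frame(x = rnorm(200))  # hypothetical column name x
sdf <- createDataFrame(sample_df)

# One-sample KS test of column x against N(0, 1), computed by Spark.
ks <- spark.kstest(sdf, testCol = "x", nullHypothesis = "norm", distParams = c(0, 1))
ks_summary <- summary(ks)
print(ks_summary)

# The same statistic computed locally with base R, for comparison.
spark_stat <- ks_summary$statistic
local_stat <- unname(ks.test(sample_df$x, "pnorm")$statistic)
```

The statistics should match, while the p-values may differ slightly because base R uses an exact distribution for small samples.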
Under the hood, SparkR uses MLlib to train the model. SparkR supports a subset of the available R formula operators for model fitting, including ‘~’, ‘.’, ‘:’, ‘+’, and ‘-’.
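As an illustration of the formula support, the sketch below fits a Gaussian GLM with spark.glm using the ‘.’ operator, which expands to all columns other than the label (here only waiting). It assumes a local Spark installation and compares the coefficients with base R's lm on the same formula; with the default settings (no regularization) the two should agree up to numerical precision.

```r
spark_home <- Sys.getenv("SPARK_HOME")
if (nzchar(spark_home)) .libPaths(c(file.path(spark_home, "R", "lib"), .libPaths()))
library(SparkR)
sparkR.session(master = "local[*]")

df <- as.DataFrame(faithful)

# '.' expands to every column except the label, i.e. eruptions ~ waiting here.
model <- spark.glm(df, eruptions ~ ., family = "gaussian")
est <- summary(model)$coefficients[, "Estimate"]
print(est)

# The same formula fitted locally with base R.
local_est <- coef(lm(eruptions ~ ., data = faithful))
print(local_est)
```

Running the same formula through lm on a small local dataset is a cheap way to check that a SparkR model is set up as intended before scaling it up.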
viii. Model persistence
The following example shows how to save and load an MLlib model with SparkR.
training <- read.df("data/mllib/sample_multiclass_classification_data.txt", source = "libsvm")

# Fit a generalized linear model of family "gaussian" with spark.glm
df_list <- randomSplit(training, c(7, 3), 2)
gaussianDF <- df_list[[1]]
gaussianTestDF <- df_list[[2]]
gaussianGLM <- spark.glm(gaussianDF, label ~ features, family = "gaussian")

# Save and then load a fitted MLlib model
modelPath <- tempfile(pattern = "ml", fileext = ".tmp")
write.ml(gaussianGLM, modelPath)
gaussianGLM2 <- read.ml(modelPath)

# Check model summary
summary(gaussianGLM2)

# Check model prediction
gaussianPredictions <- predict(gaussianGLM2, gaussianTestDF)
head(gaussianPredictions)

# write.ml creates a directory, so it must be removed recursively
unlink(modelPath, recursive = TRUE)
Hence, we have seen how to use Apache Spark from R through SparkR. We learned how to create SparkR DataFrames, looked at some basic SparkDataFrame operations, and went through the MLlib algorithms that SparkR exposes.