# Apache Spark Machine Learning Algorithm – Example & Clustering

## 1. Objective – Spark Machine Learning

In this Spark tutorial, we look at machine learning in Apache Spark. We start with MLlib and its basic statistics, then move on to classification and regression, collaborative filtering, and finally clustering, with an example for each.

MLlib is Spark’s scalable machine learning library. It consists of common machine learning algorithms, for example basic statistics, classification, regression, clustering, and collaborative filtering.

So, let’s start the Spark Machine Learning tutorial.

## 2. Machine Learning Algorithm (MLlib)

MLlib is the machine learning (ML) library of Apache Spark. Its goal is to make practical machine learning scalable and easy. It provides the following groups of ML algorithms:

1. Basic statistics
2. Classification and Regression
3. Clustering
4. Collaborative filtering

Let’s discuss each of these one by one.

## 3. Spark Machine Learning Algorithm Statistics

The basic statistics in MLlib consist of several tools:

- Summary statistics
- Correlations
- Stratified sampling
- Hypothesis testing
- Random data generation

### a. Spark Machine Learning Algorithm – Summary Statistics

MLlib offers column summary statistics for RDD[Vector] through the function colStats, available in Statistics.
colStats() returns an instance of MultivariateStatisticalSummary, which contains the column-wise max, min, mean, variance, and number of nonzeros, as well as the total count.

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.stat.{MultivariateStatisticalSummary, Statistics}
import org.apache.spark.rdd.RDD
val observations: RDD[Vector] = ... // an RDD of Vectors
// Compute column summary statistics.
val summary: MultivariateStatisticalSummary = Statistics.colStats(observations)
println(summary.mean) // a dense vector containing the mean value for each column
println(summary.variance) // column-wise variance
println(summary.numNonzeros) // number of nonzeros in each column
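
To make the contents of the summary concrete, here is a minimal local sketch in plain Scala (no Spark needed) that computes the same three quantities for a small in-memory data set. The object name `ColumnSummarySketch` and its methods are our own illustration, not part of MLlib, and we assume the reported variance is the unbiased sample variance (dividing by n - 1).

```scala
// Illustrative only: a local stand-in for what colStats reports per column.
object ColumnSummarySketch {
  // Turn row-major data into a sequence of columns.
  private def columns(rows: Seq[Array[Double]]): Seq[Seq[Double]] =
    rows.head.indices.map(j => rows.map(_(j)))

  // Column-wise mean.
  def mean(rows: Seq[Array[Double]]): Seq[Double] =
    columns(rows).map(c => c.sum / c.length)

  // Column-wise sample variance (assumption: unbiased, divides by n - 1).
  def variance(rows: Seq[Array[Double]]): Seq[Double] =
    columns(rows).map { c =>
      val m = c.sum / c.length
      c.map(x => (x - m) * (x - m)).sum / (c.length - 1)
    }

  // Number of nonzero entries in each column.
  def numNonzeros(rows: Seq[Array[Double]]): Seq[Long] =
    columns(rows).map(c => c.count(_ != 0.0).toLong)
}
```

For the rows (1, 0), (3, 4), (5, 8), this gives means (3, 4), variances (4, 16), and nonzero counts (3, 2).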

### b. Spark Machine Learning Algorithm – Correlations

Calculating the correlation between two series of data is a common operation in statistics. MLlib offers the flexibility to calculate pairwise correlations among many series. It currently supports two correlation methods: Pearson’s and Spearman’s correlation.
Statistics offers methods to calculate correlations between series. Depending on the type of input, either two RDD[Double]s or an RDD[Vector], the output is a Double or the correlation Matrix respectively.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.rdd.RDD
val sc: SparkContext = ...
val seriesX: RDD[Double] = ... // a series
val seriesY: RDD[Double] = ... // must have the same number of partitions and cardinality as seriesX
// Compute the correlation using Pearson's method. Enter "spearman" for Spearman's method. If a
// method is not specified, Pearson's method will be used by default.
val correlation: Double = Statistics.corr(seriesX, seriesY, "pearson")
val data: RDD[Vector] = ... // note that each Vector is a row and not a column
// Calculate the correlation matrix using Pearson's method. Use "spearman" for Spearman's method.
// If a method is not specified, Pearson's method will be used by default.
val correlMatrix: Matrix = Statistics.corr(data, "pearson")
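
To show what the two methods measure, here is a small plain-Scala sketch of both coefficients for two local series. It is an illustration under our own assumptions, not MLlib’s implementation: the object `CorrelationSketch` is hypothetical, and the rank function assumes there are no tied values.

```scala
// Illustrative only: Pearson's and Spearman's correlation for two local series.
object CorrelationSketch {
  // Pearson's r: co-deviation divided by the product of the deviations' norms.
  def pearson(x: Seq[Double], y: Seq[Double]): Double = {
    require(x.length == y.length && x.nonEmpty, "series must have equal, non-zero length")
    val mx = x.sum / x.length
    val my = y.sum / y.length
    val cov = x.zip(y).map { case (a, b) => (a - mx) * (b - my) }.sum
    val sx = math.sqrt(x.map(a => (a - mx) * (a - mx)).sum)
    val sy = math.sqrt(y.map(b => (b - my) * (b - my)).sum)
    cov / (sx * sy)
  }

  // Rank of each value, 1 = smallest. Assumption: no ties in the input.
  def ranks(x: Seq[Double]): Seq[Double] = {
    val order = x.zipWithIndex.sortBy(_._1).map(_._2)
    val r = new Array[Double](x.length)
    order.zipWithIndex.foreach { case (origIdx, rank) => r(origIdx) = rank + 1.0 }
    r.toSeq
  }

  // Spearman's rho is Pearson's r computed on the ranks.
  def spearman(x: Seq[Double], y: Seq[Double]): Double =
    pearson(ranks(x), ranks(y))
}
```

For a series and its squares (a monotonic but non-linear relationship), Spearman’s coefficient is 1 while Pearson’s is slightly below 1, which is the practical difference between the two methods.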

### c. Spark Machine Learning Algorithm – Stratified Sampling

The stratified sampling methods, sampleByKey and sampleByKeyExact, can be performed on RDDs of key-value pairs. For stratified sampling, we can think of the key as a label and the value as a specific attribute.
For example, the key can be man or woman, or a document id, and the respective values can be the list of ages of the people in the population or the list of words in the documents. To decide whether an observation will be sampled or not, the sampleByKey method flips a coin. It therefore requires only one pass over the data and provides an expected sample size. The sampleByKeyExact method needs significantly more resources, but returns the exact sample size for each stratum with high confidence.

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
import org.apache.spark.rdd.PairRDDFunctions
val sc: SparkContext = ...
val data = ... // an RDD[(K, V)] of any key value pairs
val fractions: Map[K, Double] = ... // specify the fraction desired from each key
// Get an approximate sample from each stratum (one pass, expected sample size)
val approxSample = data.sampleByKey(withReplacement = false, fractions = fractions)
// Get an exact sample from each stratum
val exactSample = data.sampleByKeyExact(withReplacement = false, fractions = fractions)
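
The coin flip behind sampleByKey can be sketched locally in a few lines of plain Scala. This is only an illustration of the idea: `StratifiedSamplingSketch`, the explicit `seed` parameter, and the choice to drop keys that have no fraction are our own assumptions, not Spark’s behavior.

```scala
import scala.util.Random

// Illustrative only: per-key Bernoulli sampling on a local collection.
object StratifiedSamplingSketch {
  // Keep each (key, value) pair with the probability assigned to its key.
  // Assumption: keys missing from `fractions` are never sampled.
  def sampleByKey[K, V](data: Seq[(K, V)], fractions: Map[K, Double], seed: Long): Seq[(K, V)] = {
    val rng = new Random(seed)
    data.filter { case (key, _) => rng.nextDouble() < fractions.getOrElse(key, 0.0) }
  }
}
```

Because every pair is decided independently, the sample size per key is only correct in expectation, which is why an exact variant needs additional work.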

### d. Spark Machine Learning Algorithm – Hypothesis Testing

Hypothesis testing is a powerful tool in statistics to determine whether a result is statistically significant, that is, whether it occurred by chance or not. MLlib currently supports Pearson’s chi-squared (χ²) tests for goodness of fit and independence. The input data type determines which test is conducted: the goodness of fit test requires a Vector as input, whereas the independence test requires a Matrix.
MLlib also supports the input type RDD[LabeledPoint], which enables feature selection via chi-squared independence tests.
Statistics offers methods to run Pearson’s chi-squared tests. The example below demonstrates how to run and interpret hypothesis tests.

import org.apache.spark.SparkContext
import org.apache.spark.mllib.linalg._
import org.apache.spark.mllib.regression.LabeledPoint
import org.apache.spark.mllib.stat.Statistics
import org.apache.spark.mllib.stat.test.ChiSqTestResult
import org.apache.spark.rdd.RDD
val sc: SparkContext = ...
val vec: Vector = ... // a vector composed of the frequencies of events
// Compute the goodness of fit. If a second vector to test against is not supplied as a parameter,
// the test runs against a uniform distribution.
val goodnessOfFitTestResult = Statistics.chiSqTest(vec)
println(goodnessOfFitTestResult) // summary of the test including the p-value, degrees of freedom,
// test statistic, the method used, and the null hypothesis.
val mat: Matrix = ... // a contingency matrix
// Conduct Pearson's independence test on the input contingency matrix
val independenceTestResult = Statistics.chiSqTest(mat)
println(independenceTestResult) // summary of the test including the p-value, degrees of freedom...
val obs: RDD[LabeledPoint] = ... // (feature, label) pairs.
// The contingency table is constructed from the raw (feature, label) pairs and used to conduct
// the independence test. Returns an array containing the ChiSqTestResult for every feature
// against the label.
val featureTestResults: Array[ChiSqTestResult] = Statistics.chiSqTest(obs)
var i = 1
featureTestResults.foreach { result =>
  println(s"Column $i:\n$result")
  i += 1
} // summary of the test
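
For intuition about the number that the goodness of fit test reports, the following plain-Scala sketch computes Pearson’s chi-squared statistic and the degrees of freedom against a uniform distribution. It is a hand-rolled illustration (the object `ChiSquaredSketch` is hypothetical) and it does not compute the p-value, which requires the chi-squared distribution function.

```scala
// Illustrative only: Pearson's chi-squared statistic on local frequency counts.
object ChiSquaredSketch {
  // Sum over categories of (observed - expected)^2 / expected.
  def statistic(observed: Seq[Double], expected: Seq[Double]): Double = {
    require(observed.length == expected.length, "need one expected count per category")
    observed.zip(expected).map { case (o, e) => (o - e) * (o - e) / e }.sum
  }

  // Goodness of fit against a uniform distribution:
  // returns (test statistic, degrees of freedom).
  def uniformGoodnessOfFit(observed: Seq[Double]): (Double, Int) = {
    val expectedCount = observed.sum / observed.length
    val expected = Seq.fill(observed.length)(expectedCount)
    (statistic(observed, expected), observed.length - 1)
  }
}
```

For observed frequencies (10, 20, 30), the expected count per category is 20, so the statistic is 100/20 + 0 + 100/20 = 10 with 2 degrees of freedom.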

### e. Spark Machine Learning Algorithm – Random Data Generation

Random data generation is useful for randomized algorithms, prototyping, and performance testing. MLlib supports generating random RDDs with i.i.d. values drawn from a given distribution: uniform, standard normal, or Poisson.
RandomRDDs offers factory methods to generate random double RDDs or vector RDDs. The following example generates a random double RDD whose values follow the standard normal distribution N(0, 1), and then maps it to N(1, 4).

import org.apache.spark.SparkContext
import org.apache.spark.mllib.random.RandomRDDs._
val sc: SparkContext = ...
// Generate a random double RDD that contains 1 million i.i.d. values drawn from the
// standard normal distribution N(0, 1), evenly distributed in 10 partitions.
val u = normalRDD(sc, 1000000L, 10)
// Apply a transform to get a random double RDD following N(1, 4).
val v = u.map(x => 1.0 + 2.0 * x)
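
Why the transform x => 1.0 + 2.0 * x yields N(1, 4) follows from the standard rules for a linear function of a random variable. Reading the second parameter of N(·, ·) as the variance, and writing X for a standard normal value and Y for the transformed value (our notation):

$$
Y = 1 + 2X, \qquad
\mathbb{E}[Y] = 1 + 2\,\mathbb{E}[X] = 1, \qquad
\operatorname{Var}[Y] = 2^{2}\,\operatorname{Var}[X] = 4
$$

A linear function of a normal variable is again normal, so Y follows N(1, 4), that is, mean 1 and standard deviation 2.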

## 4. Spark Machine Learning Algorithm – Classification and Regression

### a. Classification in Spark Machine Learning algorithm

#### i. Logistic regression

Logistic regression is a popular method to predict a categorical response. It is a special case of generalized linear models that predicts the probability of the outcomes. In spark.ml, logistic regression can be used to predict a binary outcome by using binomial logistic regression, or a multiclass outcome by using multinomial logistic regression.
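
As a brief sketch of what "predicting the probability of the outcomes" means, the usual textbook form of the two models is given below. The symbols are our own notation: x is the feature vector, w and b are the learned coefficients and intercept, and K is the number of classes.

$$
P(y = 1 \mid x) = \frac{1}{1 + e^{-(w^{\top} x + b)}}
\qquad \text{(binomial)}
$$

$$
P(y = k \mid x) = \frac{e^{\,w_k^{\top} x + b_k}}{\sum_{j=1}^{K} e^{\,w_j^{\top} x + b_j}}
\qquad \text{(multinomial)}
$$

In both cases the predicted class is the one with the highest probability.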

#### ii. Decision tree classifier

Decision trees are a popular family of classification and regression methods.
The example below loads a dataset in LibSVM format, splits it into training and test sets, trains on the first set, and then evaluates on the held-out test set. We use two feature transformers to prepare the data. They index the categories for the label and for the categorical features, adding metadata to the DataFrame that the decision tree algorithm can recognize.

import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.classification.DecisionTreeClassificationModel
import org.apache.spark.ml.classification.DecisionTreeClassifier
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
// Load the data stored in LIBSVM format as a DataFrame.
val data = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
// Index labels, adding metadata to the label column.
// Fit on whole dataset to include all labels in index.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .fit(data)
// Automatically identify categorical features, and index them.
val featureIndexer = new VectorIndexer()
  .setInputCol("features")
  .setOutputCol("indexedFeatures")
  .setMaxCategories(4) // features with > 4 distinct values are treated as continuous.
  .fit(data)
// Split the data into training and test sets (30% held out for testing).
val Array(trainingData, testData) = data.randomSplit(Array(0.7, 0.3))
// Train a DecisionTree model.
val dt = new DecisionTreeClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("indexedFeatures")
// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)
// Chain indexers and tree in a Pipeline.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, featureIndexer, dt, labelConverter))
// Train model. This also runs the indexers.
val model = pipeline.fit(trainingData)
// Make predictions.
val predictions = model.transform(testData)
// Select example rows to display.
predictions.select("predictedLabel", "label", "features").show(5)
// Select (prediction, true label) and compute test error.
val evaluator = new MulticlassClassificationEvaluator()
  .setLabelCol("indexedLabel")
  .setPredictionCol("prediction")
  .setMetricName("accuracy")
val accuracy = evaluator.evaluate(predictions)
println("Test Error = " + (1.0 - accuracy))
// The trained tree is the third pipeline stage (index 2).
val treeModel = model.stages(2).asInstanceOf[DecisionTreeClassificationModel]
println("Learned classification tree model:\n" + treeModel.toDebugString)

### b. Regression in Spark Machine Learning algorithm

#### i. Linear regression

The interface for working with linear regression models and model summaries is similar to the logistic regression case.
The example below shows how to train an elastic net regularized linear regression model and extract model summary statistics.

import org.apache.spark.ml.regression.LinearRegression
// Load training data
val training = spark.read.format("libsvm")
  .load("data/mllib/sample_linear_regression_data.txt")
val lr = new LinearRegression()
  .setMaxIter(10)
  .setRegParam(0.3)
  .setElasticNetParam(0.8)
// Fit the model
val lrModel = lr.fit(training)
// Print the coefficients and intercept for linear regression
println(s"Coefficients: ${lrModel.coefficients} Intercept: ${lrModel.intercept}")
// Summarize the model over the training set and print out some metrics
val trainingSummary = lrModel.summary
println(s"numIterations: ${trainingSummary.totalIterations}")
println(s"objectiveHistory: [${trainingSummary.objectiveHistory.mkString(",")}]")
trainingSummary.residuals.show()
println(s"RMSE: ${trainingSummary.rootMeanSquaredError}")
println(s"r2: ${trainingSummary.r2}")
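
To relate the two regularization settings in the example to the model being fitted, here is the elastic net objective as we understand it from the Spark documentation. The notation is ours: λ is the value passed to setRegParam, α is the value passed to setElasticNetParam, n is the number of training rows, and w is the coefficient vector.

$$
\min_{w}\;
\frac{1}{2n} \sum_{i=1}^{n} \left( w^{\top} x_i - y_i \right)^{2}
+ \lambda \left( \alpha \,\lVert w \rVert_{1} + \frac{1 - \alpha}{2}\, \lVert w \rVert_{2}^{2} \right)
$$

With λ = 0.3 and α = 0.8, the L1 term is weighted by 0.3 × 0.8 = 0.24 and the squared L2 term by 0.3 × 0.2 / 2 = 0.03. Setting α = 1 gives a pure Lasso penalty, and α = 0 gives a pure ridge penalty.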

## 5. Collaborative filtering in Spark Machine Learning algorithm

Collaborative filtering is commonly used for recommender systems. These techniques aim to fill in the missing entries of a user-item association matrix. spark.ml currently supports model-based collaborative filtering.
In this approach, users and products are described by a small set of latent factors that can be used to predict missing entries. spark.ml uses the alternating least squares (ALS) algorithm to learn these latent factors. The implementation in MLlib has the following parameters:

1. numBlocks is the number of blocks used to parallelize computation (set to -1 to auto-configure).
2. rank is the number of latent factors in the model.
3. iterations is the number of iterations to run.
4. lambda specifies the regularization parameter in ALS.
5. implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data.
6. alpha is a parameter applicable to the implicit feedback variant of ALS. It governs the baseline confidence in preference observations.

### a. Explicit vs. Implicit Feedback

The standard approach to matrix factorization-based collaborative filtering treats the entries in the user-item matrix as explicit preferences given by the user to the item.
However, in many real-world use cases, it is common to only have access to implicit feedback, for example views, clicks, purchases, likes, and shares. To deal with such data, MLlib uses the approach from the paper “Collaborative Filtering for Implicit Feedback Datasets”.
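
As a sketch of how that approach treats implicit data, the raw observation is not modeled as a rating. Instead it is split into a binary preference and a confidence in that preference, which is where the alpha parameter enters. The notation below is ours: r_ui is the observed strength of feedback from user u on item i, x_u and y_i are the latent factor vectors, and the regularization term is omitted for brevity.

$$
p_{ui} =
\begin{cases}
1 & \text{if } r_{ui} > 0 \\
0 & \text{otherwise}
\end{cases}
\qquad
c_{ui} = 1 + \alpha \, r_{ui}
$$

$$
\min_{x, y} \; \sum_{u, i} c_{ui} \left( p_{ui} - x_u^{\top} y_i \right)^{2}
$$

A larger alpha therefore makes the model trust observed interactions more strongly relative to unobserved ones.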

### b. Scaling of the regularization parameter

When solving each least squares problem, we scale the regularization parameter lambda by the number of ratings the user generated (when updating user factors) or by the number of ratings the product received (when updating product factors). This approach is named “ALS-WR”. It makes lambda less dependent on the scale of the dataset. Hence, we can apply the best parameter learned from a sampled subset to the full dataset and expect similar performance.
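
Written out, the scaling described above corresponds to the following weighted objective. This is a sketch in our own notation: the sum runs over the observed ratings r_ui, x_u and y_i are the user and product factor vectors, n_u is the number of ratings generated by user u, and m_i is the number of ratings received by product i.

$$
\min_{x, y} \;
\sum_{(u, i)\ \text{observed}} \left( r_{ui} - x_u^{\top} y_i \right)^{2}
+ \lambda \left( \sum_{u} n_u \,\lVert x_u \rVert^{2} + \sum_{i} m_i \,\lVert y_i \rVert^{2} \right)
$$

Because each factor vector is penalized in proportion to how many ratings it participates in, the balance between the error term and the penalty stays roughly the same when the dataset grows or is subsampled.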

### c. Example for Collaborative Filtering in Machine Learning Algorithm in Spark

In the example below, we load rating data. Each row consists of a user, a product, and a rating. We use the default ALS.train() method, which assumes that ratings are explicit. We evaluate the recommendation model by measuring the Mean Squared Error of rating prediction.

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.Rating
// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})
// Build the recommendation model using ALS
val rank = 10
val numIterations = 20
val lambda = 0.01 // regularization parameter
val model = ALS.train(ratings, rank, numIterations, lambda)
// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

In addition, if the rating matrix is derived from another source of information (that is, it is inferred from other signals), we can use the trainImplicit method to get better results.

val alpha = 0.01
// trainImplicit takes the regularization parameter lambda before alpha.
val implicitModel = ALS.trainImplicit(ratings, rank, numIterations, lambda, alpha)

## 6. Clustering in Machine Learning algorithm in Spark

Clustering is an unsupervised learning problem in which we aim to group subsets of entities with one another on the basis of some notion of similarity. We often use clustering for exploratory analysis, or as a component of a hierarchical supervised learning pipeline in which distinct classifiers or regression models are trained for each cluster.
MLlib supports k-means clustering, one of the most commonly used clustering algorithms. It clusters the data points into a predefined number of clusters. The MLlib implementation includes a parallelized variant of the k-means++ method called kmeans||. The implementation in MLlib has the following parameters:

1. k is the number of desired clusters.
2. maxIterations is the maximum number of iterations to run.
3. initializationMode specifies either random initialization or initialization via k-means||.
4. runs is the number of times to run the k-means algorithm (this parameter has had no effect since Spark 2.0).
5. initializationSteps determines the number of steps in the k-means|| algorithm.
6. epsilon determines the distance threshold within which we consider k-means to have converged.

The following code snippets can be executed in spark-shell.

### a. Example for Clustering in Machine Learning Algorithm in Spark

In the example below, after loading and parsing the data, we use the KMeans object to cluster the data into two clusters. The number of desired clusters is passed to the algorithm. Afterwards, we compute the Within Set Sum of Squared Errors (WSSSE). We can reduce this error measure by increasing k.

import org.apache.spark.mllib.clustering.KMeans
import org.apache.spark.mllib.linalg.Vectors
// Load and parse the data: one point per line, coordinates separated by spaces
val data = sc.textFile("data/mllib/kmeans_data.txt")
val parsedData = data.map(s => Vectors.dense(s.split(' ').map(_.toDouble)))
// Cluster the data into two classes using KMeans
val numClusters = 2
val numIterations = 20
val clusters = KMeans.train(parsedData, numClusters, numIterations)
// Evaluate clustering by computing Within Set Sum of Squared Errors
val WSSSE = clusters.computeCost(parsedData)
println("Within Set Sum of Squared Errors = " + WSSSE)
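
To show what the WSSSE value measures, here is a small plain-Scala sketch that assigns each point to its nearest center and sums the squared distances. It works on local arrays with centers that are already given; the object `KMeansCostSketch` is our own illustration and is not how MLlib computes the cost internally.

```scala
// Illustrative only: nearest-center assignment and WSSSE on local data.
object KMeansCostSketch {
  // Squared Euclidean distance between two points of equal dimension.
  private def squaredDistance(a: Array[Double], b: Array[Double]): Double =
    a.zip(b).map { case (x, y) => (x - y) * (x - y) }.sum

  // Index of the center closest to the given point.
  def assign(point: Array[Double], centers: Seq[Array[Double]]): Int =
    centers.indices.minBy(i => squaredDistance(point, centers(i)))

  // WSSSE: for every point, the squared distance to its nearest center, summed.
  def wssse(points: Seq[Array[Double]], centers: Seq[Array[Double]]): Double =
    points.map(p => centers.map(c => squaredDistance(p, c)).min).sum
}
```

For the four points (0, 0), (0, 2), (10, 0), (10, 2), a single center at (5, 1) gives a WSSSE of 104, while two centers at (0, 1) and (10, 1) give 4. This is the sense in which increasing k reduces the error measure.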

So, this was all about the Apache Spark Machine Learning algorithms. We hope you liked our explanation.

## 7. Conclusion

In this Spark Machine Learning tutorial, we have seen the main groups of Machine Learning algorithms in Spark, with an example for each. We discussed basic statistics, classification, regression, collaborative filtering, and clustering in Apache Spark Machine Learning. If you have any query, feel free to ask in the comment section.