Spark MLlib Data Types | Apache Spark Machine Learning

1. Objective – Spark MLlib Data Types

Today, in this Spark tutorial, we will learn about the data types of Apache Spark MLlib. The machine learning library supports many data types, and in this tutorial we will discuss local vectors, labeled points, local matrices, and distributed matrices.

So, let’s start the Spark MLlib Data Types tutorial.

2. Spark MLlib Data Types – RDD-based API

MLlib supports local vectors and matrices stored on a single machine, as well as distributed matrices backed by one or more RDDs. Local vectors and local matrices are simple data models that serve as public interfaces, while the underlying linear algebra operations are provided by Breeze. In MLlib, a training example used in supervised learning is called a “labeled point”.
Spark MLlib data types fall into the following categories:

  1. Local vector
  2. Labeled point
  3. Local matrix
  4. Distributed matrix
  • RowMatrix
  • IndexedRowMatrix
  • CoordinateMatrix
  • BlockMatrix

So, let’s discuss these Spark MLlib data types in detail.

a. Local Vector Data Types

A local vector has integer-typed, 0-based indices and double-typed values, and it is stored on a single machine. MLlib supports two types of local vectors: dense and sparse.

i. Dense vector data types

A dense vector is backed by a double array representing its entry values.

ii. Sparse vector data types

A sparse vector is backed by two parallel arrays: indices and values.
For example, the vector (1.0, 0.0, 3.0) can be represented in dense format as [1.0, 0.0, 3.0], or in sparse format as (3, [0, 2], [1.0, 3.0]), where 3 is the size of the vector.
The base class of local vectors is Vector, and MLlib provides two implementations: DenseVector and SparseVector. We recommend using the factory methods implemented in Vectors to create local vectors.

import org.apache.spark.mllib.linalg.{Vector, Vectors}
// Create a dense vector (1.0, 0.0, 3.0).
val dv: Vector = Vectors.dense(1.0, 0.0, 3.0)
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its indices and values corresponding to nonzero entries.
val sv1: Vector = Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0))
// Create a sparse vector (1.0, 0.0, 3.0) by specifying its nonzero entries.
val sv2: Vector = Vectors.sparse(3, Seq((0, 1.0), (2, 3.0)))

Scala imports scala.collection.immutable.Vector by default, so you must explicitly import org.apache.spark.mllib.linalg.Vector to use MLlib’s Vector.
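To make the dense and sparse formats concrete, here is a small plain-Scala sketch (no Spark dependency) that converts between a dense array and the (size, indices, values) triple that Vectors.sparse accepts. The helpers toSparse and toDense are hypothetical and exist only for illustration; they are not part of MLlib.

```scala
// Hypothetical helper (not part of MLlib): dense array -> (size, indices, values),
// keeping only the positions whose value is non-zero.
def toSparse(dense: Array[Double]): (Int, Array[Int], Array[Double]) = {
  val nonZero = dense.indices.filter(i => dense(i) != 0.0).toArray
  (dense.length, nonZero, nonZero.map(i => dense(i)))
}

// Hypothetical helper (not part of MLlib): start from all zeros and write
// each stored value back at its index.
def toDense(size: Int, indices: Array[Int], values: Array[Double]): Array[Double] = {
  val out = Array.fill(size)(0.0)
  indices.zip(values).foreach { case (i, v) => out(i) = v }
  out
}

val (size, idx, vals) = toSparse(Array(1.0, 0.0, 3.0))
// Prints (3, [0, 2], [1.0, 3.0]), the sparse form described above.
println(s"($size, ${idx.mkString("[", ", ", "]")}, ${vals.mkString("[", ", ", "]")})")
```

The sparse form pays off when most entries are zero: only the non-zero positions and values are stored, plus the size.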

b. Labeled Point Data Types in Spark

A labeled point is a local vector, either dense or sparse, associated with a label/response. In MLlib, labeled points are used in supervised learning algorithms. Because the label is stored as a double, labeled points can be used for both regression and classification. For binary classification, a label should be either 0 (negative) or 1 (positive). For multiclass classification, labels should be class indices starting from zero: 0, 1, 2, …
A labeled point is represented by the case class LabeledPoint.

import org.apache.spark.mllib.linalg.Vectors
import org.apache.spark.mllib.regression.LabeledPoint
// Create a labeled point with a positive label and a dense feature vector.
val pos = LabeledPoint(1.0, Vectors.dense(1.0, 0.0, 3.0))
// Create a labeled point with a negative label and a sparse feature vector.
val neg = LabeledPoint(0.0, Vectors.sparse(3, Array(0, 2), Array(1.0, 3.0)))
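For multiclass problems, raw class names have to be turned into the 0-based double labels described above. The following plain-Scala sketch shows one way to do that; Example is a hypothetical stand-in for LabeledPoint, and indexLabels is a hypothetical helper that numbers classes in order of first appearance (an arbitrary choice made here for illustration).

```scala
// Hypothetical stand-in for MLlib's LabeledPoint: a Double label plus features.
final case class Example(label: Double, features: Array[Double])

// Hypothetical helper: give each distinct class name a 0-based index
// (in order of first appearance) and store it as a Double.
def indexLabels(classes: Seq[String]): Map[String, Double] =
  classes.distinct.zipWithIndex.map { case (c, i) => c -> i.toDouble }.toMap

val raw = Seq(
  ("cat", Array(1.0, 0.0)),
  ("dog", Array(0.0, 1.0)),
  ("bird", Array(1.0, 1.0)),
  ("dog", Array(0.5, 0.5)))
val labelOf = indexLabels(raw.map(_._1))
val examples = raw.map { case (c, f) => Example(labelOf(c), f) }

// Each example now carries a label in {0.0, 1.0, 2.0}.
examples.foreach(e => println(s"${e.label} -> ${e.features.mkString(", ")}"))
```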

c. Local Matrix Data Types

A local matrix has integer-typed row and column indices and double-typed values, stored on a single machine. MLlib supports dense matrices, whose entry values are stored in a single double array in column-major order, and sparse matrices, whose non-zero entry values are stored in the compressed sparse column (CSC) format in column-major order. For example, the 3 x 2 dense matrix with rows (1.0, 2.0), (3.0, 4.0), and (5.0, 6.0) is stored in the one-dimensional array [1.0, 3.0, 5.0, 2.0, 4.0, 6.0] with the matrix size (3, 2).

The base class of local matrices is Matrix, and MLlib provides two implementations: DenseMatrix and SparseMatrix. We recommend using the factory methods implemented in Matrices to create local matrices.
Remember, local matrices in MLlib are stored in column-major order.
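Column-major order means each column is written out in full before the next one starts, so entry (i, j) of a matrix with numRows rows sits at offset i + j * numRows in the flat array. This plain-Scala sketch (no Spark dependency; entry is a hypothetical helper) reads the example array back into rows to show the layout.

```scala
// Column-major storage of the 3 x 2 matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0)).
val numRows = 3
val numCols = 2
val values = Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0)

// Hypothetical helper: entry (i, j) lives at offset i + j * numRows.
def entry(i: Int, j: Int): Double = values(i + j * numRows)

// Rebuild the rows to check the layout; prints "1.0 2.0", "3.0 4.0", "5.0 6.0".
val rows = (0 until numRows).map(i => (0 until numCols).map(j => entry(i, j)))
rows.foreach(r => println(r.mkString(" ")))
```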

import org.apache.spark.mllib.linalg.{Matrix, Matrices}
// Create a dense matrix ((1.0, 2.0), (3.0, 4.0), (5.0, 6.0))
val dm: Matrix = Matrices.dense(3, 2, Array(1.0, 3.0, 5.0, 2.0, 4.0, 6.0))
// Create a sparse matrix ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0))
val sm: Matrix = Matrices.sparse(3, 2, Array(0, 1, 3), Array(0, 2, 1), Array(9, 6, 8))
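The three arrays passed to Matrices.sparse above are the CSC layout: column pointers, row indices, and values. In CSC, the stored entries of column j occupy positions colPtrs(j) until colPtrs(j + 1) of the other two arrays. The plain-Scala sketch below (toDenseRows is a hypothetical helper, no Spark dependency) expands those same arrays back into rows, which confirms they describe ((9.0, 0.0), (0.0, 8.0), (0.0, 6.0)).

```scala
// CSC arrays from the Matrices.sparse example.
val numRows = 3
val numCols = 2
val colPtrs = Array(0, 1, 3)       // column j's entries sit in [colPtrs(j), colPtrs(j + 1))
val rowIndices = Array(0, 2, 1)    // row index of each stored entry
val values = Array(9.0, 6.0, 8.0)  // stored non-zero values

// Hypothetical helper: expand the CSC arrays into a dense row-by-row view.
def toDenseRows(): Array[Array[Double]] = {
  val dense = Array.fill(numRows, numCols)(0.0)
  for (j <- 0 until numCols; k <- colPtrs(j) until colPtrs(j + 1))
    dense(rowIndices(k))(j) = values(k)
  dense
}

// Prints "9.0 0.0", "0.0 8.0", "0.0 6.0".
toDenseRows().foreach(r => println(r.mkString(" ")))
```

Note that within column 1 the row indices (2, 1) are not sorted in this example, yet the expansion still places each value correctly because every value carries its own row index.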

d. Distributed Matrix Data Types

A distributed matrix has long-typed row and column indices and double-typed values, stored distributively in one or more RDDs. It is very important to choose the right format to store large and distributed matrices, because converting a distributed matrix to a different format may require a global shuffle, which is quite expensive. There are four types of distributed matrices: RowMatrix, IndexedRowMatrix, CoordinateMatrix, and BlockMatrix.
The underlying RDDs of a distributed matrix must be deterministic, because the matrix size is cached. In general, using non-deterministic RDDs can lead to errors.

i. RowMatrix in Spark MLlib

A RowMatrix is a row-oriented distributed matrix without meaningful row indices. It is backed by an RDD of its rows, where each row is a local vector. Since each row is represented by a local vector, the number of columns is limited by the integer range, although it should be much smaller in practice.
A RowMatrix can be created from an RDD[Vector] instance. Then we can compute its column summary statistics and decompositions. QR decomposition is of the form A = QR, where Q is an orthogonal matrix and R is an upper triangular matrix.

import org.apache.spark.mllib.linalg.Vector
import org.apache.spark.mllib.linalg.distributed.RowMatrix
import org.apache.spark.rdd.RDD
val rows: RDD[Vector] = ... // an RDD of local vectors
// Create a RowMatrix from an RDD[Vector].
val mat: RowMatrix = new RowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// QR decomposition; passing true also computes the Q factor.
val qrResult = mat.tallSkinnyQR(true)
// Q is returned as a distributed RowMatrix, R as a local Matrix.
val q = qrResult.Q
val r = qrResult.R
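Column summary statistics are computed column by column across all rows, which is why the row-oriented RowMatrix can produce them in a single pass over its RDD. As a local illustration only, this plain-Scala sketch computes column means over a Seq of rows; columnMeans is a hypothetical helper, and on a real RowMatrix you would call computeColumnSummaryStatistics() instead, which returns means along with other statistics.

```scala
// Rows of a small matrix held locally; in MLlib these would be an RDD[Vector].
val rows: Seq[Array[Double]] = Seq(Array(1.0, 2.0), Array(3.0, 4.0), Array(5.0, 6.0))

// Hypothetical helper: sum the rows element-wise in one pass, then divide by the row count.
def columnMeans(rows: Seq[Array[Double]]): Array[Double] = {
  val n = rows.head.length
  val sums = rows.foldLeft(Array.fill(n)(0.0)) { (acc, r) =>
    acc.indices.map(j => acc(j) + r(j)).toArray
  }
  sums.map(_ / rows.size)
}

// Prints "3.0, 4.0".
println(columnMeans(rows).mkString(", "))
```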

ii. IndexedRowMatrix

An IndexedRowMatrix is similar to a RowMatrix, except that it has meaningful row indices. It is backed by an RDD of indexed rows, so each row is represented by its index (long-typed) and a local vector.
An IndexedRowMatrix can be created from an RDD[IndexedRow] instance, where IndexedRow is a wrapper over (Long, Vector). It can be converted to a RowMatrix by dropping its row indices.

import org.apache.spark.mllib.linalg.distributed.{IndexedRow, IndexedRowMatrix, RowMatrix}
import org.apache.spark.rdd.RDD
val rows: RDD[IndexedRow] = ... // an RDD of indexed rows
// Create an IndexedRowMatrix from an RDD[IndexedRow].
val mat: IndexedRowMatrix = new IndexedRowMatrix(rows)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Drop its row indices to get a plain RowMatrix.
val rowMat: RowMatrix = mat.toRowMatrix()

iii. CoordinateMatrix

A CoordinateMatrix is a distributed matrix backed by an RDD of its entries, where each entry is a tuple of (i: Long, j: Long, value: Double): i is the row index, j is the column index, and value is the entry value. A CoordinateMatrix should be used only when both dimensions of the matrix are huge and the matrix is very sparse.
A CoordinateMatrix can be created from an RDD[MatrixEntry] instance, where MatrixEntry is a wrapper over (Long, Long, Double). It can be converted to an IndexedRowMatrix with sparse rows by calling toIndexedRowMatrix.
Other computations for CoordinateMatrix are not currently supported.

import org.apache.spark.mllib.linalg.distributed.{CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val entries: RDD[MatrixEntry] = ... // an RDD of matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val mat: CoordinateMatrix = new CoordinateMatrix(entries)
// Get its size.
val m = mat.numRows()
val n = mat.numCols()
// Convert it to an IndexedRowMatrix whose rows are sparse vectors.
val indexedRowMatrix = mat.toIndexedRowMatrix()
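Conceptually, converting coordinate entries into indexed rows means grouping the (i, j, value) entries by row index and turning each group into a sparse row. The plain-Scala sketch below illustrates that idea locally; Entry is a hypothetical stand-in for MatrixEntry and toIndexedRows is a hypothetical helper, not Spark's implementation.

```scala
// Hypothetical local stand-in for MLlib's MatrixEntry(i, j, value).
final case class Entry(i: Long, j: Long, value: Double)

// Hypothetical helper: group entries by row index, producing one sparse row per index
// as (columnIndices, values) with columns sorted ascending.
def toIndexedRows(entries: Seq[Entry]): Map[Long, (Array[Int], Array[Double])] =
  entries.groupBy(_.i).map { case (i, es) =>
    val sorted = es.sortBy(_.j)
    val cols = sorted.map(_.j.toInt).toArray // local vectors use Int indices
    val vals = sorted.map(_.value).toArray
    (i, (cols, vals))
  }

val entries = Seq(Entry(0, 0, 9.0), Entry(2, 1, 6.0), Entry(1, 1, 8.0), Entry(0, 1, 1.5))
toIndexedRows(entries).toSeq.sortBy(_._1).foreach { case (i, (cols, vals)) =>
  println(s"row $i: cols=${cols.mkString(",")} vals=${vals.mkString(",")}")
}
```

The column index is narrowed from Long to Int because each resulting row is a local vector, which is the same integer-range limit on the number of columns mentioned for RowMatrix.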

iv. BlockMatrix

A BlockMatrix is a distributed matrix backed by an RDD of MatrixBlocks, where a MatrixBlock is a tuple of ((Int, Int), Matrix). The (Int, Int) is the index of the block, and Matrix is the sub-matrix at the given index with size rowsPerBlock x colsPerBlock.
A BlockMatrix can be most easily created from an IndexedRowMatrix or CoordinateMatrix by calling toBlockMatrix. By default, toBlockMatrix creates blocks of size 1024 x 1024. Users can change the block size by supplying the values through toBlockMatrix(rowsPerBlock, colsPerBlock).

import org.apache.spark.mllib.linalg.distributed.{BlockMatrix, CoordinateMatrix, MatrixEntry}
import org.apache.spark.rdd.RDD
val entries: RDD[MatrixEntry] = ... // an RDD of (i, j, v) matrix entries
// Create a CoordinateMatrix from an RDD[MatrixEntry].
val coordMat: CoordinateMatrix = new CoordinateMatrix(entries)
// Transform the CoordinateMatrix to a BlockMatrix.
val matA: BlockMatrix = coordMat.toBlockMatrix().cache()
// Validate whether the BlockMatrix is set up properly. Throws an Exception when it is not valid.
// Nothing happens if it is valid.
matA.validate()
// Calculate A^T A.
val ata = matA.transpose.multiply(matA)
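Since each block is identified by an (Int, Int) index and holds a rowsPerBlock x colsPerBlock sub-matrix, every global entry (i, j) maps to one block and one position inside it. The plain-Scala sketch below shows that arithmetic, assuming the usual grid layout where block indices are i / rowsPerBlock and j / colsPerBlock. The helper locate is hypothetical; its defaults mirror the 1024 x 1024 block size mentioned above.

```scala
// Hypothetical helper: which block holds global entry (i, j), and where inside it?
// Assumes a regular grid of rowsPerBlock x colsPerBlock blocks.
def locate(i: Long, j: Long, rowsPerBlock: Int = 1024, colsPerBlock: Int = 1024): ((Int, Int), (Int, Int)) = {
  val blockIndex = ((i / rowsPerBlock).toInt, (j / colsPerBlock).toInt)
  val offset = ((i % rowsPerBlock).toInt, (j % colsPerBlock).toInt)
  (blockIndex, offset)
}

println(locate(2500L, 10L))           // default 1024 x 1024 blocks: ((2,0),(452,10))
println(locate(2500L, 10L, 500, 500)) // smaller blocks: ((5,0),(0,10))
```

Smaller blocks mean more, lighter tasks; larger blocks mean fewer tasks that each hold more data. That trade-off is why the block size is configurable.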

So, this was all about Apache Spark MLlib data types. We hope you liked our explanation.

3. Conclusion

Hence, in this Spark MLlib Data Types tutorial, we have seen all the data types of Spark’s machine learning library. If you have any queries regarding Spark MLlib data types, feel free to ask in the comment section.