How to Create RDDs in Apache Spark?


1. Objective

In this Spark tutorial, we look at the different ways to create RDDs in Apache Spark. After a brief recap of what an RDD is, we cover the three ways of creating one: from a parallelized collection, from an external dataset, and from an existing RDD. Each approach comes with an example.


2. How to Create RDDs in Apache Spark?

A Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. An RDD is an immutable, fault-tolerant, distributed collection of objects. Each dataset is divided into logical partitions, which may be computed on different nodes of the cluster. An RDD is thus a way of representing a dataset that is distributed across multiple machines and can be operated on in parallel. RDDs are called resilient because Spark tracks how each RDD was derived (its lineage) and can recompute lost partitions after a failure.

Now that we have seen what an RDD is, let us see how to create one.

There are three ways to create an RDD in Spark.

  • Parallelizing an existing collection in the driver program.
  • Referencing a dataset in an external storage system (e.g. HDFS, HBase, a shared file system).
  • Creating an RDD from an existing RDD.

Let us look at each of these in detail:

2.1. Parallelized collection (parallelizing)

When we first learn Spark, we usually create RDDs from parallelized collections, i.e. by taking an existing collection in the driver program and passing it to SparkContext’s parallelize() method. This is convenient in the early stages because it lets us quickly create our own RDDs in the Spark shell and run operations on them. Outside testing and prototyping it is rarely used, because it requires the entire dataset to fit on one machine.

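Consider the following example of sortByKey(). Here the data to be sorted comes from a parallelized collection:

val data = spark.sparkContext.parallelize(Seq(("maths", 52), ("english", 75), ("science", 82), ("computer", 65), ("maths", 85)))
val sorted = data.sortByKey()
// collect() brings the results to the driver in sorted order;
// foreach(println) directly on the RDD runs per partition, with no ordering guarantee.
sorted.collect().foreach(println)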

The key point to note about a parallelized collection is the number of partitions the dataset is cut into. Spark runs one task for each partition. As a rule of thumb, we want two to four partitions for each CPU in the cluster. Spark normally sets the number of partitions automatically based on the cluster, but we can also set it manually by passing it as the second parameter to parallelize().

e.g. sc.parallelize(data, 10), where sc is the SparkContext, manually sets the number of partitions to 10.

Consider one more example, in which we use a parallelized collection and set the number of partitions manually:
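
To check how many partitions an RDD actually has, we can call getNumPartitions on it. The following is a minimal sketch, assuming a local session with two threads; the application name, the master setting and the sample range are illustrative choices, not part of the original tutorial.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; appName and master are assumptions.
val spark = SparkSession.builder.appName("PartitionCount").master("local[2]").getOrCreate()
val sc = spark.sparkContext

val numbers = 1 to 100

// No second argument: Spark falls back to sc.defaultParallelism.
val defaultRdd = sc.parallelize(numbers)
// Second argument: ask for exactly 10 partitions.
val tenRdd = sc.parallelize(numbers, 10)

println(s"default partitions: ${defaultRdd.getNumPartitions}")
println(s"requested partitions: ${tenRdd.getNumPartitions}")
```

The default value depends on the master setting and the machine, so only the explicitly requested count is the same everywhere.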

val rdd1 = spark.sparkContext.parallelize(Array("jan","feb","mar","april","may","jun"),3)
val result = rdd1.coalesce(2)
result.foreach(println)
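
To see what coalesce() does to the partitioning, we can compare the partition counts before and after. This is a sketch under the same assumptions as a local test run (the session settings and variable names are illustrative); glom() is used only to make the contents of each partition visible.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; appName and master are assumptions.
val spark = SparkSession.builder.appName("CoalesceCheck").master("local[2]").getOrCreate()

val months = spark.sparkContext.parallelize(Array("jan", "feb", "mar", "april", "may", "jun"), 3)
// coalesce(2) merges the three partitions into two without a full shuffle.
val merged = months.coalesce(2)

// glom() turns each partition into an array so its contents can be printed.
merged.glom().collect().foreach(p => println(p.mkString("[", ", ", "]")))
println(s"before: ${months.getNumPartitions}, after: ${merged.getNumPartitions}")
```

The original RDD keeps its three partitions; coalesce() returns a new RDD with the reduced count.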

2.2. External Datasets (Referencing a dataset)

In Spark, a distributed dataset can be formed from any data source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, etc. In this case the data is loaded from the external dataset. To create a text file RDD, we can use SparkContext’s textFile method. It takes the URI of the file and reads it as a collection of lines. The URI can be a local path on the machine or an hdfs://, s3a://, etc. URI.

The point to note is that when a path on the local file system is used, the file must also be accessible at the same path on every worker node. We can either copy the file to all the worker nodes or use a network-mounted shared file system.

The DataFrameReader interface is used to load a Dataset from external storage systems (e.g. file systems, key-value stores, etc.). Use SparkSession.read to access an instance of DataFrameReader. DataFrameReader supports many file formats:
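
As a minimal sketch of SparkContext’s textFile method, the example below writes a small temporary file and reads it back as an RDD of lines. The temporary file, its contents and the session settings are assumptions made so that the example does not depend on an existing path.

```scala
import java.nio.file.Files
import org.apache.spark.sql.SparkSession

// Local session for illustration; appName and master are assumptions.
val spark = SparkSession.builder.appName("TextFileRdd").master("local[2]").getOrCreate()

// Write a small temporary file so the example is self-contained.
val tmp = Files.createTempFile("spark-textfile-", ".txt")
Files.write(tmp, "first line\nsecond line\nthird line\n".getBytes("UTF-8"))

// textFile returns an RDD[String] with one element per line of the file.
val lines = spark.sparkContext.textFile(tmp.toUri.toString)
val lineLengths = lines.map(_.length)

println(s"lines: ${lines.count()}, total characters: ${lineLengths.sum()}")
```

On a real cluster the path would typically point to HDFS or another shared store rather than to a local temporary file.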

i) csv (String path)

It loads a CSV file and returns the result as a Dataset<Row>.

Example:

import org.apache.spark.sql.SparkSession

object DataFormat {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AvgAnsTime").master("local").getOrCreate()
    // Read the CSV file as a Dataset<Row>, then convert it to an RDD<Row>
    val dataRDD = spark.read.csv("path/of/csv/file").rdd
  }
}

Note – Here the .rdd method is used to convert the Dataset<Row> to an RDD<Row>.

ii) json (String path)

It loads a JSON file (one object per line) and returns the result as a Dataset<Row>.

val dataRDD = spark.read.json("path/of/json/file").rdd

iii) textFile (String path)

It loads text files and returns a Dataset of String.

val dataRDD = spark.read.textFile("path/of/text/file").rdd

2.3. Creating RDD from existing RDD

A transformation derives a new RDD from an existing one, so transformations are the way to create an RDD from an already existing RDD. A transformation acts as a function that takes an RDD as input and produces one or more new RDDs. The input RDD is not changed, because RDDs are immutable. Some of the transformations that can be applied to an RDD are filter, distinct, map and flatMap. Operations such as count, by contrast, are actions: they return a value to the driver instead of a new RDD.

Example:

val words=spark.sparkContext.parallelize(Seq("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"))
val wordPair = words.map(w => (w.charAt(0), w))
wordPair.foreach(println)

Note – In the above code, the RDD “wordPair” is created from the existing RDD “words” using the map() transformation. Each of its elements pairs a word’s starting character with the word itself.
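
The same idea applies to other transformations. The sketch below derives several new RDDs from one source RDD and shows that the source is left untouched; the session settings and the variable names unique, longWords and letters are illustrative assumptions.

```scala
import org.apache.spark.sql.SparkSession

// Local session for illustration; appName and master are assumptions.
val spark = SparkSession.builder.appName("DerivedRdds").master("local[2]").getOrCreate()

val words = spark.sparkContext.parallelize(
  Seq("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"))

// Each transformation returns a new RDD; `words` itself is never modified.
val unique = words.distinct()               // drops the duplicate "the"
val longWords = words.filter(_.length > 3)  // keeps words longer than three characters
val letters = words.flatMap(w => w.toList)  // one element per character

// count() is an action: it triggers the computation and returns a Long to the driver.
println(s"words: ${words.count()}, unique: ${unique.count()}, long: ${longWords.count()}, letters: ${letters.count()}")
```

Because transformations are lazy, nothing is computed until an action such as count() is called.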

3. Conclusion

Now that we have seen how to create RDDs in Apache Spark, the next step is to learn RDD transformations and actions with the help of examples. If you liked this blog or have any questions about creating RDDs in Apache Spark, let us know by leaving a comment in the comment box.

Reference:

http://spark.apache.org/
