How to Create RDDs in Apache Spark?

1. Objective

In this Spark tutorial, we will understand the different ways to create RDDs in Apache Spark. We will look at what Spark RDDs are and the three ways of creating them – using a parallelized collection, from existing Apache Spark RDDs, and from external datasets. We will also walk through examples of Spark RDD creation to get in-depth knowledge of how to create RDDs in Spark.


2. How to Create RDDs in Apache Spark?

Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. RDDs are immutable and fault tolerant in nature. They are distributed collections of objects: each dataset is divided into logical partitions, which are computed on different nodes of the cluster. Thus, an RDD is simply a way of representing a dataset distributed across multiple machines so that it can be operated on in parallel. RDDs are called resilient because Spark can always re-compute a lost partition from its lineage. Let us revise Spark RDDs in depth here.
Now that we have seen what an RDD is in Spark, let us see how to create Spark RDDs.
There are three ways to create an RDD in Spark:

  • Parallelizing an already existing collection in the driver program.
  • Referencing a dataset in an external storage system (e.g. HDFS, HBase, a shared file system).
  • Creating an RDD from an already existing RDD.

Learn: RDD Persistence and Caching Mechanism in Apache Spark
Let us learn each of these in detail below:

i. Parallelized collection (parallelizing)

In the initial stage of learning Spark, RDDs are generally created from a parallelized collection, i.e. by taking an existing collection in the program and passing it to SparkContext’s parallelize() method. This approach is handy while learning Spark, because it lets us quickly create our own RDDs in the Spark shell and perform operations on them. It is rarely used outside testing and prototyping, however, since it requires the entire dataset to be available on one machine.
Consider the following example of sortByKey(), in which the data to be sorted is taken from a parallelized collection:
[php]val data = spark.sparkContext.parallelize(Seq(("maths", 52), ("english", 75), ("science", 82), ("computer", 65), ("maths", 85)))
val sorted = data.sortByKey()
sorted.foreach(println)[/php]
The key point to note with a parallelized collection is the number of partitions the dataset is cut into. Spark runs one task for each partition of the cluster, and typically you want two to four partitions for each CPU in the cluster. Spark sets the number of partitions automatically based on the cluster, but we can also set it manually by passing the number of partitions as a second parameter to parallelize, e.g. sc.parallelize(data, 10), where we have manually set the number of partitions to 10.
Consider one more example; here we use a parallelized collection and manually set the number of partitions:
[php]val rdd1 = spark.sparkContext.parallelize(Array("jan", "feb", "mar", "april", "may", "jun"), 3)
val result = rdd1.coalesce(2)
result.foreach(println)[/php]
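As a quick, minimal check (assuming the rdd1 and result RDDs from the example above), we can verify how many partitions an RDD actually has using getNumPartitions:
[php]println(rdd1.getNumPartitions)   // prints 3, the value passed to parallelize()
println(result.getNumPartitions) // prints 2, after coalesce(2)[/php]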

ii. External Datasets (Referencing a dataset)

In Spark, a distributed dataset can be formed from any data source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, etc. Here the data is loaded from an external dataset. To create a text file RDD, we can use SparkContext’s textFile method. It takes a URL of the file and reads it as a collection of lines. The URL can be a local path on the machine or a hdfs://, s3n://, etc. URI.
One point to note is that when a local path is used, it must be valid on both the driver and the worker nodes; the file should be present at the same location on every node. We can either copy the file to all worker nodes or use a network-mounted shared file system.
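As a minimal sketch of reading an external dataset into an RDD (the file path below is only a placeholder), SparkContext’s textFile method can be used like this:
[php]// Read a text file into an RDD of lines; the path may be local, hdfs://, s3n://, etc.
val lines = spark.sparkContext.textFile("path/of/text/file")
println(lines.count()) // number of lines in the file[/php]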
Learn: Spark Shell Commands to Interact with Spark-Scala
The DataFrameReader interface is used to load a Dataset from external storage systems (e.g. file systems, key-value stores, etc.). Use SparkSession.read to access an instance of DataFrameReader. DataFrameReader supports many file formats, including the following:

a. csv (String path)

It loads a CSV file and returns the result as a Dataset<Row>.
Example:
[php]import org.apache.spark.sql.SparkSession

object DataFormat {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AvgAnsTime").master("local").getOrCreate()
    val dataRDD = spark.read.csv("path/of/csv/file").rdd // Dataset<Row> converted to RDD
  }
}[/php]
Note – Here the .rdd method is used to convert the Dataset<Row> into an RDD<Row>.

b. json (String path)

It loads a JSON file (one object per line) and returns the result as a Dataset<Row>.
[php]val dataRDD = spark.read.json("path/of/json/file").rdd[/php]

c. textFile (String path)

It loads text files and returns a Dataset<String>, so the .rdd call here yields an RDD of String rather than an RDD of Row.
[php]val dataRDD = spark.read.textFile("path/of/text/file").rdd[/php]
Learn: RDD lineage in Spark: ToDebugString Method

iii. Creating RDD from existing RDD

A transformation turns one RDD into another RDD, so transformations are the way to create an RDD from an already existing RDD. This is an important difference between Apache Spark and Hadoop MapReduce. A transformation acts as a function that takes in an RDD and produces a new one. The input RDD is not changed, because RDDs are immutable in nature; instead, one or more new RDDs are produced by applying operations. Some of the operations applied on RDDs are filter, count, distinct, map, flatMap, etc.
Example:
[php]val words = spark.sparkContext.parallelize(Seq("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"))
val wordPair = words.map(w => (w.charAt(0), w))
wordPair.foreach(println)[/php]
Note – In the above code, the RDD "wordPair" is created from the existing RDD "words" using the map() transformation; it pairs each word with its starting character.
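As one more minimal sketch (reusing the words RDD from the example above), other transformations such as filter and distinct also create new RDDs from an existing one:
[php]// filter keeps only the elements that satisfy the predicate, producing a new RDD.
val shortWords = words.filter(w => w.length <= 3)
// distinct removes duplicate elements, producing yet another RDD.
val uniqueWords = words.distinct()
shortWords.foreach(println)
uniqueWords.foreach(println)[/php]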
Read: Limitations of Spark RDD

3. Conclusion

Now that we have seen how to create RDDs in Apache Spark, let us learn RDD transformations and actions in Apache Spark with the help of examples. If you like this blog, or if you have any query about creating RDDs in Apache Spark, let us know by leaving a comment in the comment box below.


5 Responses

  1. Naushad Shaikh says:

    Amazing . Data flair blogs are one of the best and self learning content.
    Great work, Keep it up team.

  2. Hari says:

    To be honest, your articles are clear & spot-on.

    Can you clarify my below queries :

    1. RDDs are partitioned in Driver? If it is partitioned in Driver, the RDDs are passed on to Executors thru network?
    2. Can all the partitions of RDDs reside in the same executor & parallelly operated upon by many task?

    • Nishanth Shanmugam says:

      1.Yes RDDs are partitioned in the Driver by the SparkContext (user can define the number of partitions or it does it automatically based on the number of partitions that can be run in each CPU.) 2.Then those partitions are sent to the Worker Node by the Cluster Manager where the actual computation takes place.
      3.Suppose you have 1 Executor containing 1 CPU. 1 CPU can execute up to 4 partitions. Then the cluster manager will send 4 partitions to that Executor, 3 partitions will remain in the cache and at a time only 1 task process 1 partition. Next the second partition will be processed by the task while others wait in the cache.
      4.This entire cache-task process takes place based on the Spark’s internal scheduling algorithm.

  3. senthil says:

    Good Work

  4. Ravi Hindocha says:

    It loads a JSON file (one object per line) and returns the result as a Dataset
    val dataRDD = spark.read.json(“path/of/json/file”).rdd

    Why is the result Dataset and not RDD like in the case of loading csv
