How to Create RDDs in Apache Spark?
1. Objective
In this Spark tutorial, we will look at the different ways to create RDDs in Apache Spark. We will briefly revisit what a Spark RDD is and then cover the three ways of creating RDDs in Spark: from a parallelized collection, from an existing Apache Spark RDD, and from an external dataset. Each method is illustrated with an example so that you gain in-depth knowledge of how to create RDDs in Spark.
2. How to Create RDDs in Apache Spark?
Resilient Distributed Dataset (RDD) is the fundamental data structure of Spark. RDDs are immutable, fault-tolerant, distributed collections of objects. A dataset is divided into logical partitions, which are computed on different nodes of the cluster. An RDD is therefore simply a way of representing a dataset that is spread across multiple machines and can be operated on in parallel. RDDs are called resilient because a lost partition can always be recomputed from its lineage. Let us revise Spark RDDs in depth here.
Now that we have seen what an RDD is in Spark, let us see how to create Spark RDDs.
There are three ways to create an RDD in Spark.
- Parallelizing an existing collection in the driver program.
- Referencing a dataset in an external storage system (e.g. HDFS, HBase, a shared file system).
- Creating an RDD from an already existing RDD.
Learn: RDD Persistence and Caching Mechanism in Apache Spark
Let us learn about each of these in detail below:
i. Parallelized collection (parallelizing)
When first learning Spark, RDDs are generally created from a parallelized collection, i.e. by taking an existing collection in the program and passing it to SparkContext’s parallelize() method. This approach is handy in the early stages of learning Spark because it lets us quickly create our own RDDs in the Spark shell and perform operations on them. It is rarely used outside testing and prototyping, however, because it requires the entire dataset to fit on one machine.
Consider the following example of sortByKey(), where the data to be sorted is supplied through a parallelized collection:
[php]val data = spark.sparkContext.parallelize(Seq(("maths", 52), ("english", 75), ("science", 82), ("computer", 65), ("maths", 85)))
val sorted = data.sortByKey()
sorted.foreach(println)[/php]
The key point to note about parallelized collections is the number of partitions the dataset is cut into. Spark runs one task for each partition of the cluster; typically two to four partitions per CPU in the cluster work well. Spark sets the number of partitions automatically based on the cluster, but we can also set it manually by passing the number of partitions as a second parameter to parallelize().
e.g. sc.parallelize(data, 10), where we have manually set the number of partitions to 10.
Consider one more example, where we use a parallelized collection and manually specify the number of partitions:
[php]val rdd1 = spark.sparkContext.parallelize(Array("jan", "feb", "mar", "april", "may", "jun"), 3)
val result = rdd1.coalesce(2)
result.foreach(println)[/php]
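To verify how many partitions were actually created, a quick check (a minimal sketch reusing rdd1 and result from the example above) is to call getNumPartitions in the Spark shell:
[php]// Reuses rdd1 and result from the example above.
println(rdd1.getNumPartitions)   // 3, as requested in parallelize()
println(result.getNumPartitions) // 2, after coalesce(2)[/php]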
ii. External Datasets (Referencing a dataset)
In Spark, a distributed dataset can be formed from any data source supported by Hadoop, including the local file system, HDFS, Cassandra, HBase, etc. Here, the data is loaded from an external dataset. To create a text file RDD, we can use SparkContext’s textFile method. It takes the URL of the file and reads it as a collection of lines. The URL can be a local path on the machine, or an hdfs://, s3n://, etc. URI.
The point to note is that when a local path is used, the file must be available at the same path on the worker nodes as well as on the driver. Either copy the file to all the worker nodes or use a network-mounted shared file system.
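As a minimal sketch of this approach (the path below is only a placeholder), a text file RDD can be created with SparkContext’s textFile method and inspected in the Spark shell:
[php]// Placeholder path: replace it with a file that exists on your machine or cluster.
val lines = spark.sparkContext.textFile("path/of/text/file.txt")
println(lines.count())          // number of lines in the file
lines.take(5).foreach(println)  // preview the first few lines[/php]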
Learn: Spark Shell Commands to Interact with Spark-Scala
The DataFrameReader interface is used to load a Dataset from external storage systems (e.g. file systems, key-value stores, etc.). Use SparkSession.read to access an instance of DataFrameReader. DataFrameReader supports many file formats:
a. csv (String path)
It loads a CSV file and returns the result as a Dataset<Row>.
Example:
[php]import org.apache.spark.sql.SparkSession

object DataFormat {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder.appName("AvgAnsTime").master("local").getOrCreate()
    val dataRDD = spark.read.csv("path/of/csv/file").rdd
  }
}[/php]
Note – Here the .rdd method is used to convert the Dataset&lt;Row&gt; into an RDD&lt;Row&gt;.
b. json (String path)
It loads a JSON file (one object per line) and returns the result as a Dataset&lt;Row&gt;.
[php]val dataRDD = spark.read.json("path/of/json/file").rdd[/php]
c. textFile (String path)
It loads text files and returns a Dataset&lt;String&gt;.
[php]val dataRDD = spark.read.textFile("path/of/text/file").rdd[/php]
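Note that spark.read.textFile returns a Dataset of String, whereas SparkContext’s textFile returns an RDD of String directly. The short sketch below (with placeholder paths) shows that both routes end up with an RDD of lines:
[php]// Placeholder paths: substitute real text files.
val viaDataset = spark.read.textFile("path/of/text/file").rdd  // Dataset[String] -> RDD[String]
val direct = spark.sparkContext.textFile("path/of/text/file")  // RDD[String] directly[/php]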
Learn: RDD lineage in Spark: ToDebugString Method
iii. Creating RDD from existing RDD
A transformation turns one RDD into another RDD, so transformations are the way to create an RDD from an already existing RDD. This is one of the points that differentiates Apache Spark from Hadoop MapReduce. A transformation acts as a function that takes in an RDD and produces a new one. The input RDD is not changed, because RDDs are immutable in nature; instead, one or more new RDDs are produced by applying the operation. Some of the transformations applied on RDDs are filter, distinct, map, flatMap, etc.
Example:
[php]val words = spark.sparkContext.parallelize(Seq("the", "quick", "brown", "fox", "jumps", "over", "the", "lazy", "dog"))
val wordPair = words.map(w => (w.charAt(0), w))
wordPair.foreach(println)[/php]
Note – In the above code, the RDD “wordPair” is created from the existing RDD “words” using the map() transformation, pairing each word with its starting character.
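As a further illustration of the transformations mentioned above, the following sketch (reusing the words RDD, not part of the original example) shows filter, flatMap and distinct each producing a new RDD from an existing one:
[php]// Reuses the words RDD from the example above.
val longWords = words.filter(_.length > 4)    // keep only words longer than 4 characters
val letters = words.flatMap(_.toCharArray)    // split every word into its characters
val uniqueLetters = letters.distinct()        // drop duplicate characters
uniqueLetters.foreach(println)[/php]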
Read: Limitations of Spark RDD
3. Conclusion
Now that we have seen how to create RDDs in Apache Spark, let us learn about RDD transformations and actions in Apache Spark with the help of examples. If you like this blog, or if you have any query about creating RDDs in Apache Spark, let us know by leaving a comment in the comment box.