PySpark RDD With Operations and Commands

1. Objective

In our last article, we discussed PySpark SparkContext. Today, in this PySpark tutorial, we will see PySpark RDD with its operations. After installing and configuring PySpark on our system, we can easily program in Python on Apache Spark. So, this document focuses on manipulating PySpark RDD by applying operations (transformations and actions). To understand PySpark RDD, we first have to learn the basic concept of Spark RDD. Moreover, we will see how to create a PySpark RDD.
So, let’s begin PySpark RDD.


2. What is Spark RDD?

The acronym RDD stands for Resilient Distributed Dataset. Basically, an RDD is the key abstraction of Apache Spark: the elements that run and operate on multiple nodes to do parallel processing on a cluster. Moreover, an RDD is immutable in nature, which means that as soon as we create an RDD, we cannot change it.
In addition, RDDs have a key feature: fault tolerance. It means that if any failure occurs, they recover automatically.

a. Ways to create Spark RDD

There are three ways to create Spark RDDs:
i. Parallelized collections
ii. External datasets
iii. Existing RDDs

b. Spark RDDs operations

Moreover, to achieve a certain task, we can apply multiple operations on these RDDs.
i. Transformation Operations
A transformation creates a new Spark RDD from an existing one: it passes the dataset to a function and returns a new dataset as the result.
ii. Action Operations
An action returns the final result to the driver program or writes it to an external data store.

3. How to Create PySpark RDD?

First of all, we need to create a PySpark RDD to apply any operation in PySpark. For reference, here is the signature of the PySpark RDD class:

class pyspark.RDD(
  jrdd,
  ctx,
  jrdd_deserializer=AutoBatchedSerializer(PickleSerializer())
)

Further, let's see the way to run a few basic operations using PySpark. The following code in a Python file creates an RDD named words, which stores the set of words shown below.


words = sc.parallelize([
  "scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"
])

4. Operations in PySpark RDD

Now, we will run a few operations on these words:


i. count()

With this operation, the number of elements in the RDD is returned.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize([
  "scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"
])
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
----------------------------------------count.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit count.py

Outcome
Number of elements in RDD -> 8

ii. collect()

Basically, this operation returns all the elements in the RDD.


---------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize([
  "scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"
])
coll = words.collect()
print("Elements in RDD -> %s" % coll)
----------------------------------------collect.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit collect.py

Output
Elements in RDD -> [
  'scala',
  'java',
  'hadoop',
  'spark',
  'akka',
  'spark vs hadoop',
  'pyspark',
  'pyspark and spark'
]

iii. foreach(f)

foreach(f) applies the function f to each element in the RDD; it does not return a result to the driver. Here, to print all the elements in the RDD, we call a print function inside foreach. Note that in local mode the output appears on the console, while on a cluster the printed lines go to the executors' logs.


----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize([
  "scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"
])
def f(x):
  print(x)
words.foreach(f)
----------------------------------------foreach.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit foreach.py

Output
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

iv. filter(f)

After applying this operation, we get a new RDD which contains only the elements that satisfy the function inside the filter. In the following example, we keep only the strings containing "spark".


----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize([
  "scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"
])
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % filtered)
----------------------------------------filter.py----------------------------------------

Command

$SPARK_HOME/bin/spark-submit filter.py

Output

Filtered RDD -> [
  'spark',
  'spark vs hadoop',
  'pyspark',
  'pyspark and spark'
]

v. map(f, preservesPartitioning=False)

map applies a function to each element in the RDD and returns a new RDD. In the following example, we form a key-value pair by mapping every string to the value 1.


----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize([
  "scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"
])
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % mapping)
----------------------------------------map.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit map.py

Output
Key value pair -> [
  ('scala', 1),
  ('java', 1),
  ('hadoop', 1),
  ('spark', 1),
  ('akka', 1),
  ('spark vs hadoop', 1),
  ('pyspark', 1),
  ('pyspark and spark', 1)
]

vi. reduce(f)

reduce(f) performs the specified commutative and associative binary operation on the elements of the RDD and returns the result. In the following example, we import add from the operator module and apply it to 'nums' to carry out a simple addition.


----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
adding = nums.reduce(add)
print("Adding all the elements -> %i" % adding)
----------------------------------------reduce.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit reduce.py

Output
Adding all the elements -> 15

vii. join(other, numPartitions=None)

This operation returns an RDD of pairs of elements with matching keys, along with all the values for each such key. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD with the elements having matching keys and their values.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % final)
----------------------------------------join.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit join.py

Output
Join RDD -> [
  ('spark', (1, 2)),
  ('hadoop', (4, 5))
]


viii. cache()

This command persists the RDD with the default storage level (MEMORY_ONLY). We can also check whether the RDD is cached with the is_cached attribute.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize([
  "scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"
])
words.cache()
caching = words.is_cached
print("Words got cached -> %s" % caching)
----------------------------------------cache.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit cache.py

Output
Words got cached -> True
So, this was all about PySpark RDD and its operations. Hope you like our explanation.

5. Conclusion: PySpark RDD

Hence, we have seen the concept of PySpark RDD, including some of the most important operations performed on it, and the way to create a PySpark RDD in detail. Still, if you have any doubt, ask in the comment section.

See also – 

PySpark Interview Questions