PySpark RDD With Operations and Commands


In our last article, we discussed PySpark SparkContext. Today, in this PySpark tutorial, we will look at PySpark RDDs and their operations. Once PySpark is installed and configured on our system, we can program in Python on Apache Spark.

This document focuses on manipulating PySpark RDDs by applying operations (transformations and actions). To understand PySpark RDDs, we first need the basic concept of a Spark RDD. We will also see how to create a PySpark RDD.

So, let’s begin PySpark RDD.

What is Spark RDD?

RDD is an acronym for Resilient Distributed Dataset. It is the key abstraction of Apache Spark.

An RDD is a collection of elements that is partitioned across the nodes of a cluster so that it can be processed in parallel. Moreover, an RDD is immutable: once we create it, we cannot change it.

In addition, RDDs are fault tolerant: if a failure occurs, they recover automatically.

a. Ways to create Spark RDD

There are three ways to create Spark RDDs:
i. Parallelized collections
ii. External datasets
iii. Existing RDDs

b. Spark RDD operations

To achieve a given task, we can apply multiple operations to these RDDs.
i. Transformation Operations
A transformation creates a new Spark RDD from an existing one. It passes the dataset through a function and returns a new dataset as the result.
ii. Action Operations
An action returns a final result to the driver program or writes it to an external data store.
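
The split between transformations and actions matters because Spark evaluates transformations lazily: nothing is computed until an action asks for a result. The sketch below is a plain-Python analogy, not Spark code. A generator stands in for a transformation and list() stands in for an action; the names to_upper and calls are made up for illustration.

```python
# Plain-Python analogy (not Spark): a generator plays the role of a lazy
# transformation, and list() plays the role of an action.
calls = []  # records when the mapping function actually runs


def to_upper(word):
    calls.append(word)
    return word.upper()


words = ["scala", "java", "spark"]

# "Transformation": describes the work, but to_upper has not run yet.
upper_words = (to_upper(w) for w in words)
calls_before_action = len(calls)

# "Action": forces evaluation and hands the result back to the caller.
result = list(upper_words)
calls_after_action = len(calls)

print(calls_before_action, calls_after_action, result)
```

In real PySpark code the same pattern appears as a chain of transformations such as map or filter followed by an action such as count or collect.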

How to Create PySpark RDD?

First of all, we need a PySpark RDD before we can apply any operation to it. The following block shows the signature of the PySpark RDD class. In practice, we rarely call this constructor directly; we obtain RDDs from a SparkContext instead.

class pyspark.RDD (
  jrdd,
  ctx,
  jrdd_deserializer = AutoBatchedSerializer(PickleSerializer())
)


Next, let's run a few basic operations using PySpark. The following code, placed in a Python file, creates an RDD named words that stores the set of words listed here. It assumes a SparkContext named sc already exists.

words = sc.parallelize (
  ["scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"]
)

Operations in PySpark RDD

Now we will run a few operations on those words:

i. count()

This operation returns the number of elements in the RDD.

----------------------------------------count.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "count app")
words = sc.parallelize (
  ["scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"]
)
# count() is an action: it runs the job and returns an int to the driver
counts = words.count()
print("Number of elements in RDD -> %i" % counts)
----------------------------------------count.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit count.py

Output
Number of elements in RDD -> 8

ii. collect()

This operation returns all the elements of the RDD to the driver program as a Python list, so it is best suited to small datasets.

---------------------------------------collect.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Collect app")
words = sc.parallelize (
  ["scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"]
)
# collect() is an action: it brings every element back to the driver as a list
coll = words.collect()
print("Elements in RDD -> %s" % coll)
----------------------------------------collect.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit collect.py

Output
Elements in RDD -> [
  'scala',
  'java',
  'hadoop',
  'spark',
  'akka',
  'spark vs hadoop',
  'pyspark',
  'pyspark and spark'
]

iii. foreach(f)

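The foreach(f) operation applies the function f to every element of the RDD. It is used for side effects and returns nothing (None); it does not filter elements. Here, to print all the elements in the RDD, we pass a function that calls print to foreach. In local mode the output appears in the console; on a cluster, it appears in the executor logs.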

----------------------------------------foreach.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "ForEach app")
words = sc.parallelize (
  ["scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"]
)
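# foreach() is an action that runs f on every element for its side effects;
# it returns None, so there is nothing useful to assign.
def f(x):
    print(x)

words.foreach(f)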
----------------------------------------foreach.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit foreach.py

Output
scala
java
hadoop
spark
akka
spark vs hadoop
pyspark
pyspark and spark

iv. filter(f)

This operation returns a new RDD containing only the elements that satisfy the function passed to filter. In the following example, we keep only the strings that contain "spark".

----------------------------------------filter.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Filter app")
words = sc.parallelize (
  ["scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"]
)
# filter() is a transformation: it keeps elements for which the lambda is True
words_filter = words.filter(lambda x: 'spark' in x)
filtered = words_filter.collect()
print("Filtered RDD -> %s" % filtered)
----------------------------------------filter.py----------------------------------------

Command

$SPARK_HOME/bin/spark-submit filter.py

Output

Filtered RDD -> [
  'spark',
  'spark vs hadoop',
  'pyspark',
  'pyspark and spark'
]

v. map(f, preservesPartitioning = False)

This operation returns a new RDD by applying a function to each element of the RDD. In the following example, we map every string to a key-value pair whose value is 1.

----------------------------------------map.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Map app")
words = sc.parallelize (
  ["scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"]
)
# map() is a transformation: each word becomes a (word, 1) tuple
words_map = words.map(lambda x: (x, 1))
mapping = words_map.collect()
print("Key value pair -> %s" % mapping)
----------------------------------------map.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit map.py

Output
Key value pair -> [
  ('scala', 1),
  ('java', 1),
  ('hadoop', 1),
  ('spark', 1),
  ('akka', 1),
  ('spark vs hadoop', 1),
  ('pyspark', 1),
  ('pyspark and spark', 1)
]

vi. reduce(f)

This operation combines the elements of the RDD using the specified commutative and associative binary operation and returns the single resulting value. In the following example, we import the add function from the operator module and apply it to nums to carry out a simple addition.

----------------------------------------reduce.py---------------------------------------
from pyspark import SparkContext
from operator import add
sc = SparkContext("local", "Reduce app")
nums = sc.parallelize([1, 2, 3, 4, 5])
# reduce() is an action: it folds all elements into one value with add
adding = nums.reduce(add)
print("Adding all the elements -> %i" % adding)
----------------------------------------reduce.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit reduce.py

Output
Adding all the elements -> 15
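
The requirement that the reduce function be commutative and associative comes from the way the data is split into partitions: partial results are computed per partition and then combined. The sketch below models that idea in plain Python, without Spark. The helper partitioned_reduce is hypothetical and only approximates how a distributed reduce behaves; it shows that add gives the same answer however the data is split, while subtraction does not.

```python
from functools import reduce
from operator import add, sub


def partitioned_reduce(data, num_partitions, op):
    """Hypothetical model: reduce each partition, then reduce the partial results."""
    size = -(-len(data) // num_partitions)  # ceiling division
    partitions = [data[i:i + size] for i in range(0, len(data), size)]
    partials = [reduce(op, part) for part in partitions]
    return reduce(op, partials)


nums = [1, 2, 3, 4, 5]

# Addition is commutative and associative: partitioning does not matter.
sum_one = partitioned_reduce(nums, 1, add)
sum_two = partitioned_reduce(nums, 2, add)

# Subtraction is neither, so the result depends on how the data is split.
diff_one = partitioned_reduce(nums, 1, sub)
diff_two = partitioned_reduce(nums, 2, sub)

print(sum_one, sum_two)    # 15 15
print(diff_one, diff_two)  # -13 -3
```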

vii. join(other, numPartitions = None)

This operation joins two RDDs of key-value pairs. It returns an RDD with one element per pair of matching keys, holding the key together with the values from both RDDs. In the following example, there are two pairs of elements in two different RDDs. After joining these two RDDs, we get an RDD whose elements contain the matching keys and their values. The order of the elements in the result is not guaranteed.

----------------------------------------join.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Join app")
x = sc.parallelize([("spark", 1), ("hadoop", 4)])
y = sc.parallelize([("spark", 2), ("hadoop", 5)])
# join() is a transformation: it pairs values from x and y that share a key
joined = x.join(y)
final = joined.collect()
print("Join RDD -> %s" % final)
----------------------------------------join.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit join.py

Output
Join RDD -> [
  ('spark', (1, 2)),
  ('hadoop', (4, 5))
]
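
To make the matching rule concrete, here is a minimal plain-Python model of an inner join on lists of (key, value) pairs. It is not Spark code, and inner_join is a hypothetical helper written for this illustration. The extra "akka" and second "spark" entries are added here to show two cases the example above does not cover: a key present on only one side is dropped, and a key that appears more than once produces one output pair per combination.

```python
from collections import defaultdict


def inner_join(left, right):
    """Hypothetical model of an inner join on plain lists of (key, value) pairs."""
    right_by_key = defaultdict(list)
    for key, value in right:
        right_by_key[key].append(value)
    # Emit one (key, (left_value, right_value)) per matching combination.
    return [(key, (lv, rv))
            for key, lv in left
            for rv in right_by_key.get(key, [])]


x = [("spark", 1), ("hadoop", 4), ("akka", 7)]
y = [("spark", 2), ("spark", 3), ("hadoop", 5)]

joined = inner_join(x, y)
print(joined)
# [('spark', (1, 2)), ('spark', (1, 3)), ('hadoop', (4, 5))]
```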

viii. cache()

This command persists the RDD with the default storage level (MEMORY_ONLY). We can check whether the RDD is marked as cached with the is_cached attribute.

----------------------------------------cache.py---------------------------------------
from pyspark import SparkContext
sc = SparkContext("local", "Cache app")
words = sc.parallelize (
  ["scala",
  "java",
  "hadoop",
  "spark",
  "akka",
  "spark vs hadoop",
  "pyspark",
  "pyspark and spark"]
)
words.cache()  # same as persist() with the default storage level
caching = words.is_cached  # True once the RDD is marked for caching
print("Words got cached -> %s" % caching)
----------------------------------------cache.py---------------------------------------

Command

$SPARK_HOME/bin/spark-submit cache.py

Output
Words got cached -> True

So, this was all about PySpark RDD and its operations. We hope you liked our explanation.

Conclusion: PySpark RDD

Hence, we have seen the concept of PySpark RDD, including some of the most important operations performed on PySpark RDDs. We have also seen how to create a PySpark RDD. If you still have any doubts, ask in the comment section.
