PySpark Broadcast and Accumulator With Examples
As we know, Apache Spark uses shared variables for parallel processing. Shared variables come in two types: Broadcast and Accumulator. So, in this PySpark article, “PySpark Broadcast and Accumulator”, we will learn the whole concept of Broadcast and Accumulator using PySpark.
So, let’s start with PySpark Broadcast and Accumulator.
PySpark Broadcast and Accumulator
In parallel processing, when the driver sends a task to the executors on the cluster, a copy of each shared variable is shipped to every node of the cluster, so that it can be used while performing the task.
Apache Spark supports two types of shared variables in PySpark:
- Broadcast
- Accumulator
Let’s learn PySpark Broadcast and Accumulator in detail:
Broadcast Variables – PySpark
Basically, Broadcast variables are used to keep a copy of data on all nodes. The variable is cached on every machine, rather than being shipped to the machines with every task. We can use it to share read-only information with all the executors. It can be of any type, either a primitive value or a hash map. For example:
- Single value
A single value refers to a common value shared across all records, for example a common value for all the products.
- Hashmap
Whereas a hashmap is used for a lookup or a map-side join.
Moreover, broadcasting the dimension table can bring a considerable performance improvement when a very large data set (the fact table) is joined with a much smaller data set (the dimension table). In addition, broadcast variables are immutable.
For PySpark, the following code block shows the details of the Broadcast class:
class pyspark.Broadcast(sc=None, value=None, pickle_registry=None, path=None)
A Broadcast variable has an attribute called value, which stores the data and is used to return the broadcasted value. Here is an example showing how to use it:
----------------------------------------broadcast.py--------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Broadcast app")
words_new = sc.broadcast(["shreyash", "rishabh", "Shubham", "Sagar", "Mehul"])

# value returns the broadcasted data on any node
data = words_new.value
print("Stored data -> %s" % (data))

# individual elements can be accessed by index
elem = words_new.value[2]
print("Printing a particular element in RDD -> %s" % (elem))
----------------------------------------broadcast.py--------------------------------------
Command
$SPARK_HOME/bin/spark-submit broadcast.py
Output
Stored data -> ['shreyash', 'rishabh', 'Shubham', 'Sagar', 'Mehul']
Printing a particular element in RDD -> Shubham
Accumulators – PySpark
Accumulator variables are used to aggregate information through associative and commutative operations. For example, we can use an accumulator for a sum operation or for counters (as in MapReduce). Accumulators can be used through any of Spark's APIs.
For PySpark, the following code block shows the details of the Accumulator class:
class pyspark.Accumulator(aid, value, accum_param)
Like a broadcast variable, an accumulator also has an attribute called value, which stores the data and is used to return the accumulated value. However, it is readable only in the driver program; tasks running on the workers can only add to it.
In the following example, an accumulator variable is updated by multiple workers and returns an accumulated value.
----------------------------------------accumulator.py------------------------------------
from pyspark import SparkContext

sc = SparkContext("local", "Accumulator app")
num = sc.accumulator(1)

def f(x):
    global num
    num += x

rdd = sc.parallelize([2, 3, 4, 5])
rdd.foreach(f)

# value is only readable in the driver program
final = num.value
print("Accumulated value is -> %i" % (final))
----------------------------------------accumulator.py------------------------------------
Command
$SPARK_HOME/bin/spark-submit accumulator.py
Output
Accumulated value is -> 15
So, this was all about PySpark Broadcast and Accumulator. We hope you liked our explanation.
Conclusion
Hence, we have seen the concepts of PySpark Broadcast and Accumulator along with examples of each. This should help you understand them well. Still, if you have any doubt, ask in the comment section.