Explain Accumulator in detail.

    • #5361
      DataFlair Team

      Explain Accumulator in detail.

    • #5363
      DataFlair Team
      • This discussion is a continuation of the question, "Name the two types of shared variables available in Apache Spark."

      Introduction to Accumulator :

      • Accumulator is a shared variable in Apache Spark, used to aggregate information across the cluster.
      • In other words, it aggregates information / values from the worker nodes back to the driver program. (We will see how in the section below.)

      Why Accumulator :

      • When we use a function inside an operation like map() or filter(), that function can use variables defined outside its scope in the driver program.
      • When we submit a task to the cluster, each task running on the cluster gets a new copy of these variables, and updates to these copies are not propagated back to the driver program.
      • Accumulators relax this restriction (see the sketch below).
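
      As a rough illustration, here is a minimal sketch (assuming an existing SparkContext `sc`, e.g. in spark-shell, and the Spark 2.x longAccumulator API) of how a plain driver-side variable fails where an accumulator works:

        // A plain variable captured in a closure is shipped to each task,
        // so updates made on the executors generally never reach the driver's copy
        // (the exact behavior is undefined; in local mode it may differ).
        var plainCounter = 0
        val data = sc.parallelize(Seq("DataFlair", "", "Spark", ""))
        data.foreach(line => if (line == "") plainCounter += 1)
        println(plainCounter)            // typically still 0 on the driver

        // An accumulator, by contrast, is merged back into the driver.
        val blankLines = sc.longAccumulator("blankLines")
        data.foreach(line => if (line == "") blankLines.add(1))
        println(blankLines.value)        // 2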
      Use Cases :

      • One of the most common uses of an accumulator is to count events that occur during job execution, for debugging purposes.
      • For example: counting the number of blank lines in the input file, the number of bad packets received from the network during a session, or, during an Olympic data analysis, the records with an invalid age (where we said age != 'NA' in the SQL query); in short, finding bad / corrupted records.
        Examples : 

        scala> val record = spark.read.textFile("/home/hdadmin/wc-data-blanklines.txt")
        record: org.apache.spark.sql.Dataset[String] = [value: string]

        scala> val emptylines = sc.accumulator(0)
        warning: there were two deprecation warnings; re-run with -deprecation for details
        emptylines: org.apache.spark.Accumulator[Int] = 0

        scala> val processdata = record.flatMap(x => {
                 if (x == "") emptylines += 1
                 x.split(" ")
               })
        processdata: org.apache.spark.sql.Dataset[String] = [value: string]

        scala> processdata.collect
        16/12/02 20:55:15 WARN SizeEstimator: Failed to check whether UseCompressedOops is set; assuming yes

        Output : 
        res0: Array[String] = Array(DataFlair, provides, training, on, cutting, edge, technologies., “”, DataFlair, is, the, leading, training, provider,, we, have, trained, 1000s, of, candidates., Training, focues, on, practical, aspects, which, industy, needs, rather, than, theoretical, knowledge., “”, DataFlair, helps, the, organizations, to, solve, BigData, Problems., “”, Javadoc, is, a, tool, for, generating, API, documentation, in, HTML, format, from, doc, comments, in, source, code., It, can, be, downloaded, only, as, part, of, the, Java, 2, SDK., To, see, documentation, generated, by, the, Javadoc, tool,, go, to, J2SE, 1.5.0, API, Documentation., “”, Javadoc, FAQ, -, This, FAQ, covers, where, to, download, the, Javadoc, tool,, how, to, find, a, list, of, known, bugs, and, feature, reque…
        scala> println("No. of Empty Lines : " + emptylines.value)
        No. of Empty Lines : 10

        Explanation and Conclusion of Program :

      • In the above example, we create an Accumulator[Int] named 'emptylines'.
      • Here, we want to find the number of blank lines encountered during our processing.
      • We then apply the flatMap() transformation to process our data; since we also want the number of empty (blank) lines, inside the flatMap() function we increase the accumulator 'emptylines' by 1 whenever we encounter a blank line, and otherwise we split the line by spaces.
      • After that, we check the output as well as the number of blank lines.
      • We create the accumulator with an initial value in the driver program by calling sc.accumulator(0), i.e. SparkContext.accumulator(initialValue), which returns an org.apache.spark.Accumulator[T], where T is the type of initialValue. (The deprecation warning in the transcript is because this API is deprecated in Spark 2.x; a sketch with the newer API follows this list.)
      • At the end we read the value property of the accumulator to access its value.
      • Please note that tasks on worker nodes cannot access the value property of the accumulator, so in the context of tasks, an accumulator is a write-only variable.
      • The value property of the accumulator is available only in the driver program.
      • We could also count the number of blank lines with plain transformations/actions, but that would need an extra operation; with an accumulator we can count the blank lines (or events, in broader terms) as we load / process our data.
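
      Since sc.accumulator is deprecated in Spark 2.x, here is a rough sketch of the same blank-line count using the newer longAccumulator API (same input file path as above; adjust it for your environment):

        import spark.implicits._          // Dataset encoders (already available in spark-shell)

        val record = spark.read.textFile("/home/hdadmin/wc-data-blanklines.txt")
        val emptyLines = spark.sparkContext.longAccumulator("emptyLines")

        val processData = record.flatMap { x =>
          if (x == "") emptyLines.add(1)  // count blank lines as a side effect
          x.split(" ")
        }

        processData.collect()             // the action triggers the lazy computation
        println("No. of Empty Lines : " + emptyLines.value)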
    • #5366
      DataFlair Team

      Care To Be Taken :

      • Transformations are lazily evaluated, meaning that unless and until some action happens on the RDD, the computations inside the transformation are not executed.
      • Spark guarantees to apply accumulator updates reliably only inside actions.
        Let us look at a practical scenario:
      • Suppose we have a cluster in which one node crashes and another node is too slow, and both are counting blank lines. The node that crashed has to recompute its part of the work, and for the node that is too slow, Spark may re-run the task on another node and take that node's result if it finishes first.
      • Hence the same function / operation may run multiple times on the same data.
      • So now the question is: how many times does the blank-line count (i.e. the accumulator value) get updated?
      • Please keep in mind that for accumulators used inside actions, Spark applies each task's update only once. Hence, if we want a reliable result from an accumulator, we must use it inside actions, regardless of failures or multiple evaluations.
      • If we use the accumulator inside a transformation, this guarantee does not exist; an accumulator update within a transformation can occur more than once (see the sketch below).
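
      A small sketch of this difference (hypothetical data, assuming an existing SparkContext `sc` and the Spark 2.x longAccumulator API):

        val lines = sc.parallelize(Seq("spark", "", "accumulator", ""))

        // Update inside a transformation: applied every time the RDD is recomputed.
        val inMap = sc.longAccumulator("blankLinesInMap")
        val tagged = lines.map { x => if (x == "") inMap.add(1); x }
        tagged.count()                    // first evaluation
        tagged.count()                    // second evaluation (no cache), updates applied again
        println(inMap.value)              // 4, not 2

        // Update inside an action: Spark applies each task's update exactly once.
        val inAction = sc.longAccumulator("blankLinesInAction")
        lines.foreach { x => if (x == "") inAction.add(1) }
        println(inAction.value)           // 2, even if a task is retried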

      I will add more examples after some time.
