Explain coalesce() operation.

      It is a transformation, defined on the org.apache.spark.rdd.RDD class:


      def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]

      Return a new RDD that is reduced into numPartitions partitions.

      This results in a narrow dependency: for example, if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead, each of the 100 new partitions will claim 10 of the current partitions.
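      For instance, a minimal spark-shell sketch of a shuffle-free coalesce (the variable names here are made up for illustration):

      // Start with 1000 partitions, then coalesce down to 100 without a shuffle
      val wide = sc.parallelize(1 to 100000, 1000)
      val narrow = wide.coalesce(100)      // shuffle defaults to false
      narrow.partitions.length             // Int = 100
      // The lineage shows a CoalescedRDD on top of the parent, with no shuffle stage
      println(narrow.toDebugString)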

      However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
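      A rough sketch of the difference, assuming a map() runs upstream of the coalesce (hypothetical names):

      // Without shuffle, the map and the coalesce collapse into one stage,
      // so everything runs in a single task when numPartitions = 1
      val all = sc.parallelize(1 to 100000, 100)
      val oneNoShuffle = all.map(_ * 2).coalesce(1)
      // With shuffle = true, the map still runs as 100 parallel tasks,
      // and only the stage after the shuffle is reduced to one partition
      val oneShuffled = all.map(_ * 2).coalesce(1, shuffle = true)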

      Note: With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner.
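      A quick sketch of growing the partition count (hypothetical names):

      val small = sc.parallelize(1 to 100000, 100)
      // Without a shuffle, coalesce cannot increase the partition count, so this stays at 100
      small.coalesce(1000).partitions.length                  // Int = 100
      // With shuffle = true, the data is redistributed into 1000 partitions
      small.coalesce(1000, shuffle = true).partitions.length  // Int = 1000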

      From:
      http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#214_Coalesce

      It changes the number of partitions in which the data is stored. It combines the original partitions into a smaller number of new partitions, so it reduces the partition count. It is an optimized version of repartition() that avoids a full shuffle, but it only works when you are decreasing the number of RDD partitions. It helps subsequent operations run more efficiently, for example after filtering a large dataset leaves many partitions nearly empty.
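      For the filtering use case specifically, a minimal sketch (the paths and partition counts are hypothetical):

      // After a selective filter, many of the original partitions may be almost empty;
      // coalesce packs the surviving rows into fewer partitions without a full shuffle
      val logs = sc.textFile("hdfs:///path/to/logs", minPartitions = 200)
      val errors = logs.filter(_.contains("ERROR"))
      val compact = errors.coalesce(20)
      compact.saveAsTextFile("hdfs:///path/to/errors")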

      Example:

      val myrdd1 = sc.parallelize(1 to 1000, 15)   // create an RDD with 15 partitions
      myrdd1.partitions.length
      val myrdd2 = myrdd1.coalesce(5, false)       // reduce to 5 partitions without a shuffle
      myrdd2.partitions.length

      Output:

      Int = 15
      Int = 5
