This topic contains 1 reply, has 1 voice, and was last updated by  dfbdteam5 10 months ago.

Viewing 2 posts - 1 through 2 (of 2 total)
  • Author
    Posts
  • #5172

    dfbdteam5
    Moderator

    Explain the coalesce() operation.

    #5175

    dfbdteam5
    Moderator

    It is a transformation.
    > It is defined on org.apache.spark.rdd.RDD; the signature below, specialized to (K, C) pairs, is the form inherited by org.apache.spark.rdd.ShuffledRDD


    def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[(K, C)] = null): RDD[(K, C)]

    Return a new RDD that is reduced into numPartitions partitions.

    This results in a narrow dependency, e.g. if you go from 1000 partitions to 100 partitions, there will not be a shuffle, instead, each of the 100 new partitions will claim 10 of the current partitions.
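    As a rough illustration of that narrow dependency (plain Scala, not Spark internals; narrowCoalesce is a hypothetical helper, not a Spark API): each new partition claims a contiguous group of the old partitions, and no record crosses between groups, which is why no shuffle is needed.

    ```scala
    // Hypothetical sketch of the narrow-dependency case: each new
    // partition claims a contiguous run of the old partitions.
    // No record moves between groups, so no shuffle is required.
    def narrowCoalesce[T](oldPartitions: Seq[Seq[T]], numPartitions: Int): Seq[Seq[T]] = {
      val groupSize = math.ceil(oldPartitions.size.toDouble / numPartitions).toInt
      oldPartitions.grouped(groupSize).map(_.flatten.toSeq).toSeq
    }

    // 6 old partitions coalesced to 2: each new partition claims 3 old ones.
    val oldParts = Seq(Seq(1, 2), Seq(3), Seq(4, 5), Seq(6), Seq(7), Seq(8, 9))
    val newParts = narrowCoalesce(oldParts, 2)
    // newParts: Seq(Seq(1, 2, 3, 4, 5), Seq(6, 7, 8, 9))
    ```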

    However, if you’re doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This will add a shuffle step but means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).

    Note: With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner.
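    To illustrate the hash-partitioner note above with a toy plain-Scala sketch (hashPartition is a hypothetical helper, not Spark's implementation): with a shuffle, every record is reassigned by the hash of its value, so the result can have more partitions than the input and skewed partitions even out.

    ```scala
    // Hypothetical sketch of the shuffle = true case: each record goes to
    // partition (hash mod numPartitions), so the partition count can GROW
    // and abnormally large partitions get redistributed.
    def hashPartition(data: Seq[Int], numPartitions: Int): Seq[Seq[Int]] =
      (0 until numPartitions).map { p =>
        data.filter(x => math.floorMod(x.hashCode, numPartitions) == p)
      }

    // 20 records redistributed into 4 partitions by hash.
    val redistributed = hashPartition(1 to 20, 4)
    ```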

    From :
    http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#214_Coalesce

    It changes the number of partitions where data is stored. It combines the original partitions into a new, smaller number of partitions, so it reduces the number of partitions. It is an optimized version of repartition that minimizes data movement, and it only works when you are decreasing the number of RDD partitions (unless shuffle = true is passed). It lets subsequent operations run more efficiently after filtering down a large dataset.

    Example :

    val myrdd1 = sc.parallelize(1 to 1000, 15)
    myrdd1.partitions.length
    val myrdd2 = myrdd1.coalesce(5, false)
    myrdd2.partitions.length

    Output :

    Int = 15
    Int = 5

