Forums › Apache Spark › Explain coalesce() operation.
This topic has 1 reply, 1 voice, and was last updated 5 years, 7 months ago by DataFlair Team.
September 20, 2018 at 2:29 pm #5172 by DataFlair Team (Spectator)
Explain coalesce() operation.
September 20, 2018 at 2:29 pm #5175 by DataFlair Team (Spectator)
coalesce() is a transformation.
> It is defined on RDD, in the package org.apache.spark.rdd.
def coalesce(numPartitions: Int, shuffle: Boolean = false, partitionCoalescer: Option[PartitionCoalescer] = Option.empty)(implicit ord: Ordering[T] = null): RDD[T]
It returns a new RDD that is reduced into numPartitions partitions.
This results in a narrow dependency. For example, if you go from 1000 partitions to 100 partitions, there will not be a shuffle; instead, each of the 100 new partitions will claim 10 of the current partitions.
However, if you are doing a drastic coalesce, e.g. to numPartitions = 1, this may result in your computation taking place on fewer nodes than you would like (e.g. one node in the case of numPartitions = 1). To avoid this, you can pass shuffle = true. This adds a shuffle step, but it means the current upstream partitions will be executed in parallel (per whatever the current partitioning is).
Note: With shuffle = true, you can actually coalesce to a larger number of partitions. This is useful if you have a small number of partitions, say 100, potentially with a few partitions being abnormally large. Calling coalesce(1000, shuffle = true) will result in 1000 partitions with the data distributed using a hash partitioner.
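The behavior described in the notes above can be sketched as a small standalone program. This is a minimal sketch, assuming a local master and Spark on the classpath; the object name, app name, and partition counts are illustrative, not from the original post:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object CoalesceDemo {
  def main(args: Array[String]): Unit = {
    // Local 4-core master; app name is illustrative.
    val sc = new SparkContext(
      new SparkConf().setAppName("coalesce-demo").setMaster("local[4]"))

    val rdd = sc.parallelize(1 to 1000, 100) // 100 initial partitions

    // Decreasing without shuffle: narrow dependency, no shuffle step.
    val fewer = rdd.coalesce(10) // shuffle defaults to false
    println(fewer.partitions.length) // 10

    // Asking for MORE partitions without shuffle silently keeps the current count.
    val unchanged = rdd.coalesce(200)
    println(unchanged.partitions.length) // still 100

    // With shuffle = true, the partition count can actually grow.
    val more = rdd.coalesce(200, shuffle = true)
    println(more.partitions.length) // 200

    sc.stop()
  }
}
```

Note that the shuffle = false case can only hold or reduce the partition count; only the shuffled variant can increase it.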
From:
http://data-flair.training/blogs/rdd-transformations-actions-apis-apache-spark/#214_Coalesce
coalesce() changes the number of partitions in which the data is stored. It combines the original partitions into a new, smaller number of partitions, so it reduces the number of partitions. It is an optimized version of repartition() that minimizes data movement, and it only reduces the number of RDD partitions (unless shuffle = true). It is useful for running operations more efficiently after filtering down a large dataset.
Example :
val myrdd1 = sc.parallelize(1 to 1000, 15)
myrdd1.partitions.length
val myrdd2 = myrdd1.coalesce(5, false)
myrdd2.partitions.length
Output :
Int = 15
Int = 5
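The "more efficient after filtering" point from the excerpt above can also be sketched concretely. This is a hedged illustration, assuming a local master; the object name, sizes, and the factor of 1000 are invented for the example:

```scala
import org.apache.spark.{SparkConf, SparkContext}

object FilterThenCoalesce {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(
      new SparkConf().setAppName("filter-coalesce").setMaster("local[4]"))

    // A large RDD spread over many partitions.
    val big = sc.parallelize(1 to 1000000, 100)

    // After an aggressive filter, most partitions are nearly empty...
    val filtered = big.filter(_ % 1000 == 0) // 1000 survivors across 100 partitions

    // ...so packing them into a few partitions avoids scheduling
    // 100 tiny tasks for every downstream stage.
    val compact = filtered.coalesce(4)
    println(compact.partitions.length) // 4
    println(compact.count())           // 1000

    sc.stop()
  }
}
```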