Explain cogroup() operation

    • #5439
      DataFlair Team
      Moderator

      Explain cogroup() operation

    • #5442
      DataFlair Team
      Moderator

cogroup() is a transformation.
> It lives in org.apache.spark.rdd.PairRDDFunctions (available on RDDs of key–value pairs via implicit conversion)

      def cogroup[W1, W2, W3](other1: RDD[(K, W1)], other2: RDD[(K, W2)], other3: RDD[(K, W3)]): RDD[(K, (Iterable[V], Iterable[W1], Iterable[W2], Iterable[W3]))]

For each key k present in this RDD or in other1, other2, or other3, the resulting RDD contains a tuple with the list of values for that key in this, other1, other2, and other3. This is the three-RDD overload; cogroup also has overloads taking one or two other RDDs, and the example below uses the single-RDD version.

      Example:

val myrdd1 = sc.parallelize(List((1,"spark"),(2,"HDFS"),(3,"Hive"),(4,"Flink"),(6,"HBase")))
val myrdd2 = sc.parallelize(List((4,"RealTime"),(5,"Kafka"),(6,"NOSQL"),(1,"stream"),(1,"MLlib")))
// Group the values from both RDDs by key
val result = myrdd1.cogroup(myrdd2)
result.collect


Output (element order may vary across runs):

      Array[(Int, (Iterable[String], Iterable[String]))] =
      Array((4,(CompactBuffer(Flink),CompactBuffer(RealTime))),
      (1,(CompactBuffer(spark),CompactBuffer(stream, MLlib))),
      (6,(CompactBuffer(HBase),CompactBuffer(NOSQL))),
      (3,(CompactBuffer(Hive),CompactBuffer())),
      (5,(CompactBuffer(),CompactBuffer(Kafka))),
      (2,(CompactBuffer(HDFS),CompactBuffer())))
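The three-RDD overload shown in the signature above works the same way: for every key, you get one Iterable per input RDD, with an empty Iterable for any input that lacks the key. A minimal spark-shell sketch (the RDD names and sample data here are illustrative, not from the original post):

```scala
// cogroup over three other RDDs: result type is
// RDD[(Int, (Iterable[String], Iterable[String], Iterable[String], Iterable[String]))]
val rddA = sc.parallelize(List((1, "a"), (2, "b")))
val rddB = sc.parallelize(List((1, "x")))
val rddC = sc.parallelize(List((2, "y"), (3, "z")))
val rddD = sc.parallelize(List((1, "p")))

val grouped = rddA.cogroup(rddB, rddC, rddD)
grouped.collect
// Keys absent from an input RDD (e.g. key 3 in rddA, rddB, rddD)
// appear with an empty Iterable in the corresponding slot.
```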

For more transformations on RDDs, refer to Operations on RDD.
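cogroup() is also the primitive underlying the join family of operations. As a hedged sketch (variable names are illustrative), a full outer join can be built by flattening the cogroup result of myrdd1 and myrdd2 from the example above:

```scala
// Flatten each (key, (values-from-left, values-from-right)) pair into
// full-outer-join rows, using Option to mark a missing side.
val joined = myrdd1.cogroup(myrdd2).flatMap { case (k, (vs, ws)) =>
  if (vs.isEmpty)      ws.map(w => (k, (None, Some(w))))      // key only on the right
  else if (ws.isEmpty) vs.map(v => (k, (Some(v), None)))      // key only on the left
  else for (v <- vs; w <- ws) yield (k, (Some(v), Some(w)))   // key on both sides
}
joined.collect
```

Spark's built-in join, leftOuterJoin, rightOuterJoin, and fullOuterJoin are implemented in essentially this way on top of cogroup.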
