How to write a custom partitioner for a Hadoop MapReduce job?


    • #5145
      DataFlair Team
      Spectator

      How to write a custom partitioner for a MapReduce job?

    • #5146
      DataFlair Team
      Spectator

      First, let's understand why we need partitioning in the MapReduce framework:

      As we know, a map task takes an input split as input and produces key-value pairs as output. These key-value pairs are then fed to the reduce tasks. But before the reduce phase, one more phase, known as the partitioning phase, runs. This phase partitions the map output based on the key and keeps all records with the same key in the same partition.

      Let's take an example of employee analysis:
      We want to find the highest-paid male and female employees from the data set.
      Data Set:
      Name Age Dept Gender Salary
      A 23 IT Male 35
      B 35 Finance Female 50
      C 29 IT Male 40

      Assuming two map tasks, they produce the following <k,v> pairs as output:

      Map1 output:

      Key (Gender)   Value
      Male           A 23 IT Male 35
      Female         B 35 Finance Female 50

      Map2 output:

      Key (Gender)   Value
      Male           C 29 IT Male 40

      If you observe the output of the two map tasks, you will notice that the gender 'Male' appears in the output of both map tasks, and it would be processed twice if sent to two different reducers. This is where partitioning plays its role.

      Before the framework sends the map outputs to the reducers, it partitions the intermediate key-value pairs by key, so that all pairs with the same key go to the same partition.

      How is the number of partitions decided?

      Hadoop decides the number of partitions when the MapReduce job starts; it is controlled by the JobConf.setNumReduceTasks() method (or job.setNumReduceTasks() in the newer API). If you configure 5 reduce tasks, there will be 5 partitions, one per reduce task.

      So, let's see how this happens by default.

      By default, the partitioner implementation is HashPartitioner. It takes the hashCode() of the key object modulo the total number of partitions to determine which partition to send a given (key, value) pair to.
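
      For reference, the default HashPartitioner is roughly equivalent to the sketch below (masking the hash with Integer.MAX_VALUE just keeps the result non-negative):

      import org.apache.hadoop.mapreduce.Partitioner;

      public class HashPartitioner<K, V> extends Partitioner<K, V> {
          @Override
          public int getPartition(K key, V value, int numReduceTasks) {
              // Non-negative hash of the key, modulo the number of reduce tasks.
              return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
          }
      }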

      Partitioner exposes the getPartition() method, which you can implement yourself if you want to define custom partitioning for your job.

      The getPartition() method takes a <K,V> pair and the number of partitions to split the data into; it must return a number in the range [0, numPartitions).

      So, let's understand how to implement a custom partitioner.

      Our custom partitioner will send all <K,V> pairs with gender Male to one partition and all pairs with gender Female to the other partition.

      Here is the code:

      // Requires: import org.apache.hadoop.io.Text;
      //           import org.apache.hadoop.mapreduce.Partitioner;
      public static class MyPartitioner extends Partitioner<Text, Text> {
          @Override
          public int getPartition(Text key, Text value, int numReduceTasks) {
              // With no reduce tasks there is only one (trivial) partition.
              if (numReduceTasks == 0)
                  return 0;
              // All "Male" records go to partition 0.
              if (key.equals(new Text("Male")))
                  return 0;
              // Everything else (here, "Female") goes to partition 1.
              return 1;
          }
      }

      Here, getPartition() returns 0 if the key is Male and 1 if the key is Female.

      We can check our output in two files:
      part-r-00000 and part-r-00001.
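
      To actually use this partitioner, the driver must register it and request two reduce tasks. A minimal sketch with the Job API (the job name is only for illustration; requires org.apache.hadoop.conf.Configuration and org.apache.hadoop.mapreduce.Job):

      Job job = Job.getInstance(new Configuration(), "highest-paid-by-gender");
      job.setPartitionerClass(MyPartitioner.class);  // route records by gender
      job.setNumReduceTasks(2);                      // Male -> part-r-00000, Female -> part-r-00001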

    • #5148
      DataFlair Team
      Spectator

      Custom partitioners are written in a MapReduce job whenever there is a requirement to control how the data set is divided among the reducers. A custom partitioner lets you send results to different reducers based on a user-defined condition. By setting a partitioner to partition by the key, we can guarantee that all records for the same key go to the same reducer; the partitioner ensures that exactly one reducer receives all the records for that particular key. For example, if there is a requirement to find the eldest person on each flight of an airline company, we can use a custom partitioner.

      First, we need to analyze the data set to see which fields are required for this task. We will write a map function that emits the flight number (like JET123 or JET545) as the key and the rest of the passenger details (name, age, gender, DOB, etc.) as the value. These <key, value> pairs are eventually passed to the reducer, but before that they go through the custom partitioner, which is where we write the logic to implement the requirement.
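
      A minimal mapper sketch for this step is given below; the input record layout (flight number followed by the passenger details, separated by whitespace) and the class name are assumptions for illustration:

      import java.io.IOException;
      import org.apache.hadoop.io.LongWritable;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Mapper;

      public class FlightMapper extends Mapper<LongWritable, Text, Text, Text> {
          @Override
          protected void map(LongWritable offset, Text record, Context context)
                  throws IOException, InterruptedException {
              // Split into the flight number and the rest of the passenger details.
              String[] parts = record.toString().split("\\s+", 2);
              if (parts.length == 2) {
                  // Key: flight number (e.g. JET123); value: name, age, gender, DOB, ...
                  context.write(new Text(parts[0]), new Text(parts[1]));
              }
          }
      }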

      A custom partitioner can be written by overriding the getPartition method. The getPartition method takes three parameters: the key, the value, and the number of reduce tasks.

      For the example above, to find the eldest person on each flight of an airline company, we can write the custom partitioner as below:

      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Partitioner;

      public class FlightPartitioner extends Partitioner<Text, Text> {
          @Override
          public int getPartition(Text key, Text value, int numReduceTasks) {
              // With no reduce tasks there is only one (trivial) partition.
              if (numReduceTasks == 0) { return 0; }
              // Route each flight number to its own partition.
              if (key.equals(new Text("JET123"))) { return 0; }
              else if (key.equals(new Text("JET545"))) { return 1 % numReduceTasks; }
              else if (key.equals(new Text("JET789"))) { return 2 % numReduceTasks; }
              else { return 3 % numReduceTasks; }
          }
      }

      The above code assigns each record to a partition based on the number of reduce tasks specified in the driver class using job.setNumReduceTasks(int). For our example, since there are four flights to consider, we need four reduce tasks, and accordingly the code above produces four partitions. The number of reduce tasks can be set in the driver class as:

      job.setNumReduceTasks(4);

      In the reducer, we just need to collect the <key, value> pairs routed by the custom partitioner and write the logic to find the highest age on each flight and output the result. The number of output files created equals the number of reducers. In our case, there will be four output files into which the final output is written: part-r-00000, part-r-00001, part-r-00002, and part-r-00003.
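
      A minimal reducer sketch for this step is shown below; the position of the age field inside the value is an assumption, so adjust it to your actual record layout:

      import java.io.IOException;
      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Reducer;

      public class EldestPassengerReducer extends Reducer<Text, Text, Text, Text> {
          @Override
          protected void reduce(Text flight, Iterable<Text> passengers, Context context)
                  throws IOException, InterruptedException {
              int maxAge = Integer.MIN_VALUE;
              String eldest = null;
              for (Text passenger : passengers) {
                  // Assumed value layout: name age gender DOB ...
                  String[] fields = passenger.toString().split("\\s+");
                  int age = Integer.parseInt(fields[1]);
                  if (age > maxAge) {
                      maxAge = age;
                      eldest = passenger.toString();
                  }
              }
              if (eldest != null) {
                  // One output record per flight: the eldest passenger's details.
                  context.write(flight, new Text(eldest));
              }
          }
      }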

    • #5149
      DataFlair Team
      Spectator

      The partitioning phase takes place after the map phase and before the reduce phase. The Partitioner class takes the intermediate key-value pairs produced by the map phase and decides which reducer each pair goes to; the number of partitions equals the number of reducers. The default partitioner uses the hash code of the key to partition the data, but when we want to partition the data according to our own logic, we override the getPartition(Text key, Text value, int numReduceTasks) method in a Partitioner subclass.
      This method returns the partition number, and all the data corresponding to one partition goes to the same reducer.

      For example, for the input format name<tab>age<tab>gender<tab>score, below is the data:

      Alice<tab>23<tab>female<tab>45
      Bob<tab>34<tab>male<tab>89
      Chris<tab>67<tab>male<tab>97
      Kristine<tab>38<tab>female<tab>53
      Connor<tab>25<tab>male<tab>27
      Daniel<tab>78<tab>male<tab>95

      Suppose we want to find the maximum score for each gender within the age groups 00-25, 25-50, 50-75, and 75-100. We then partition the data by age group into four partitions, and all the data for one age group is processed by the same reducer (a sketch of such a partitioner follows).
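
      The sketch below assumes the map output key is the gender and the value is the full tab-separated record, so the age is parsed from the second field (these layout details are assumptions, not stated above):

      import org.apache.hadoop.io.Text;
      import org.apache.hadoop.mapreduce.Partitioner;

      public class AgeGroupPartitioner extends Partitioner<Text, Text> {
          @Override
          public int getPartition(Text key, Text value, int numReduceTasks) {
              if (numReduceTasks == 0) { return 0; }
              // Assumed value layout: name<tab>age<tab>gender<tab>score
              String[] fields = value.toString().split("\t");
              int age = Integer.parseInt(fields[1]);
              // One partition per age group: 00-25, 25-50, 50-75, 75-100.
              if (age <= 25)      { return 0; }
              else if (age <= 50) { return 1 % numReduceTasks; }
              else if (age <= 75) { return 2 % numReduceTasks; }
              else                { return 3 % numReduceTasks; }
          }
      }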
