HCatalog InputOutput Interface | HCatalog MapReduce Integration


In our last HCatalog tutorial, we discussed HCatalog applications. Today, we will see the HCatalog InputOutput interfaces. In this HCatalog InputOutput tutorial, we will discuss the input and output formats of HCatalog. Moreover, we will learn about HCatalog MapReduce integration.

In this HCatalog tutorial, we will also see HCatalog MapReduce examples, filters, and authentication. Basically, we use the HCatalog InputOutput format interfaces to read data from HDFS and, after processing it with a MapReduce job, write the resulting data back to HDFS.

So, let’s start with the HCatalog InputOutput interface.

HCatInputFormat

In order to read data from HCatalog-managed tables, we use HCatInputFormat with MapReduce jobs.

HCatInputFormat exposes a Hadoop 0.20 MapReduce API for reading data as if it had been published to a table.

a. API of HCatInputFormat

HCatInputFormat APIs are:

  1. setInput
  2. setOutputSchema
  3. getTableSchema

To use HCatInputFormat to read data, first instantiate an InputJobInfo with the necessary information about the table being read, and then call setInput with that InputJobInfo.

Further, we can use the setOutputSchema method to include a projection schema and specify the output fields. If no schema is specified, all the columns in the table are returned.

Moreover, to determine the table schema for a specified input table, we can use the getTableSchema method. The method signatures are listed below, followed by a short driver sketch.

/**
  * Set the input to use for the Job. This queries the metadata server with
  * the specified partition predicates, gets the matching partitions, puts
  * the information in the conf object. The inputInfo object is updated with
  * information needed in the client context
  * @param job the job object
  * @param inputJobInfo the input info for the table to read
  * @throws IOException the exception in communicating with the metadata server
  */
 public static void setInput(Job job,
     InputJobInfo inputJobInfo) throws IOException;
/**
  * Set the schema for the HCatRecord data returned by HCatInputFormat.
  * @param job the job object
  * @param hcatSchema the schema to use as the consolidated schema
  */
public static void setOutputSchema(Job job, HCatSchema hcatSchema)
  throws IOException;
/**
  * Get the HCatTable schema for the table specified in the HCatInputFormat.setInput
  * call on the specified job context. This information is available only after
  * HCatInputFormat.setInput has been called for a JobContext.
  * @param context the context
  * @return the table schema
  * @throws IOException if HCatInputFormat.setInput has not been called
  *                     for the current context
  */
 public static HCatSchema getTableSchema(JobContext context)
   throws IOException;
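
Putting these calls together, a minimal read-side driver can be sketched as follows. This sketch is not from the original article: the database name "default", the table name "web_logs", and the class name ReadDriver are assumptions, and the org.apache.hcatalog.* packages match the HCatalog 0.5.x jars used later in this tutorial (newer Hive releases package HCatalog under org.apache.hive.hcatalog.* instead).

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatInputFormat;
import org.apache.hcatalog.mapreduce.InputJobInfo;

public class ReadDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "hcat-read-example");

    // Point the job at an HCatalog-managed table; the third argument is the
    // partition filter (null means read the whole table).
    HCatInputFormat.setInput(job,
        InputJobInfo.create("default", "web_logs", null));
    job.setInputFormatClass(HCatInputFormat.class);

    // Optional projection: without setOutputSchema, all columns are returned.
    // Here we simply pass the full table schema back.
    HCatSchema tableSchema = HCatInputFormat.getTableSchema(job);
    HCatInputFormat.setOutputSchema(job, tableSchema);

    // ... set the mapper, reducer, and output side as usual, then submit the job.
  }
}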

HCatOutputFormat

Now, to write data to HCatalog-managed tables, we use HCatOutputFormat with MapReduce jobs.
HCatOutputFormat exposes a Hadoop 0.20 MapReduce API for writing data to a table.

So, basically, when a MapReduce job uses HCatOutputFormat to write output, the default OutputFormat configured for the table is used, and the new partition is published to the table after the job completes.

a. API of HCatOutputFormat

HCatOutputFormat APIs are:

  1. setOutput
  2. setSchema
  3. getTableSchema

setOutput must be the first call on HCatOutputFormat; any other call before it will throw an exception saying the output format is not initialized. Moreover, the schema for the data being written out is specified by the setSchema method.

Also, if our data has the same schema as the table schema, we can use HCatOutputFormat.getTableSchema() to get the table schema and then pass it along to setSchema(). The method signatures are listed below, followed by a short driver sketch.

/**
  * Set the information about the output to write for the job. This queries the metadata
  * server to find the StorageHandler to use for the table. It throws an error if the
  * partition is already published.
  * @param job the job object
  * @param outputJobInfo the table output information for the job
  * @throws IOException the exception in communicating with the metadata server
  */
 @SuppressWarnings("unchecked")
 public static void setOutput(Job job, OutputJobInfo outputJobInfo) throws IOException;
/**
  * Set the schema for the data being written out to the partition. The
  * table schema is used by default for the partition if this is not called.
  * @param job the job object
  * @param schema the schema for the data
  * @throws IOException
  */
 public static void setSchema(final Job job, final HCatSchema schema) throws IOException; 
/**
  * Get the table schema for the table specified in the HCatOutputFormat.setOutput call
  * on the specified job context.
  * @param context the context
  * @return the table schema
  * @throws IOException if HCatOutputFormat.setOutput has not been called
  *                   for the passed context
  */
 public static HCatSchema getTableSchema(JobContext context) throws IOException;
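
Likewise, a minimal write-side driver might look as follows; this is a hedged sketch rather than the official example, with the database "default", the table "web_logs_summary", and the class name WriteDriver assumed for illustration.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.data.schema.HCatSchema;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class WriteDriver {
  public static void main(String[] args) throws Exception {
    Job job = new Job(new Configuration(), "hcat-write-example");

    // setOutput must be the first HCatOutputFormat call on the job; instead of
    // null, a Map of partition key-values can be passed (see the Write Filter section).
    HCatOutputFormat.setOutput(job,
        OutputJobInfo.create("default", "web_logs_summary", null));

    // Our data has the same schema as the table, so reuse the table schema.
    HCatSchema schema = HCatOutputFormat.getTableSchema(job);
    HCatOutputFormat.setSchema(job, schema);
    job.setOutputFormatClass(HCatOutputFormat.class);

    // ... set the mapper, reducer, key/value classes, and the input side as usual.
  }
}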

HCatalog InputOutput – HCatRecord

Basically, HCatRecord is the type the HCatalog InputOutput interface supports for storing values in HCatalog tables.

In addition, the types in an HCatalog table schema determine the types of the objects returned for the different fields in an HCatRecord. The mappings between HCatalog data types, the Java classes seen by MapReduce programs, and their values are listed below; a short Mapper sketch follows:

HCatalog data type   Java class for MapReduce   Values
TINYINT              java.lang.Byte             -128 to 127
SMALLINT             java.lang.Short            -(2^15) to (2^15)-1, which is -32,768 to 32,767
INT                  java.lang.Integer          -(2^31) to (2^31)-1, which is -2,147,483,648 to 2,147,483,647
BIGINT               java.lang.Long             -(2^63) to (2^63)-1, which is -9,223,372,036,854,775,808 to 9,223,372,036,854,775,807
BOOLEAN              java.lang.Boolean          true or false
FLOAT                java.lang.Float            single-precision floating-point value
DOUBLE               java.lang.Double           double-precision floating-point value
DECIMAL              java.math.BigDecimal       exact floating-point value with 38-digit precision
BINARY               byte[]                     binary data
STRING               java.lang.String           character string
STRUCT               java.util.List             structured data
ARRAY                java.util.List             values of one data type
MAP                  java.util.Map              key-value pairs
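
To see this mapping in practice, here is a hedged Mapper sketch (not from the original article). It assumes a table whose column at position 0 is a STRING and whose column at position 1 is an INT, say an "age" column, so the values come back as java.lang.String and java.lang.Integer respectively.

import java.io.IOException;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hcatalog.data.HCatRecord;

public class AgeMapper
    extends Mapper<WritableComparable, HCatRecord, IntWritable, IntWritable> {

  @Override
  protected void map(WritableComparable key, HCatRecord value, Context context)
      throws IOException, InterruptedException {
    // A STRING column at position 0 would come back as java.lang.String:
    //   String user = (String) value.get(0);
    // The INT column at position 1 comes back as java.lang.Integer.
    Integer age = (Integer) value.get(1);
    if (age != null) {
      context.write(new IntWritable(age), new IntWritable(1));
    }
  }
}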

Running MapReduce With HCatalog

To run MapReduce with HCatalog, our MapReduce program must know where the Thrift server is. The best way to do this is to pass the location as an argument to our Java program. We also need to pass the Hive and HCatalog jars to MapReduce with the -libjars argument.

export HADOOP_HOME=<path_to_hadoop_install>
export HCAT_HOME=<path_to_hcat_install>
export HIVE_HOME=<path_to_hive_install>
export LIB_JARS=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar,\
$HIVE_HOME/lib/hive-metastore-0.10.0.jar,\
$HIVE_HOME/lib/libthrift-0.7.0.jar,\
$HIVE_HOME/lib/hive-exec-0.10.0.jar,\
$HIVE_HOME/lib/libfb303-0.7.0.jar,\
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar,\
$HIVE_HOME/lib/slf4j-api-1.6.1.jar
export HADOOP_CLASSPATH=$HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar:\
$HIVE_HOME/lib/hive-metastore-0.10.0.jar:\
$HIVE_HOME/lib/libthrift-0.7.0.jar:\
$HIVE_HOME/lib/hive-exec-0.10.0.jar:\
$HIVE_HOME/lib/libfb303-0.7.0.jar:\
$HIVE_HOME/lib/jdo2-api-2.3-ec.jar:\
$HIVE_HOME/conf:$HADOOP_HOME/conf:\
$HIVE_HOME/lib/slf4j-api-1.6.1.jar
$HADOOP_HOME/bin/hadoop --config $HADOOP_HOME/conf jar <path_to_jar> \
<main_class> -libjars $LIB_JARS <program_arguments>

Hadoop will ship the libjars every time we run the MapReduce program. That is not efficient, and it may also deplete the Hadoop distributed cache.

Instead, we can optimize by shipping the libjars from HDFS locations. Hadoop will then reuse the entries in the distributed cache.

bin/hadoop fs -copyFromLocal $HCAT_HOME/share/hcatalog/hcatalog-core-0.5.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-metastore-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libthrift-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/hive-exec-0.10.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/libfb303-0.7.0.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/jdo2-api-2.3-ec.jar /tmp
bin/hadoop fs -copyFromLocal $HIVE_HOME/lib/slf4j-api-1.6.1.jar /tmp
export LIB_JARS=hdfs:///tmp/hcatalog-core-0.5.0.jar,\
hdfs:///tmp/hive-metastore-0.10.0.jar,\
hdfs:///tmp/libthrift-0.7.0.jar,\
hdfs:///tmp/hive-exec-0.10.0.jar,\
hdfs:///tmp/libfb303-0.7.0.jar,\
hdfs:///tmp/jdo2-api-2.3-ec.jar,\
hdfs:///tmp/slf4j-api-1.6.1.jar
# (Other statements remain the same.)

HCatalog InputOutput – Authentication

If a failure results in a message like “2010-11-03 16:17:28,225 WARN hive.metastore … – Unable to connect metastore with URI thrift://…” in /tmp/<username>/hive.log, make sure you have run “kinit <username>@FOO.COM” to get a Kerberos ticket and be able to authenticate to the HCatalog server.
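
As a side note that goes beyond the original text, a client that cannot rely on an interactive kinit can, under the assumption that a keytab is available, perform the equivalent Kerberos login programmatically with Hadoop's UserGroupInformation; the principal and keytab path below are placeholders.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.UserGroupInformation;

public class KerberosLogin {
  public static void login(Configuration conf) throws IOException {
    // Requires hadoop.security.authentication=kerberos in the configuration.
    UserGroupInformation.setConfiguration(conf);
    // Placeholder principal and keytab path; replace with real values.
    UserGroupInformation.loginUserFromKeytab("username@FOO.COM",
        "/etc/security/keytabs/username.keytab");
  }
}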

Filter Operators

A filter can contain the operators 'and', 'or', 'like', '()', '=', '<>' (not equal), '<', '>', '<=' and '>='.
For example:

ds > "20110924"
ds < "20110925"
ds <= "20110925" and ds >= "20110924"

HCatalog InputOutput – Scan Filter

Let’s suppose we have a web_logs table that is partitioned by the column “ds”. We could then select one partition of the table by changing:

HCatInputFormat.setInput(job, InputJobInfo.create(dbName, inputTableName, null));
to
HCatInputFormat.setInput(job,
                        InputJobInfo.create(dbName, inputTableName, "ds=\"20110924\""));
This filter must reference only partition columns. Values from other columns will cause the job to fail.

HCatalog InputOutput – Write Filter

Moreover, to write to a single partition, we can change the above example to pass a Map of key-value pairs that describes all of the partition keys and values for that partition. In our example web_logs table, there is only one partition column (ds), so our Map will have only one entry. Change

HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, null));
to
Map<String, String> partitions = new HashMap<String, String>(1);
partitions.put("ds", "20110924");
HCatOutputFormat.setOutput(job, OutputJobInfo.create(dbName, outputTableName, partitions));

Also, to write to multiple partitions simultaneously, we can leave the Map null; however, all of the partitioning columns must then be present in the data we are writing.
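
A minimal sketch of this multi-partition case follows, under the assumption that dbName and outputTableName are the same variables as in the snippet above; the class and method names are made up for illustration.

import java.io.IOException;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hcatalog.mapreduce.HCatOutputFormat;
import org.apache.hcatalog.mapreduce.OutputJobInfo;

public class DynamicPartitionSetup {
  public static void configure(Job job, String dbName, String outputTableName)
      throws IOException {
    // A null partition map lets the written records target many partitions at once.
    HCatOutputFormat.setOutput(job,
        OutputJobInfo.create(dbName, outputTableName, null));
    // Every HCatRecord written out must now carry a value for the partition
    // column "ds" (depending on the HCatalog version, the schema passed to
    // setSchema may also need to include that column).
    HCatOutputFormat.setSchema(job, HCatOutputFormat.getTableSchema(job));
    job.setOutputFormatClass(HCatOutputFormat.class);
  }
}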

So, this was all about HCatalog InputOutput and HCatalog MapReduce integration. We hope you liked our explanation.

Conclusion

Hence, we have seen the concept of the HCatalog InputOutput format interfaces in detail. Along with this, we discussed how to run MapReduce with HCatalog, and we also saw the HCatInputFormat and HCatOutputFormat in this HCatalog InputOutput tutorial.

Also, we discussed an HCatalog MapReduce example. However, if any doubt occurs regarding HCatalog InputOutput, feel free to ask in the comment section.
