Bucketing in Hive – Creation of Bucketed Table in Hive
In Apache Hive, the Bucketing concept is used to decompose table data sets into more manageable parts. However, there is much more to learn about Bucketing in Hive.
So, in this article, we will cover the whole concept of Bucketing in Hive. That includes one of the major questions: why do we even need Bucketing in Hive when we already have the Hive Partitioning concept?
At last, we will discuss the Features of Bucketing in Hive, Advantages of Bucketing in Hive, Limitations of Bucketing in Hive, and an Example Use Case of Bucketing in Hive, along with some Hive Bucketing examples.
What is Bucketing in Hive?
Basically, Apache Hive offers another technique for decomposing table data sets into more manageable parts. That technique is what we call Bucketing in Hive.
Why Bucketing?
Basically, the concept of Hive Partitioning provides a way of segregating Hive table data into multiple files/directories. However, it only gives effective results in a few scenarios, such as:
– when there is a limited number of partitions, or
– when the partitions are of comparatively equal size.
This is not possible in all scenarios, though. For example, suppose we partition our tables on a geographic attribute such as country. Some bigger countries will then produce large partitions (4-5 countries alone may contribute 70-80% of the total data), while the remaining countries of the world may contribute just 20-30% of the total data and create many small partitions. At that point, partitioning alone is not ideal.
To solve this problem of over-partitioning, Hive offers the Bucketing concept. It is another effective technique for decomposing table data sets into more manageable parts.
Features of Bucketing in Hive
Basically, this concept is based on a hash function applied to the bucketed column, modulo the total number of buckets (see the sketch after this list).
i. The hash_function depends on the type of the bucketing column.
ii. Records with the same value in the bucketed column will always be stored in the same bucket.
iii. To divide the table into buckets, we use the CLUSTERED BY clause.
iv. Generally, in the table directory, each bucket is just a file, and bucket numbering is 1-based.
v. Bucketing can be done along with partitioning on Hive tables, and even without partitioning.
vi. Moreover, bucketed tables create almost equally distributed data file parts.
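As a rough sketch of this hashing idea (not Hive's exact internal code path), the query below assumes a hypothetical user_data table with a state column, and uses Hive's built-in hash() and pmod() functions to approximate which of 32 buckets each state value would map to; the +1 reflects the 1-based bucket numbering mentioned above.
-- Minimal sketch: approximate bucket assignment for a string column.
-- user_data is a hypothetical table; hash() only approximates the
-- internal bucketing hash for this illustration.
SELECT state,
       pmod(hash(state), 32) + 1 AS bucket_number
FROM user_data;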
Advantages of Bucketing in Hive
i. Compared with non-bucketed tables, bucketed tables offer efficient sampling (see the example after this list).
ii. Map-side joins are faster on bucketed tables than on non-bucketed tables, as the data files are of nearly equal size.
iii. Similar to partitioning, bucketed tables also offer faster query responses than non-bucketed tables.
iv. This concept offers the flexibility to keep the records in each bucket sorted by one or more columns.
v. Since the join of each bucket becomes an efficient merge-sort, this makes map-side joins even more efficient.
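As a hedged example of the sampling advantage (using the bucketed_user table created later in this article), TABLESAMPLE lets Hive read a single bucket file instead of scanning the whole table:
-- Read only bucket 1 of the 32 buckets, instead of a full table scan.
SELECT firstname, city, state
FROM bucketed_user TABLESAMPLE(BUCKET 1 OUT OF 32 ON state);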
Limitations of Bucketing in Hive
i. However, bucketing doesn’t ensure that the table is properly populated.
ii. So, we need to handle loading data into the buckets ourselves.
Example Use Case for Bucketing in Hive
To understand the remaining features of Hive Bucketing, let’s see an example use case by creating buckets for the following sample user records file:
first_name,last_name,address,country,city,state,post,phone1,phone2,email,web
Rebbecca,Didio,171 E 24th St,AU,Leith,TA,7315,03-8174-9123,0458-665-290,[email protected],http://www.brandtjonathanfesq.com.au
Hence, let’s create the table partitioned by country and bucketed by state, sorted in ascending order of city.
a. Creation of Bucketed Tables
We can create bucketed tables with the help of the CLUSTERED BY clause and the optional SORTED BY clause in the CREATE TABLE statement. Moreover, we can create a bucketed_user table meeting the above requirement with the help of the below HiveQL.
CREATE TABLE bucketed_user(
       firstname VARCHAR(64),
       lastname  VARCHAR(64),
       address   STRING,
       city  VARCHAR(64),
       state  VARCHAR(64),
       post      STRING,
       phone1    VARCHAR(64),
       phone2    STRING,
       email     STRING,
       web       STRING
       )
       COMMENT 'A bucketed sorted user table'
       PARTITIONED BY (country VARCHAR(64))
       CLUSTERED BY (state) SORTED BY (city) INTO 32 BUCKETS
       STORED AS SEQUENCEFILE;
As shown in the above code, the bucketing column (state) and the sorting column (city) are included in the table’s column definitions, unlike the partitioning column (country), which is not part of the column definitions.
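To double-check the bucketing metadata after creating the table, a quick sketch: DESCRIBE FORMATTED reports Num Buckets, Bucket Columns, and Sort Columns in its output.
-- Inspect the table's bucketing metadata.
DESCRIBE FORMATTED bucketed_user;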
b. Inserting data Into Bucketed Tables
However, similar to partitioned tables, we cannot directly load bucketed tables with the LOAD DATA (LOCAL) INPATH command. Instead, to populate a bucketed table we need to use an INSERT OVERWRITE TABLE … SELECT … FROM clause reading from another table.
Hence, for this we will create one temporary table in Hive with all the columns of the input file, and copy data from that table into our target bucketed table.
Let’s suppose we have created the temp_user temporary table. The HiveQL for populating the bucketed table from the temp_user table is given below.
In addition, we need to set the property hive.enforce.bucketing = true, so that Hive knows to create the number of buckets declared in the table definition while populating the bucketed table.
set hive.enforce.bucketing = true;

INSERT OVERWRITE TABLE bucketed_user PARTITION (country)
SELECT firstname, lastname, address, city, state, post,
       phone1, phone2, email, web, country
FROM temp_user;
Some important points to note:
i. The property hive.enforce.bucketing = true is similar to the hive.exec.dynamic.partition = true property used in partitioning. By setting this property, we enable dynamic bucketing while loading data into the Hive table.
ii. Moreover, it automatically sets the number of reduce tasks equal to the number of buckets mentioned in the table definition (32 in our case), and automatically selects the CLUSTERED BY column from the table definition.
iii. If we do not set this property in the Hive session, we have to convey the same information to Hive manually: set the number of reduce tasks to run (in our case, set mapred.reduce.tasks = 32) and add DISTRIBUTE BY (state) and SORT BY (city) clauses at the end of the above INSERT … statement, as sketched below.
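A minimal sketch of that manual alternative, assuming hive.enforce.bucketing has not been set in the session; the DISTRIBUTE BY and SORT BY clauses here mirror the table’s CLUSTERED BY (state) SORTED BY (city) declaration:
-- Manually size the reducers and route/sort the rows ourselves.
set mapred.reduce.tasks = 32;

INSERT OVERWRITE TABLE bucketed_user PARTITION (country)
SELECT firstname, lastname, address, city, state, post,
       phone1, phone2, email, web, country
FROM temp_user
DISTRIBUTE BY state SORT BY city;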
c. Solution For Example Use Case
Below is the combined HiveQL, along with the script required for the temporary Hive table creation. Let’s save this HiveQL into a bucketed_user_creation.hql file. Also, save the input file provided in the example use case section into a user_table.txt file in the home directory.
set hive.exec.dynamic.partition=true;
set hive.exec.dynamic.partition.mode=nonstrict;
set hive.exec.max.dynamic.partitions.pernode=1000;
set hive.enforce.bucketing = true;

DROP TABLE IF EXISTS bucketed_user;

CREATE TEMPORARY TABLE temp_user(
       firstname VARCHAR(64),
       lastname  VARCHAR(64),
       address   STRING,
       country   VARCHAR(64),
       city      VARCHAR(64),
       state     VARCHAR(64),
       post      STRING,
       phone1    VARCHAR(64),
       phone2    STRING,
       email     STRING,
       web       STRING
       )
       ROW FORMAT DELIMITED
         FIELDS TERMINATED BY ','
         LINES TERMINATED BY '\n'
       STORED AS TEXTFILE;

LOAD DATA LOCAL INPATH '/home/user/user_table.txt' INTO TABLE temp_user;

CREATE TABLE bucketed_user(
       firstname VARCHAR(64),
       lastname  VARCHAR(64),
       address   STRING,
       city      VARCHAR(64),
       state     VARCHAR(64),
       post      STRING,
       phone1    VARCHAR(64),
       phone2    STRING,
       email     STRING,
       web       STRING
       )
       COMMENT 'A bucketed sorted user table'
       PARTITIONED BY (country VARCHAR(64))
       CLUSTERED BY (state) SORTED BY (city) INTO 32 BUCKETS
       STORED AS SEQUENCEFILE;

INSERT OVERWRITE TABLE bucketed_user PARTITION (country)
SELECT firstname, lastname, address, city, state, post,
       phone1, phone2, email, web, country
FROM temp_user;
d. Output
Let’s execute this script in Hive. The output of the above script execution is shown below.
user@tri03ws-386:~$ hive -f bucketed_user_creation.hql
Logging initialized using configuration in jar:file:/home/user/bigdata/apache-hive-0.14.0-bin/lib/hive-common-0.14.0.jar!/hive-log4j.properties
OK
Time taken: 12.144 seconds
OK
Time taken: 0.146 seconds
Loading data to table default.temp_user
Table default.temp_user stats: [numFiles=1, totalSize=283212]
OK
Time taken: 0.21 seconds
OK
Time taken: 0.5 seconds
Query ID = user_20141222163030_3f024f2b-e682-4b08-b25c-7775d7af4134
Total jobs = 1
Launching Job 1 out of 1
Number of reduce tasks determined at compile time: 32
In order to change the average load for a reducer (in bytes):
 set hive.exec.reducers.bytes.per.reducer=<number>
In order to limit the maximum number of reducers:
 set hive.exec.reducers.max=<number>
In order to set a constant number of reducers:
 set mapreduce.job.reduces=<number>
Starting Job = job_1419243806076_0002, Tracking URL = http://tri03ws-386:8088/proxy/application_1419243806076_0002/
Kill Command = /home/user/bigdata/hadoop-2.6.0/bin/hadoop job  -kill job_1419243806076_0002
Hadoop job information for Stage-1: number of mappers: 1; number of reducers: 32
2014-12-22 16:30:36,164 Stage-1 map = 0%, reduce = 0%
2014-12-22 16:31:09,770 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.66 sec
2014-12-22 16:32:10,368 Stage-1 map = 100%, reduce = 0%, Cumulative CPU 1.66 sec
2014-12-22 16:32:28,037 Stage-1 map = 100%, reduce = 13%, Cumulative CPU 3.19 sec
2014-12-22 16:32:36,480 Stage-1 map = 100%, reduce = 14%, Cumulative CPU 7.06 sec
2014-12-22 16:32:40,317 Stage-1 map = 100%, reduce = 19%, Cumulative CPU 7.63 sec
2014-12-22 16:33:40,691 Stage-1 map = 100%, reduce = 19%, Cumulative CPU 12.28 sec
2014-12-22 16:33:54,846 Stage-1 map = 100%, reduce = 31%, Cumulative CPU 17.45 sec
2014-12-22 16:33:58,642 Stage-1 map = 100%, reduce = 38%, Cumulative CPU 21.69 sec
2014-12-22 16:34:52,731 Stage-1 map = 100%, reduce = 56%, Cumulative CPU 32.01 sec
2014-12-22 16:35:21,369 Stage-1 map = 100%, reduce = 63%, Cumulative CPU 35.08 sec
2014-12-22 16:35:22,493 Stage-1 map = 100%, reduce = 75%, Cumulative CPU 41.45 sec
2014-12-22 16:35:53,559 Stage-1 map = 100%, reduce = 94%, Cumulative CPU 51.14 sec
2014-12-22 16:36:14,301 Stage-1 map = 100%, reduce = 100%, Cumulative CPU 54.13 sec
MapReduce Total cumulative CPU time: 54 seconds 130 msec
Ended Job = job_1419243806076_0002
Loading data to table default.bucketed_user partition (country=null)
Time taken for load dynamic partitions : 2421
Loading partition {country=AU}
Loading partition {country=country}
Loading partition {country=US}
Loading partition {country=UK}
Loading partition {country=CA}
Time taken for adding to write entity : 17
Partition default.bucketed_user{country=AU} stats: [numFiles=32, numRows=500, totalSize=78268, rawDataSize=67936]
Partition default.bucketed_user{country=CA} stats: [numFiles=32, numRows=500, totalSize=76564, rawDataSize=66278]
Partition default.bucketed_user{country=UK} stats: [numFiles=32, numRows=500, totalSize=85604, rawDataSize=75292]
Partition default.bucketed_user{country=US} stats: [numFiles=32, numRows=500, totalSize=75468, rawDataSize=65383]
Partition default.bucketed_user{country=country} stats: [numFiles=32, numRows=1, totalSize=2865, rawDataSize=68]
MapReduce Jobs Launched:
Stage-Stage-1: Map: 1  Reduce: 32 Cumulative CPU: 54.13 sec   HDFS Read: 283505 HDFS Write: 316247 SUCCESS
Total MapReduce CPU Time Spent: 54 seconds 130 msec
OK
Time taken: 396.486 seconds
user@tri03ws-386:~$
Hence, we can see in the above output that the MapReduce job initiated 32 reduce tasks for the 32 buckets, and that dynamic partitions were created by country (the extra {country=country} partition, with a single row, comes from the header row of the input file). A quick way to verify the layout on disk is sketched below.
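As a quick, hedged verification (the warehouse path below is an assumption based on Hive’s default layout; adjust it to your configuration), we can list the partitions and then the bucket files of one partition; each country directory should contain 32 bucket files.
-- List the dynamic partitions that were created.
SHOW PARTITIONS bucketed_user;
-- Assumed default warehouse location for the AU partition.
dfs -ls /user/hive/warehouse/bucketed_user/country=AU;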
Conclusion
As a result, we have seen the whole concept of Hive Bucketing: why we even need Hive Bucketing after the Hive Partitioning concept, the features of Bucketing in Hive, its advantages, its limitations, and an example use case of Bucketing in Hive.
We have also seen Hive bucketing without partitioning, how to decide the number of buckets in Hive, Hive bucketing with examples, and how to insert into a bucketed table. Still, if any doubt occurs, feel free to ask in the comment section.