What is SequenceFileInputFormat in Hadoop MapReduce?


  • #5393
    DataFlair Team

    What is a sequence file in Hadoop?
    What is SequenceFileInputFormat, and what is it used for in Hadoop?

  • #5395
    DataFlair Team

Hadoop sequence files are one of the Apache Hadoop-specific file formats, storing data as serialized key-value pairs. Serialized here means: a stream of bytes.

In general, Apache Hadoop supports text files, which are quite commonly used for storing data. Besides text files, it also supports binary files, and one of these binary formats is the sequence file. A Hadoop sequence file is a flat file structure that consists of serialized/binary key-value pairs. This is the same format in which data is stored internally during the processing of MapReduce tasks.

Hadoop SequenceFile is used in MapReduce as an input/output format. By default, mapper output is stored on the local file system of the mapper node, and internally Hadoop uses the SequenceFile format for this intermediate map output.

      What is the purpose of sequence file?

      1) To enable/store/process binary data

2) The other objective of using SequenceFile is to pack many small files into a single large SequenceFile for MapReduce computation, since Hadoop's design prefers large files. Sequence files also work well as containers for smaller files: HDFS and MapReduce are optimized for large files, so packing small files into a sequence file makes storing and processing the smaller files more efficient.

SequenceFile provides the SequenceFile.Writer, SequenceFile.Reader, and SequenceFile.Sorter classes for writing, reading, and sorting respectively. We can use these classes if we want to write/read sequence file data via a Java program (Java API).
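As a rough sketch of the Sorter class (which is mentioned above but not shown elsewhere in this answer), sorting an existing sequence file by key could look like the following. The paths and the IntWritable/Text key/value classes are illustrative assumptions, not taken from the examples below:

```java
package com.hadoop.dataflair;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileSortDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        // The key class must be WritableComparable so records can be ordered by key
        SequenceFile.Sorter sorter =
                new SequenceFile.Sorter(fs, IntWritable.class, Text.class, conf);

        // Sort the records of the input sequence file into the output path
        sorter.sort(new Path(args[0]), new Path(args[1]));
    }
}
```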

The SequenceFile.Reader class acts as the reading interface and can read any of the SequenceFile formats below.

Sequence files can also be compressed for space considerations. Depending on the compression type specified, there are three different formats available for sequence files:

1) Uncompressed SequenceFile format – compression type: NONE

2) Record-compressed SequenceFile format – RECORD-level compression; here only the values are compressed.

3) Block-compressed SequenceFile format – both keys and values are collected in blocks, which are compressed and stored separately.

Advantages of compressing:
1) Less disk space

2) Less I/O

3) Less bandwidth required for data transfer

4) SequenceFile is splittable at block level

      All of them share a common header described below.

A SequenceFile consists of a header followed by one or more records. The first three bytes of a sequence file are the bytes SEQ, which act as a magic number, followed by a single byte representing the version number. The header contains other fields, including the names of the key and value classes, compression details, user-defined metadata, and the sync marker. The sync marker allows a reader to synchronize to a record boundary from any position in the file. Each file has a randomly generated sync marker, whose value is stored in the header. Sync markers appear between records in the sequence file, though not necessarily between every pair of records; in the case of short records, a marker may not appear between two given records.

Here is the layout of a sequence file: Header, Record, Record, Sync, ...

The header contains all the metadata that the file reader uses to determine the format of the file, including whether or not the file is compressed.

      SequenceFile Header:
      version – 3 bytes of magic header SEQ, followed by 1 byte of actual version number (e.g. SEQ4 or SEQ6)

keyClassName – key class

valueClassName – value class

      compression – A boolean which specifies if compression is turned on for keys/values in this file.

      blockCompression – A boolean which specifies if block-compression is turned on for keys/values in this file.

      compression codec – CompressionCodec class which is used for compression of keys and/or values (if compression is enabled).

      metadata – SequenceFile.Metadata for this file.

      sync – A sync marker to denote end of the header.

The above fields make up the sequence file header.
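Since the header always begins with the three magic bytes SEQ plus a version byte, a quick sanity check can be done with plain Java I/O. The SeqMagicCheck helper below is hypothetical (it is not part of the Hadoop API), a minimal sketch of inspecting the first bytes of a file:

```java
import java.io.IOException;
import java.io.InputStream;
import java.nio.file.Files;
import java.nio.file.Path;

public class SeqMagicCheck {

    // Returns true if the file starts with the 3-byte magic "SEQ"
    // (a fourth byte, the version number, follows it in a real file).
    static boolean isSequenceFile(Path p) throws IOException {
        try (InputStream in = Files.newInputStream(p)) {
            byte[] header = in.readNBytes(4);
            return header.length == 4
                    && header[0] == 'S' && header[1] == 'E' && header[2] == 'Q';
        }
    }

    public static void main(String[] args) throws IOException {
        // Simulate a sequence file header: "SEQ" followed by version byte 6
        Path p = Files.createTempFile("demo", ".seq");
        Files.write(p, new byte[] {'S', 'E', 'Q', 6});
        System.out.println(isSequenceFile(p)); // prints "true"
        Files.delete(p);
    }
}
```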

1) Uncompressed SequenceFile data format:
Along with the above header, an uncompressed file record contains the information below.
Record length: Int (key length + value length)
Key length: Int
Key
Value

Record: the record is where the actual data is stored. The format of records in uncompressed sequence files is:
Record length
Key length
Key
Value

Sync marker:
A sync marker is placed every 100 bytes of data or so; this helps when the file needs to be split for parallel processing by workers.

2) Record-compressed file data format:
In the record-compressed format the values are compressed. A record consists of the following data:
Record length: Int (key length + value length)
Key length
Key
Compressed value

The internal format of the records depends on whether compression is enabled, and if it is, whether it is record compression or block compression.

Record: the record is where the actual data is stored.
Sync marker: a sync marker is placed every 100 bytes of data or so; this helps when the file needs to be split for parallel processing by workers.

3) Block-compressed file data format:
Block-compressed is the format in which both keys and values are collected in blocks separately, and these blocks are compressed.
Block compression compresses multiple records at once. It is therefore more compact than, and should generally be preferred over, record compression, because it has the opportunity to take advantage of similarities between records. Records are added to a block until the block reaches a minimum size in bytes, defined by the io.seqfile.compress.blocksize property; the default is one million bytes. A sync marker is written before the start of every block. The format of a block is a field indicating the number of records in the block, followed by four compressed fields: the key lengths, the keys, the value lengths, and the values.

The fields available for a block in block compression are:
Number of records
Compressed key lengths
Compressed keys
Compressed value lengths
Compressed values

Writing text data into a sequence file: we will write the data from a text file into a sequence file. We can do it in two ways.

1) Write a sequence file using a Java program via the Hadoop API:
To create a sequence file, use one of its createWriter() static methods, which return a SequenceFile.Writer instance.
Once you have a SequenceFile.Writer instance, you can start writing key-value pairs using the append() method.

      Java code:

package com.hadoop.dataflair;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Text;

public class SequenceFileWriteDemo {

    private static final String[] DATA = {
        "One, two, buckle my shoe",
        "Three, four, shut the door",
        "Five, six, pick up sticks",
        "Seven, eight, lay them straight",
        "Nine, ten, a big fat hen"
    };

    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        IntWritable key = new IntWritable();
        Text value = new Text();

        SequenceFile.Writer writer = null;
        try {
            writer = SequenceFile.createWriter(fs, conf, path,
                    key.getClass(), value.getClass());
            for (int i = 0; i < 100; i++) {
                key.set(100 - i);
                value.set(DATA[i % DATA.length]);
                // getLength() returns the current position in the file
                System.out.printf("[%s]\t%s\t%s\n", writer.getLength(), key, value);
                writer.append(key, value);
            }
        } finally {
            IOUtils.closeStream(writer);
        }
    }
}

For the above program, generate a jar, copy the jar to the Hadoop box, and then run the command below.

      hadoop jar <jarname> com.hadoop.dataflair.SequenceFileWriteDemo /user/hdadmin/sequenceout

2) Write a sequence file using a MapReduce program:
The example below shows the usage of the SequenceFileOutputFormat class.

      SequenceFileWriterMapper.java: Mapper class

package com.dataflair.seq;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileWriterMapper extends Mapper<Text, Text, LongWritable, Text> {

    @Override
    public void map(Text key, Text value, Context context)
            throws IOException, InterruptedException {
        // The key read by KeyValueTextInputFormat is text; convert it to a long
        context.write(new LongWritable(Long.parseLong(key.toString())), value);
    }
}

SequeneFileWriterApp.java: MapReduce job setup

package com.dataflair.seq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.DefaultCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.KeyValueTextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SequeneFileWriterApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sequencefile <in> [<in>....] <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "Sequence file Writer");
        job.setJarByClass(SequeneFileWriterApp.class);
        job.setMapperClass(SequenceFileWriterMapper.class);
        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(KeyValueTextInputFormat.class);
        job.setOutputFormatClass(SequenceFileOutputFormat.class);
        job.setNumReduceTasks(0);  // map-only job
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.RECORD);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

For the above program, generate a jar, copy the jar to the Hadoop box, and then run the command below.

yarn jar <jarname> com.dataflair.seq.SequeneFileWriterApp mytext seqout1

Here mytext is a text file which is the input for this job; it looks like the listing below. Between the fields I have used a tab separator, because the input format class is KeyValueTextInputFormat.
mytext file:
      1 Amazon
      2 JP Morgon
      3 Bank Of America
      4 ThoughtWorks
      5 DBS Bank
      6 NetFlix
      7 Apple
      8 MAPR

Reading a sequence file: we can also read sequence files and convert them to normal text. This can be done in two ways: either using MapReduce, or using the Hadoop API in a normal program.

1) Read a sequence file using a Java program via the Hadoop API:
Look at Example 5-11 (Reading a SequenceFile) in Hadoop: The Definitive Guide (page 130). Once you understand how reading a sequence file works from that example, the related question at http://data-flair.training/forums/topic/how-to-handle-record-boundaries-in-text-sequence-files-in-mapreduce-inputsplits becomes much easier to answer.
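If the book is not at hand, a minimal sketch of such a reader (following the standard SequenceFile.Reader usage; the class name here is illustrative, not the book's exact listing) looks like this:

```java
package com.dataflair.seq;

import java.net.URI;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.util.ReflectionUtils;

public class SequenceFileReadDemo {
    public static void main(String[] args) throws Exception {
        String uri = args[0];
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(URI.create(uri), conf);
        Path path = new Path(uri);

        SequenceFile.Reader reader = null;
        try {
            reader = new SequenceFile.Reader(fs, path, conf);
            // The key and value classes are recorded in the file header
            Writable key = (Writable) ReflectionUtils.newInstance(reader.getKeyClass(), conf);
            Writable value = (Writable) ReflectionUtils.newInstance(reader.getValueClass(), conf);
            while (reader.next(key, value)) {
                System.out.printf("%s\t%s\n", key, value);
            }
        } finally {
            IOUtils.closeStream(reader);
        }
    }
}
```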

2) Read a sequence file using a MapReduce program: convert sequence file data to text data.
Mapper code:

      SequenceFileReaderMapper.java:

package com.dataflair.seq;

import java.io.IOException;

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class SequenceFileReaderMapper extends Mapper<LongWritable, Text, LongWritable, Text> {

    @Override
    public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
        // Pass the deserialized key-value pair straight through
        context.write(key, value);
    }
}

      Driver code:

package com.dataflair.seq;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.SequenceFile.CompressionType;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.compress.SnappyCodec;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.SequenceFileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.GenericOptionsParser;

public class SequenceFileReaderApp {

    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        String[] otherArgs = new GenericOptionsParser(conf, args).getRemainingArgs();
        if (otherArgs.length < 2) {
            System.err.println("Usage: Sequencefile <in> [<in>....] <out>");
            System.exit(2);
        }

        Job job = new Job(conf, "Sequence file Reader");
        job.setJarByClass(SequenceFileReaderApp.class);
        job.setMapperClass(SequenceFileReaderMapper.class);

        job.setMapOutputKeyClass(LongWritable.class);
        job.setMapOutputValueClass(Text.class);
        job.setInputFormatClass(SequenceFileInputFormat.class);
        job.setOutputFormatClass(TextOutputFormat.class);
        job.setNumReduceTasks(0);  // map-only job
        FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
        FileOutputFormat.setCompressOutput(job, true);
        FileOutputFormat.setOutputCompressorClass(job, SnappyCodec.class);
        SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);
        FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }
}

Then execute the corresponding yarn jar command for the above MapReduce code.

If we want to use the compressed formats of the sequence file, we need to set the configuration in the driver class itself.

      How to enable compression for sequence file?

FileOutputFormat.setCompressOutput(job, true);
FileOutputFormat.setOutputCompressorClass(job, DefaultCodec.class);
SequenceFileOutputFormat.setOutputCompressionType(job, CompressionType.BLOCK);

The settings above specify the compression codec class to be used (DefaultCodec.class in this case). The last setting specifies the compression type, which can be either BLOCK or RECORD.
