Apache Flume Sink – Types of Sink in Flume


In this article, we will study different sinks supported by Flume. The article covers all the Flume sinks along with their configuration properties and examples.
Let us first see a short introduction to the Flume sink.

Introduction to Flume sink

The Apache Flume sink is a component of the Flume agent. It is used for storing data in a centralized store such as HDFS, HBase, etc. The sink consumes events from the Flume channel and pushes them to the central repository.

In simple words, the component that removes events from a Flume agent and writes them to another Flume agent, some other system, or a data store is called a sink. The sink is transactional in nature.

Let us now explore different types of sink in detail.

Types of Flume sink

1. HDFS sink

The HDFS sink writes Flume events into HDFS. The file formats supported by the HDFS sink are text files and sequence files, and it supports compression for both file types.

Files can be rolled (that is, the current file is closed and a new one is created) based on elapsed time, number of events, or size of data. We can also partition or bucket the data by its attributes, similar to the Hive concepts of partitioning and bucketing.

The HDFS directory path may contain formatting escape sequences, which this sink replaces to generate the directory or file name that stores the events.

To use the HDFS sink, Hadoop must be installed, since Flume uses the Hadoop JARs to communicate with the HDFS cluster.

Note: The Hadoop version must support the sync() call.


Some of the configuration properties for the HDFS sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be hdfs.
hdfs.path | – | The HDFS directory path, e.g. hdfs://namenode/flume/webdata/
hdfs.filePrefix | FlumeData | The name prefixed to files created by Flume in the HDFS directory.
hdfs.fileSuffix | – | The suffix to append to the file, e.g. .avro
hdfs.inUsePrefix | – | The prefix used for temporary files that Flume actively writes into.
hdfs.inUseSuffix | .tmp | The suffix used for temporary files that Flume actively writes into.
hdfs.rollInterval | 30 | The number of seconds to wait before rolling the current file. A value of 0 means never roll based on the time interval.
hdfs.rollSize | 1024 | The file size, in bytes, that triggers a roll. A value of 0 means never roll based on file size.
hdfs.rollCount | 10 | The number of events written to a file before it is rolled. A value of 0 means never roll based on the number of events.
hdfs.idleTimeout | 0 | The timeout after which inactive files are closed. A value of 0 disables automatic closing of idle files.
hdfs.batchSize | 100 | The number of events written to a file before it is flushed to HDFS.
hdfs.fileType | SequenceFile | The file format. Currently supported formats are SequenceFile, DataStream and CompressedStream. DataStream does not compress the output file, so do not set hdfs.codeC with it. CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles | 5000 | The number of files allowed to be open. The oldest file is closed if this number is exceeded.
hdfs.round | false | Whether the timestamp should be rounded down. If set to true, it affects all time-based escape sequences except %t.
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit) that is less than the current time.
hdfs.roundUnit | second | The unit of the round-down value. It can be second, minute or hour.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = hdfs
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
agent1.sinks.sk1.hdfs.filePrefix = events-
agent1.sinks.sk1.hdfs.round = true
agent1.sinks.sk1.hdfs.roundValue = 10
agent1.sinks.sk1.hdfs.roundUnit = minute

The above configuration rounds down the timestamp to the last 10th minute.

For example, a Flume event with the timestamp 11:44:34 AM, April 08, 2020, will cause the HDFS path to become /flume/events/2020-04-08/1140/00.
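
As a further illustration, here is a minimal sketch of roll and compression settings for the same sink, using the properties from the table above; the specific values (5-minute rolls, 128 MB files, gzip compression) are only illustrative assumptions, not values from the original example:

agent1.sinks.sk1.hdfs.rollInterval = 300
agent1.sinks.sk1.hdfs.rollSize = 134217728
agent1.sinks.sk1.hdfs.rollCount = 0
agent1.sinks.sk1.hdfs.fileType = CompressedStream
agent1.sinks.sk1.hdfs.codeC = gzip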

2. Hive sink

The Hive sink streams Flume events containing delimited text or JSON data directly into a Hive table or partition. The Flume events are written using Hive transactions.

As soon as a set of Flume events is committed to Hive, it becomes immediately visible to Hive queries. We can either pre-create the partitions Flume will stream to, or Flume can optionally create them if they are missing.

Fields from the incoming Flume event data are mapped to the corresponding columns in the Hive table.

Some of the configuration properties of the Hive sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be hive.
hive.metastore | – | The Hive metastore URI, e.g. thrift://a.b.com:9083
hive.database | – | The Hive database name.
hive.table | – | The Hive table name.
hive.partition | – | A comma-separated list of partition values identifying the partition to write to. It may contain escape sequences. E.g., if the table is partitioned by (continent: string, country: string, time: string), then 'Asia,India,2020-04-08-01-21' indicates continent=Asia, country=India, time=2020-04-08-01-21.
batchSize | 15000 | The maximum number of Flume events written to Hive in a single Hive transaction.
serializer | – | The serializer is responsible for parsing out the fields from the event and mapping them to the columns in the Hive table. The choice of serializer depends on the format of the data in the Flume event. The supported serializers are DELIMITED and JSON.
roundUnit | minute | The unit of the round-down value. It can be second, minute or hour.
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit) that is less than the current time.
useLocalTimeStamp | false | Whether to use the local time (instead of the timestamp from the event header) while replacing the escape sequences.
serializer.delimiter | , | (Type: string) The field delimiter in the incoming data. To use special characters, surround them with double quotes like "\t".
serializer.fieldnames | – | The mapping from input fields to columns in the Hive table, specified as a comma-separated list (no spaces) of Hive table column names, identifying the input fields in order of occurrence. To skip a field, leave the column name unspecified. E.g., 'time,,ip,message' indicates that the 1st, 3rd and 4th fields in the input map to the time, ip and message columns of the Hive table.
serializer.serdeSeparator | Ctrl-A | (Type: character) Customizes the separator used by the underlying serde. Efficiency can be improved if: (1) the fields in serializer.fieldnames are in the same order as the table columns, (2) serializer.delimiter is the same as serializer.serdeSeparator, and (3) the number of fields in serializer.fieldnames is less than or equal to the number of table columns, because then the fields in the incoming event body do not need to be reordered to match the order of the table columns. Use single quotes for special characters like '\t'. Ensure that the input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character.

Example Hive table:

create table logs ( id int , msg string )
    partitioned by (continent string, country string, time string)
    clustered by (id) into 5 buckets
    stored as orc;

Example for agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.channels.ch1.type = memory
agent1.sinks = sk1
agent1.sinks.sk1.type = hive
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hive.metastore = thrift://127.0.0.1:9083
agent1.sinks.sk1.hive.database = logsdb
agent1.sinks.sk1.hive.table = logs
agent1.sinks.sk1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
agent1.sinks.sk1.useLocalTimeStamp = false
agent1.sinks.sk1.round = true
agent1.sinks.sk1.roundValue = 10
agent1.sinks.sk1.roundUnit = minute
agent1.sinks.sk1.serializer = DELIMITED
agent1.sinks.sk1.serializer.delimiter = "\t"
agent1.sinks.sk1.serializer.serdeSeparator = '\t'
agent1.sinks.sk1.serializer.fieldnames =id,,msg

The above configuration rounds down the timestamp to the last 10th minute.

For example, a Flume event with the timestamp header set to 11:44:34 AM, April 08, 2020, and the 'country' header set to 'India' will evaluate to the partition (continent='asia', country='India', time='2020-04-08-11-40').

The serializer is configured to accept tab-separated input containing three fields and to skip the second field.

3. Logger sink

The logger sink logs events at the INFO level. It is typically used for testing or debugging purposes. The logger sink is the only sink that does not require any extra configuration.

The configuration properties for the logger sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be logger.
maxBytesToLog | 16 | The maximum number of bytes of the event body to log.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = logger
agent1.sinks.sk1.channel = ch1

4. Avro sink

The Avro sink turns Flume events into Avro events and sends them to the configured hostname/port pair. The events are taken from the configured channel in batches of the configured batch size. The Avro sink forms one half of Apache Flume's tiered collection support (a sketch of a second-tier agent follows the example below).

Some of the properties of the Avro sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be avro.
hostname | – | The hostname or IP address of the remote Avro source to connect to.
port | – | The port to connect to.
batchSize | 100 | The number of events to batch together for sending.

Example for the agent named agent1, sink sk1, channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = avro
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hostname = 10.10.10.10
agent1.sinks.sk1.port = 4545
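
To illustrate the tiered collection pattern, the Avro sink above would typically be paired with an Avro source on a downstream agent. The following is a minimal sketch of such a second-tier agent; the agent name agent2 and its HDFS sink are assumptions for illustration only:

agent2.sources = src1
agent2.channels = ch2
agent2.sinks = sk2
agent2.sources.src1.type = avro
agent2.sources.src1.bind = 10.10.10.10
agent2.sources.src1.port = 4545
agent2.sources.src1.channels = ch2
agent2.sinks.sk2.type = hdfs
agent2.sinks.sk2.channel = ch2
agent2.sinks.sk2.hdfs.path = /flume/tier2/events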

5. Thrift sink

The Thrift sink turns Flume events into Thrift events and sends them to the configured hostname/port pair. The events are taken from the configured channel in batches of the configured batch size.

We can configure the Thrift sink to start in secure mode by enabling Kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink must also operate in secure mode.

The client-principal and client-keytab properties are used by the Thrift sink for authentication to the Kerberos KDC.

Some of the configuration properties for the Thrift sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be thrift.
hostname | – | The hostname or IP address of the remote Thrift source to connect to.
port | – | The port to connect to.
batchSize | 100 | The number of events to batch together for sending.
kerberos | false | Whether to enable Kerberos authentication. Set it to true to enable Kerberos authentication. In Kerberos mode, client-principal, client-keytab, and server-principal are required for successful authentication and communication with a Kerberos-enabled Thrift source.
client-principal | – | The Kerberos principal used by the Thrift sink to authenticate to the Kerberos KDC.
client-keytab | – | The keytab location used by the Thrift sink, in combination with client-principal, to authenticate to the Kerberos KDC.
server-principal | – | The Kerberos principal of the Thrift source to which the Thrift sink is configured to connect.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = thrift
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hostname = 10.10.10.10
agent1.sinks.sk1.port = 4545
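
If the remote Thrift source runs in secure mode, the same sink could additionally enable Kerberos as described above; the principal names and keytab path below are placeholders, not values from an actual deployment:

agent1.sinks.sk1.kerberos = true
agent1.sinks.sk1.client-principal = flumeclient/client.example.org@EXAMPLE.ORG
agent1.sinks.sk1.client-keytab = /etc/security/keytabs/flume.keytab
agent1.sinks.sk1.server-principal = flume/server.example.org@EXAMPLE.ORG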

6. IRC sink

This Apache Flume sink takes messages from the attached Flume channel and relays them to the configured IRC destinations.

Some of the configuration properties for IRC sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be irc.
hostname | – | The hostname or IP address of the remote host to connect to.
port | 6667 | The port number of the remote host to connect to.
nick | – | The nickname to use.
chan | – | The IRC channel.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = irc
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hostname = irc.yourdomain.com
agent1.sinks.sk1.nick = flume
agent1.sinks.sk1.chan = #flume

7. File Roll sink

This sink stores events on the local filesystem.

Some of the properties for file roll sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be file_roll.
sink.directory | – | The directory where files will be stored.
sink.pathManager | DEFAULT | The PathManager implementation to use.
sink.pathManager.extension | – | The file extension if the default PathManager is used.
sink.batchSize | 100 | The batch size.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = file_roll
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.sink.directory = /var/log/flume

8. Null sink

The null sink discards all Flume events it receives from the channel.

Some of the properties for the null sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be null.
batchSize | 100 | The batch size.

Example for the agent named agent1, sink sk1, channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = null
agent1.sinks.sk1.channel = ch1

9. HBase Sink

9.1. HBaseSink

The HBaseSink writes data to HBase. The HBase configuration is picked up from the first hbase-site.xml found in the classpath.

A class implementing HbaseEventSerializer, specified by the configuration, is used to convert the events into HBase puts and/or increments.

These HBase puts and increments are then written to HBase. This Flume sink guarantees the same consistency as HBase, which is row-wise atomicity.

If writing certain events fails, this sink will replay all the events in that transaction.

This sink supports writing data to secure HBase. To write to secure HBase, the agent must have write permissions on the table the sink is configured to write to.

We can specify the principal and keytab properties used for authentication against the KDC in the configuration.

The hbase-site.xml in the Flume agent's classpath must have authentication set to kerberos.

Two serializers are provided with Apache Flume.

a) SimpleHbaseEventSerializer

(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)

This serializer writes the event body as-is to HBase, and optionally increments a column in HBase.

b) RegexHbaseEventSerializer

(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)

This serializer breaks the event body apart based on the given regex and writes each part into a different column.

The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.

Some of the configuration properties for HBaseSink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be hbase.
table | – | The name of the table in HBase to write to.
columnFamily | – | The column family in HBase to write to.
batchSize | 100 | The number of Flume events to be written per transaction.
kerberosPrincipal | – | The Kerberos user principal for accessing secure HBase.
kerberosKeytab | – | The Kerberos keytab for accessing secure HBase.
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = "iCol", payload column = "pCol".

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = hbase
agent1.sinks.sk1.table = employee
agent1.sinks.sk1.columnFamily = family1
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent1.sinks.sk1.channel = ch1
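
For the secure-HBase case mentioned above, the same sink could additionally carry the Kerberos properties from the table; the principal and keytab path below are placeholders only:

agent1.sinks.sk1.kerberosPrincipal = flume/agenthost.example.org@EXAMPLE.ORG
agent1.sinks.sk1.kerberosKeytab = /etc/security/keytabs/flume.keytab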

9.2 HBase2Sink

This sink is the equivalent of HBaseSink for HBase version 2.

The functionality and configuration parameters of HBase2Sink are the same as those of HBaseSink, except for the type, which is hbase2, and the package/class names.

The type is the FQCN: org.apache.flume.sink.hbase2.HBase2Sink.

Some of the properties for HBase2Sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be hbase2.
table | – | The name of the table in HBase to write to.
columnFamily | – | The column family in HBase to write to.
serializer | org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer | Default increment column = "iCol", payload column = "pCol".

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = hbase2
agent1.sinks.sk1.table = employee
agent1.sinks.sk1.columnFamily = family1
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
agent1.sinks.sk1.channel = ch1

9.3 AsyncHBaseSink

AsyncHBaseSink writes data to HBase using an asynchronous model.

It uses a class implementing AsyncHbaseEventSerializer, specified by the configuration, to convert the Flume events into HBase puts and/or increments. These HBase puts and increments are then written to HBase.

AsyncHBaseSink uses the Asynchbase API to write to HBase. This Flume sink guarantees the same consistency as HBase, which is currently row-wise atomicity.

If writing certain events fails, this sink will replay all the events in that transaction. We can use this sink only with HBase 1.x; the async client library used by this sink is not available for HBase 2.

The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.

Some of the properties for AsyncHBaseSink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be asynchbase.
table | – | The name of the table in HBase to write to.
columnFamily | – | The column family in HBase to write to.
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | The serializer class used to convert events into puts and/or increments.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = asynchbase
agent1.sinks.sk1.table = employee
agent1.sinks.sk1.columnFamily =family1
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agent1.sinks.sk1.channel = ch1

10. MorphlineSolrSink

MorphlineSolrSink extracts data from Flume events, transforms it, and loads it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.

It is well suited to use cases that stream raw data into HDFS through the HDFS sink and simultaneously extract, transform, and load the same raw data into Apache Solr servers through MorphlineSolrSink (a sketch of such a setup follows the example below).

We can configure the ETL functionality using a morphline configuration file. This file defines a chain of transformation commands that pipe event records from one command to another.

A morphline command is a little bit like a Flume interceptor. We can embed morphlines into Hadoop components such as Apache Flume.

The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink

Some of the properties for MorphlineSolrSink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be org.apache.flume.sink.solr.morphline.MorphlineSolrSink.
morphlineFile | – | The relative or absolute path on the local filesystem to the morphline configuration file, e.g. /etc/flume-ng/conf/morphline.conf
batchSize | 1000 | The maximum number of events to take per Flume transaction.
morphlineId | null | An optional name used to identify a morphline if there are multiple morphlines in a morphline config file.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# agent1.sinks.sk1.morphlineId = morphline1
# agent1.sinks.sk1.batchSize = 1000
# agent1.sinks.sk1.batchDurationMillis = 1000
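
Building on the HDFS-plus-Solr use case described above, the following is a minimal sketch of fanning one source out to both sinks over two channels, relying on the default replicating channel selector; the spooling-directory source and all paths are assumptions for illustration:

agent1.sources = src1
agent1.channels = ch1 ch2
agent1.sinks = sk1 sk2
agent1.sources.src1.type = spooldir
agent1.sources.src1.spoolDir = /var/log/incoming
agent1.sources.src1.channels = ch1 ch2
agent1.sinks.sk1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.morphlineFile = /etc/flume-ng/conf/morphline.conf
agent1.sinks.sk2.type = hdfs
agent1.sinks.sk2.channel = ch2
agent1.sinks.sk2.hdfs.path = /flume/rawdata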

11. ElasticSearchSink

ElasticSearchSink writes data to an Elasticsearch cluster. By default, events are written so that the Kibana graphical interface displays them as if Logstash had written them.

Make sure that the elasticsearch and lucene-core JARs required for your environment are placed in the lib directory of the Flume installation.

Elasticsearch requires that the major version of the client JAR matches that of the server and that both run the same minor version of the JVM. SerializationExceptions will appear if this is not the case.

Every day, events are written to a new index. The name will be <indexName>-yyyy-MM-dd, where <indexName> is the indexName parameter. The sink starts writing to a new index at midnight UTC.

By default, events are serialized for Elasticsearch by the ElasticSearchLogStashEventSerializer. The serializer parameter can override this behavior.

The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink

Some of the properties for ElasticSearchSink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames | – | A comma-separated list of hostname:port entries. If the port is not present, the default port 9300 is used.
clusterName | elasticsearch | The name of the Elasticsearch cluster to connect to.
batchSize | 100 | The number of events to be written per transaction.
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted, but ElasticSearchIndexRequestBuilderFactory is preferred.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = elasticsearch
agent1.sinks.sk1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
agent1.sinks.sk1.indexName = employee_index
agent1.sinks.sk1.indexType = bar_type
agent1.sinks.sk1.clusterName =employee_cluster
agent1.sinks.sk1.batchSize = 500
agent1.sinks.sk1.ttl = 5d
agent1.sinks.sk1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent1.sinks.sk1.channel = ch1

12. Kite Dataset Sink

Kite Dataset Sink is an experimental sink. This sink writes events to a Kite Dataset.

The Kite Dataset sink deserializes the body of each incoming Flume event and stores the resulting record in the Kite dataset. It determines the target dataset by loading a dataset by URI.

It supports only Avro serialization. The record schema must be passed in the event headers, using either of the following:

a) flume.avro.schema.literal with the JSON schema representation

b) flume.avro.schema.url with a URL where the schema may be found.

Some of the properties for Kite Dataset Sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri | – | The URI of the dataset to open.
kite.batchSize | 100 | The number of records to process in each batch.
kite.repo.uri | – | The URI of the repository to open (deprecated; use kite.dataset.uri instead).
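
A minimal configuration sketch for an agent named agent1, sink sk1, and channel ch1 follows; the dataset URI below is only a placeholder, and each event is expected to carry its Avro schema in the flume.avro.schema.literal or flume.avro.schema.url header as described above:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = org.apache.flume.sink.kite.DatasetSink
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.kite.dataset.uri = dataset:hdfs://namenode/flume/datasets/events
agent1.sinks.sk1.kite.batchSize = 100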

13. Kafka sink

The Kafka sink is a Flume sink implementation that publishes data to a Kafka topic. The main objective behind it is to integrate Apache Flume with Kafka, so that pull-based processing systems can process the data coming from various Flume sources.

This sink currently supports Kafka server releases 0.10.1.0 or higher.

Some of the properties for Kafka Sink are:

Property Name | Default Value | Description
type | – | The component type; must be org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers | – | The list of brokers the Kafka sink connects to in order to get the list of topic partitions. It can be a partial list of brokers, but at least two are recommended for HA. The format is a comma-separated list of hostname:port entries.
kafka.topic | default-flume-topic | The Kafka topic to which the messages will be published. If this parameter is configured, messages are published to this topic. If the event header contains a "topic" field, the event is published to that topic, overriding the topic configured here.
flumeBatchSize | 100 | How many messages to process in one batch. Larger batches improve throughput while adding latency.
kafka.producer.acks | 1 | How many replicas must acknowledge a message before it is considered successfully written. Accepted values are 0 (never wait for acknowledgment), 1 (wait for the leader only) and -1 (wait for all replicas). Set this to -1 to avoid data loss in some cases of leader failure.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sk1.kafka.topic = mytopic
agent1.sinks.sk1.kafka.bootstrap.servers = localhost:9092
agent1.sinks.sk1.kafka.flumeBatchSize = 20
agent1.sinks.sk1.kafka.producer.acks = 1
agent1.sinks.sk1.kafka.producer.linger.ms = 1
agent1.sinks.sk1.kafka.producer.compression.type = snappy

14. HTTP sink

The HTTP sink takes events from the channel and sends them to a remote service using an HTTP POST request. The content of the event is sent as the POST body.

This sink handles errors depending on the HTTP response returned by the target server.

The backoff/ready status of the sink is configurable, as is the transaction commit/rollback result and whether the event contributes to the successful event drain count.

Any malformed HTTP response returned by the server, where the status code is not readable, will result in a backoff signal, and the event is not consumed from the channel.

Some of the properties for HTTP Sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be http
endpoint | – | The fully qualified URL endpoint to POST to.
connectTimeout | 5000 | The socket connection timeout in milliseconds.
requestTimeout | 5000 | The maximum request processing time in milliseconds.
contentTypeHeader | text/plain | The HTTP Content-Type header.
acceptHeader | text/plain | The HTTP Accept header value.
defaultBackoff | true | Whether to back off by default on receiving all HTTP status codes.
defaultRollback | true | Whether to roll back by default on receiving all HTTP status codes.
defaultIncrementMetrics | false | Whether to increment metrics by default on receiving all HTTP status codes.
backoff.CODE | – | Configures a specific backoff for an individual code or a group code (e.g. 4XX).
rollback.CODE | – | Configures a specific rollback for an individual code or a group code.
incrementMetrics.CODE | – | Configures a specific metrics increment for an individual code or a group code.

Example for the agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = http
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.endpoint = http://localhost:8080/someuri
agent1.sinks.sk1.connectTimeout = 2000
agent1.sinks.sk1.requestTimeout = 2000
agent1.sinks.sk1.acceptHeader = application/json
agent1.sinks.sk1.contentTypeHeader = application/json
agent1.sinks.sk1.defaultBackoff = true
agent1.sinks.sk1.defaultRollback = true
agent1.sinks.sk1.defaultIncrementMetrics = false
agent1.sinks.sk1.backoff.4XX = false
agent1.sinks.sk1.rollback.4XX = false
agent1.sinks.sk1.incrementMetrics.4XX = true
agent1.sinks.sk1.backoff.200 = false
agent1.sinks.sk1.rollback.200 = false
agent1.sinks.sk1.incrementMetrics.200 = true

15. Custom sink

A custom sink is our own implementation of the Sink interface. We must include the custom sink's class and its dependencies in the agent's classpath when starting the Flume agent.

Some of the properties for the custom sink are:

Property Name | Default Value | Description
channel | – | The channel the sink reads events from.
type | – | The component type; must be the FQCN of our custom sink class.

Example for agent named agent1, sink sk1, and channel ch1:

agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = org.example.MySink
agent1.sinks.sk1.channel = ch1

Summary

I hope this article clearly explained the different sinks supported by Flume. A Flume sink is the component of the Flume agent that consumes events from the Flume channel and pushes them to a central repository. The article covered the different Flume sinks along with their properties and examples.
