Apache Flume Sink – Types of Sink in Flume
In this article, we will study different sinks supported by Flume. The article covers all the Flume sinks along with their configuration properties and examples.
Let us first see a short introduction to Flume sink.
Introduction to Flume sink
Apache Flume sink is the component of a Flume agent that stores data into a centralized store such as HDFS or HBase. A sink consumes events from the Flume channel and pushes them to the central repository.
In simple words, the component that removes events from a Flume agent and writes them to another Flume agent, some other system, or a data store is called a sink. Sinks are transactional in nature.
Let us now explore different types of sink in detail.
Types of Flume sink
1. HDFS sink
The HDFS sink writes Flume events into HDFS. The file formats it supports are text files and sequence files, with optional compression for both.
Files can be rolled (the current file closed and a new one created) based on elapsed time, number of events, or size of data. The sink can also bucket or partition the data by attributes such as the timestamp or the machine where the event originated, similar to Hive.
The HDFS directory path may contain formatting escape sequences, which this sink replaces to generate the directory or file name that stores the events.
Using the HDFS sink requires Hadoop to be installed, since Flume uses the Hadoop JARs to communicate with the HDFS cluster.
Note: the version of Hadoop used must support the sync() call.
Some of the configuration properties for the HDFS sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be hdfs. |
hdfs.path | – | It specifies the HDFS directory path. For eg, hdfs://namenode/flume/webdata/ |
hdfs.filePrefix | FlumeData | It specifies the name prefixed to the files created by Apache Flume in the HDFS directory. |
hdfs.fileSuffix | – | It specifies the suffix to append to file. For eg, .avro |
hdfs.inUsePrefix | – | It specifies the prefix that is used for the temporary files that flume actively writes into. |
hdfs.inUseSuffix | .tmp | It specifies the suffix that is used for the temporary files that flume actively writes into. |
hdfs.rollInterval | 30 | It specifies the number of seconds to wait before rolling the current file. If it is equal to 0 then it means never roll based on the time interval. |
hdfs.rollSize | 1024 | It specifies the file size to trigger roll, in bytes. If it is equal to 0 then it means never roll based on file size. |
hdfs.rollCount | 10 | It specifies the number of events written to the file before it rolled. If it is equal to 0 then it means never roll based on the number of events. |
hdfs.idleTimeout | 0 | It specifies the timeout after which inactive files get closed. If it is equal to 0 then it means disabling automatic closing of idle files. |
hdfs.batchSize | 100 | It specifies the number of events written to file before it is flushed to HDFS. |
hdfs.fileType | SequenceFile | It specifies the file format. The formats currently supported are SequenceFile, DataStream, and CompressedStream. DataStream does not compress the output file, so do not set hdfs.codeC with it; CompressedStream requires hdfs.codeC to be set to an available codec. |
hdfs.maxOpenFiles | 5000 | It specifies the number of files allowed to be open. The oldest file is closed if this number is exceeded. |
hdfs.round | false | It specifies whether the timestamp should be rounded down. If set to true, it affects all time-based escape sequences except %t. |
hdfs.roundValue | 1 | The timestamp is rounded down to the highest multiple of this value (in the unit configured using hdfs.roundUnit) that is less than the current time. |
hdfs.roundUnit | second | It specifies the unit of the round down value. It can be second, minute or hour |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = hdfs
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hdfs.path = /flume/events/%Y-%m-%d/%H%M/%S
agent1.sinks.sk1.hdfs.filePrefix = events-
agent1.sinks.sk1.hdfs.round = true
agent1.sinks.sk1.hdfs.roundValue = 10
agent1.sinks.sk1.hdfs.roundUnit = minute
The above configuration rounds the timestamp down to the last 10th minute.
For example, a Flume event with the timestamp 11:44:34 AM, April 08, 2020, will cause the HDFS path to become /flume/events/2020-04-08/1140/00.
2. Hive sink
The Hive sink streams Flume events containing delimited text or JSON data directly into a Hive table or partition. The events are written using Hive transactions.
As soon as a set of events is committed to Hive, they become immediately visible to Hive queries. We can either pre-create the partitions that Flume will stream to, or optionally let Flume create them if they are missing.
Fields from the incoming event data are mapped to the corresponding columns in the Hive table.
Some of the configuration properties for the Hive sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be hive. |
hive.metastore | – | It specifies the Hive metastore URI. For eg, thrift://a.b.com:9083 |
hive.database | – | It specifies the Hive database name. |
hive.table | – | It specifies the Hive table name. |
hive.partition | – | It specifies the comma-separated list of partition values identifying the partition to write to. It may contain escape sequences. For E.g: If the table is partitioned by continent: string, country :string, and time : string, then ‘Asia,India,2020-04-08-01-21’ will indicate continent=Asia,country=India,time=2020-04-08-01-21 |
batchSize | 15000 | It specifies the maximum number of flume events written to Hive in a single Hive transaction |
serializer | – | Serializer is responsible for parsing out the fields from the event and mapping them to the columns in the table in Hive. The choice of serializer depends upon the format of the data in the flume event. The supported serializers are DELIMITED and JSON |
roundUnit | minute | It specifies the unit of the round down value. It can be second, minute or hour |
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than the current time |
useLocalTimeStamp | false | It specifies whether to use the local time (instead of the timestamp from the event header) while replacing the escape sequences. |
serializer.delimiter | , | (Type: string). It specifies the field delimiter in the incoming data. For using special characters, surround characters with double quotes like “\t” |
serializer.fieldnames | – | The mapping from input fields to columns in the Hive table. It is specified as a comma-separated list (no spaces) of Hive table column names, identifying the input fields in order of their occurrence. To skip a field, leave the column name unspecified. For example, 'time,,ip,message' indicates that the 1st, 3rd, and 4th fields in the input map to the time, ip, and message columns in the Hive table. |
serializer.serdeSeparator | Ctrl-A | (Type: character). It customizes the separator used by the underlying serde. Efficiency can improve if the fields in serializer.fieldnames are in the same order as the table columns and serializer.delimiter matches this separator, because the incoming fields then do not have to be reordered. Use single quotes for special characters like '\t', and ensure that the input fields do not contain this character. NOTE: If serializer.delimiter is a single character, preferably set this to the same character. |
Example Hive table:
create table logs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;
Example for agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.channels.ch1.type = memory
agent1.sinks = sk1
agent1.sinks.sk1.type = hive
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hive.metastore = thrift://127.0.0.1:9083
agent1.sinks.sk1.hive.database = logsdb
agent1.sinks.sk1.hive.table = logs
agent1.sinks.sk1.hive.partition = asia,%{country},%Y-%m-%d-%H-%M
agent1.sinks.sk1.useLocalTimeStamp = false
agent1.sinks.sk1.round = true
agent1.sinks.sk1.roundValue = 10
agent1.sinks.sk1.roundUnit = minute
agent1.sinks.sk1.serializer = DELIMITED
agent1.sinks.sk1.serializer.delimiter = "\t"
agent1.sinks.sk1.serializer.serdeSeparator = '\t'
agent1.sinks.sk1.serializer.fieldnames = id,,msg
The above configuration rounds the timestamp down to the last 10th minute.
For example, a Flume event with the timestamp header set to 11:44:34 AM, April 08, 2020, and the 'country' header set to 'India' will evaluate to the partition (continent='asia', country='India', time='2020-04-08-11-40').
The serializer is configured to accept tab-separated input containing three fields and to skip the second field.
3. Logger sink
The Logger sink logs events at the INFO level. It is typically used for testing or debugging purposes. The Logger sink is the only sink that does not require the extra configuration needed for logging raw data.
The configuration properties for the Logger sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be logger. |
maxBytesToLog | 16 | It specifies the maximum number of bytes of the Event body to log. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = logger
agent1.sinks.sk1.channel = ch1
4. Avro sink
The Avro sink turns Flume events into Avro events and sends them to the configured hostname/port pair. Events are taken from the configured channel in batches of the configured batch size. The Avro sink forms one half of Apache Flume's tiered collection support.
Some of the properties of the Avro sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be avro. |
hostname | – | It specifies the hostname or IP address of the remote Avro source to connect to. |
port | – | It specifies the port of the remote Avro source to connect to. |
batchSize | 100 | It specifies the number of events to batch together for sending. |
Example for the agent named agent1, sink sk1, channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = avro
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hostname = 10.10.10.10
agent1.sinks.sk1.port = 4545
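Because the Avro sink forms one half of a tiered topology, the downstream agent must run a matching Avro source listening on the same host and port. A minimal sketch of that receiving agent is shown below; the agent name agent2, source name src1, and channel name ch2 are assumed names used only for illustration.
agent2.sources = src1
agent2.channels = ch2
agent2.sources.src1.type = avro
agent2.sources.src1.channels = ch2
agent2.sources.src1.bind = 10.10.10.10
agent2.sources.src1.port = 4545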
5. Thrift sink
The Thrift sink turns the Flume events into the Thrift events and sends them to the configured hostname/port pair. The flume events are taken in batches of configured batch size from the configured Channel.
We can configure the Thrift sink to start in secure mode by enabling Kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink must also operate in secure mode.
The client-principal and client-keytab are two properties used by the Thrift sink for authentication to the Kerberos KDC.
Some of the configuration properties for the Thrift sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be thrift. |
hostname | – | It specifies the hostname or IP address of the remote Thrift source to connect to. |
port | – | It specifies the port of the remote Thrift source to connect to. |
batchSize | 100 | It specifies the number of events to batch together for sending. |
kerberos | false | It specifies whether to enable Kerberos authentication. To enable Kerberos authentication, set it to true. In Kerberos mode, client-principal, client-keytab, and server-principal are required for successful authentication and communication with a Kerberos-enabled Thrift source. |
client-principal | – | It specifies the Kerberos principal used by the Thrift sink for authenticating to the Kerberos KDC. |
client-keytab | – | It specifies the keytab location used by the Thrift sink in combination with the client-principal for authenticating to the Kerberos KDC. |
server-principal | – | It specifies the Kerberos principal of the Thrift source to which the Thrift sink is configured to connect. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = thrift
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hostname = 10.10.10.10
agent1.sinks.sk1.port = 4545
6. IRC sink
This Apache Flume sink takes messages from the attached flume channel and relays those to the configured IRC destinations.
Some of the configuration properties for IRC sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be irc. |
hostname | – | It specifies the hostname or IP address of the IRC server to connect to. |
port | 6667 | It specifies the port number of the remote host to connect to. |
nick | – | It specifies the nickname to use. |
chan | – | It specifies the IRC channel to send messages to. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = irc
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.hostname = irc.yourdomain.com
agent1.sinks.sk1.nick = flume
agent1.sinks.sk1.chan = #flume
7. File Roll sink
This sink stores events on the local filesystem.
Some of the properties for file roll sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be file_roll. |
sink.directory | – | It specifies the directory where files will be stored |
sink.pathManager | DEFAULT | It specifies the PathManager implementation to use. |
sink.pathManager.extension | – | It specifies the file extension if the default PathManager is used |
sink.batchSize | 100 | It specifies the batch size. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = file_roll
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.sink.directory = /var/log/flume
8. Null sink
Null sink discards all flume events it receives from the channel.
Some of the properties for the Null sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be null. |
batchSize | 100 | It specifies the batch size. |
Example for the agent named agent1, sink sk1, channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = null
agent1.sinks.sk1.channel = ch1
9. HBase Sink
9.1. HBaseSink
The HBaseSink writes data to HBase. The HBase configuration is taken from the first hbase-site.xml found in the classpath.
A class implementing HbaseEventSerializer, specified in the configuration, is used to convert the events into HBase puts and/or increments.
These puts and increments are then written to HBase. This sink provides the same consistency guarantee as HBase, which is currently row-wise atomicity.
If writing certain events fails, the sink replays all the events in that transaction.
This sink supports writing data to secure HBase. To write to secure HBase, the agent must have write permissions to the table the sink is configured to write to.
The principal and keytab used for authentication against the KDC can be specified in the configuration.
The hbase-site.xml in the Flume agent's classpath must have authentication set to kerberos.
Two serializers are provided with Apache Flume.
a) SimpleHbaseEventSerializer
(org.apache.flume.sink.hbase.SimpleHbaseEventSerializer)
This serializer writes the event body as it is to HBase, and optionally increments a column in Hbase.
b) RegexHbaseEventSerializer
(org.apache.flume.sink.hbase.RegexHbaseEventSerializer)
This serializer breaks the event body on the basis of the given regex. Then it writes each part into different columns.
The type is the FQCN: org.apache.flume.sink.hbase.HBaseSink.
Some of the configuration property for HBaseSink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be hbase. |
table | – | It specifies the name of the table in Hbase to write to. |
columnFamily | – | It specifies the column family in Hbase to write to. |
batchSize | 100 | It specifies the number of flume events to be written per transaction. |
kerberosPrincipal | – | It specifies the Kerberos user principal for accessing secure HBase. |
kerberosKeytab | – | It specifies the Kerberos keytab for accessing secure HBase. |
serializer | org.apache.flume.sink.hbase.SimpleHbaseEventSerializer | Default increment column = “iCol”, payload column = “pCol”. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = hbase
agent1.sinks.sk1.table = employee
agent1.sinks.sk1.columnFamily = family1
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
agent1.sinks.sk1.channel = ch1
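When the RegexHbaseEventSerializer is used, as in the example above, the regular expression and the target column names are supplied as serializer sub-properties. The sketch below is only an illustration based on that serializer's serializer.regex and serializer.colNames options; the regex and column names are assumed values for a comma-separated event body with three fields.
agent1.sinks.sk1.serializer.regex = ([^,]+),([^,]+),([^,]+)
agent1.sinks.sk1.serializer.colNames = id,name,message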
9.2 HBase2Sink
This sink is the equivalent of HBaseSink for HBase version 2.
The functionality and the configuration parameters for HBase2Sink are the same as in the case of HBaseSink except for the type which is hbase2 and the package/class names.
The type is the FQCN: org.apache.flume.sink.hbase2.HBase2Sink.
Some of the property for HBase2Sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be hbase2. |
table | – | It specifies the name of the table in Hbase to write to. |
columnFamily | – | It specifies the column family in Hbase to write to. |
serializer | org.apache.flume.sink.hbase2.SimpleHBase2EventSerializer | Default increment column = “iCol”, payload column = “pCol”. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = hbase2
agent1.sinks.sk1.table = employee
agent1.sinks.sk1.columnFamily = family1
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase2.RegexHBase2EventSerializer
agent1.sinks.sk1.channel = ch1
9.3 AsyncHBaseSink
AsyncHBaseSink writes data to HBase by using an asynchronous model.
It uses a class which implements AsyncHbaseEventSerializer specified by the configuration for converting the flume events into HBase puts and/or increments. These HBase puts and increments are then written to HBase.
AsyncHBaseSink uses the Asynchbase API for writing to HBase. This flume sink guarantees the same consistency as HBase, which is currently row-wise atomicity.
In the case of failure of writing certain events, this sink will replay all the events in that transaction. We can use this sink only with HBase 1.x. For HBase 2, the async library used by this sink is not available.
The type is the FQCN: org.apache.flume.sink.hbase.AsyncHBaseSink.
Some of the properties for AsyncHBaseSink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be asynchbase. |
table | – | It specifies the name of the table in Hbase to write to. |
columnFamily | – | It specifies the column family in Hbase to write to. |
serializer | org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer | Default increment column = “iCol”, payload column = “pCol”. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = asynchbase
agent1.sinks.sk1.table = employee
agent1.sinks.sk1.columnFamily = family1
agent1.sinks.sk1.serializer = org.apache.flume.sink.hbase.SimpleAsyncHbaseEventSerializer
agent1.sinks.sk1.channel = ch1
10. MorphlineSolrSink
MorphlineSolrSink extracts data from Flume events, transforms it, and loads it in near real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
It is well suited to use cases that stream raw data into HDFS through the HDFS sink and simultaneously extract, transform, and load the same raw data into Apache Solr servers through MorphlineSolrSink.
The ETL functionality is configured with a morphline configuration file that defines a chain of transformation commands that pipe event records from one command to another.
A morphline command is somewhat like a Flume interceptor, and morphlines can be embedded into Hadoop components such as Apache Flume.
The type is the FQCN: org.apache.flume.sink.solr.morphline.MorphlineSolrSink
Some of the properties for MorphlineSolrSink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be org.apache.flume.sink.solr.morphline.MorphlineSolrSink. |
morphlineFile | – | It specifies the relative or absolute path on the local file system to the morphline configuration file. For eg: /etc/flume-ng/conf/morphline.conf |
batchSize | 1000 | It specifies the maximum number of events to take per flume transaction. |
morphlineId | null | It specifies the optional name that can be used for identifying a morphline if there are multiple morphlines in a morphline config file. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# agent1.sinks.sk1.morphlineId = morphline1
# agent1.sinks.sk1.batchSize = 1000
# agent1.sinks.sk1.batchDurationMillis = 1000
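The morphlineFile above points to a morphline configuration file. A minimal illustrative sketch of such a file is shown below; the exact command set (readLine, loadSolr) and the importCommands packages depend on the Kite Morphlines version bundled with your distribution, and the collection name and ZooKeeper address are assumed values.
morphlines : [
  {
    id : morphline1
    importCommands : ["org.kitesdk.**", "org.apache.solr.**"]
    commands : [
      # parse each event body as a line of UTF-8 text
      { readLine { charset : UTF-8 } }
      # load the resulting record into the Solr collection
      { loadSolr { solrLocator : { collection : collection1, zkHost : "127.0.0.1:2181/solr" } } }
    ]
  }
]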
11. ElasticSearchSink
ElasticSearchSink writes data to an Elasticsearch cluster. By default, events are written so that the Kibana graphical interface displays them as if Logstash had written them.
Make sure the elasticsearch and lucene-core JARs required for your environment are placed in the lib directory of the Flume installation.
Elasticsearch requires the major version of the client JAR to match that of the server, and both must run the same minor version of the JVM; otherwise SerializationExceptions will appear.
Events are written to a new index every day. The name will be <indexName>-yyyy-MM-dd, where <indexName> is the indexName parameter. The sink starts writing to a new index at midnight UTC.
By default, events are serialized for Elasticsearch by the ElasticSearchLogStashEventSerializer. This behavior can be overridden with the serializer parameter.
The type is the FQCN: org.apache.flume.sink.elasticsearch.ElasticSearchSink
Some of the properties for ElasticSearchSink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be org.apache.flume.sink.elasticsearch.ElasticSearchSink |
hostNames | – | It specifies the comma-separated list of the hostname:port. If in case the port is not present the default port ‘9300’ will be used |
clusterName | elasticsearch | It specifies the name of the ElasticSearch cluster to connect to |
batchSize | 100 | It specifies the number of events to be written per transaction. |
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. The implementations of either class are accepted but ElasticSearchIndexRequestBuilderFactory is preferred. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = elasticsearch
agent1.sinks.sk1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
agent1.sinks.sk1.indexName = employee_index
agent1.sinks.sk1.indexType = bar_type
agent1.sinks.sk1.clusterName = employee_cluster
agent1.sinks.sk1.batchSize = 500
agent1.sinks.sk1.ttl = 5d
agent1.sinks.sk1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
agent1.sinks.sk1.channel = ch1
12. Kite Dataset Sink
The Kite Dataset sink is an experimental sink that writes events to a Kite Dataset.
It deserializes the body of each incoming Flume event and stores the resulting record in a Kite Dataset. The target dataset is determined by loading it from the configured dataset URI.
The sink supports only Avro serialization. The record schema must be passed in the event headers using either of the following:
a) flume.avro.schema.literal with the JSON schema representation
b) flume.avro.schema.url with a URL where the schema may be found.
Some of the properties for Kite Dataset Sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be org.apache.flume.sink.kite.DatasetSink |
kite.dataset.uri | – | It specifies the URI of the dataset to open. |
kite.batchSize | 100 | It specifies the number of records to process in each batch. |
kite.repo.uri | – | It specifies the URI of the repository to open (deprecated; use kite.dataset.uri instead). |
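A minimal configuration sketch for this sink, consistent with the naming used in the other examples, might look like the following; the dataset URI (a weblogs dataset on an HDFS namenode) is an assumed value.
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = org.apache.flume.sink.kite.DatasetSink
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.kite.dataset.uri = dataset:hdfs://namenode/flume/weblogs
agent1.sinks.sk1.kite.batchSize = 100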
13. Kafka sink
Kafka sink is a Flume sink implementation that publishes data to a Kafka topic. Its main objective is to integrate Apache Flume with Kafka so that pull-based processing systems can consume the data coming from various Flume sources.
This sink currently supports Kafka server 0.10.1.0 release or higher releases.
Some of the properties for Kafka Sink are:
Property Name | Default Value | Description |
type | – | It specifies the component type. It must be org.apache.flume.sink.kafka.KafkaSink |
kafka.bootstrap.servers | – | It specifies the list of brokers the Kafka sink connects to in order to get the list of topic partitions. The list can be a partial list of brokers, but at least two are recommended for HA. The format is a comma-separated list of hostname:port entries. |
kafka.topic | default-flume-topic | It specifies the Kafka topic to which the messages will be published. If this parameter is configured, messages will be published to this topic. If the event header contains a “topic” field, the event will be published to that topic, overriding the topic configured here. |
flumeBatchSize | 100 | It specifies how many messages should be processed in one batch. Larger batches improve throughput while adding latency. |
kafka.producer.acks | 1 | It specifies how many replicas must acknowledge a message before it is considered successfully written. The values accepted are 0 which means never wait for acknowledgment, 1 which means wait for the leader only, and -1 which means wait for all replicas. Set this to -1 to avoid data loss in some cases of leader failure. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.type = org.apache.flume.sink.kafka.KafkaSink
agent1.sinks.sk1.kafka.topic = mytopic
agent1.sinks.sk1.kafka.bootstrap.servers = localhost:9092
agent1.sinks.sk1.kafka.flumeBatchSize = 20
agent1.sinks.sk1.kafka.producer.acks = 1
agent1.sinks.sk1.kafka.producer.linger.ms = 1
agent1.sinks.sk1.kafka.producer.compression.type = snappy
14. HTTP sink
HTTP sink takes events from the channel and sends them to a remote service by using an HTTP POST request. The content of the event is sent as the POST body.
This sink handles errors depending on the HTTP response returned by a target server.
The back-off or ready status of the sink, the transaction commit/rollback result, and whether the event contributes to the successful event drain count are all configurable, depending on the HTTP status code returned.
If the server returns a malformed HTTP response in which the status code is not readable, the sink emits a backoff signal and the event is not consumed from the channel.
Some of the properties for HTTP Sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be http |
endpoint | – | It specifies the fully qualified URL endpoint to POST to |
connectTimeout | 5000 | It specifies the socket connection timeout in milliseconds |
requestTimeout | 5000 | It specifies the maximum request processing time in milliseconds |
contentTypeHeader | text/plain | It specifies the HTTP Content-Type header |
acceptHeader | text/plain | It specifies the HTTP Accept header value |
defaultBackoff | true | It specifies whether to backoff by default on receiving all HTTP status codes |
defaultRollback | true | It specifies whether to rollback by default on receiving all HTTP status codes |
defaultIncrementMetrics | false | It specifies whether to increment metrics by default on receiving all HTTP status codes |
backoff.CODE | – | It configures a specific backoff for an individual code (e.g. 200) or a group of codes (e.g. 2XX). |
rollback.CODE | – | It configures a specific rollback for an individual code or a group of codes. |
incrementMetrics.CODE | – | It configures a specific metrics increment for an individual code or a group of codes. |
Example for the agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = http
agent1.sinks.sk1.channel = ch1
agent1.sinks.sk1.endpoint = http://localhost:8080/someuri
agent1.sinks.sk1.connectTimeout = 2000
agent1.sinks.sk1.requestTimeout = 2000
agent1.sinks.sk1.acceptHeader = application/json
agent1.sinks.sk1.contentTypeHeader = application/json
agent1.sinks.sk1.defaultBackoff = true
agent1.sinks.sk1.defaultRollback = true
agent1.sinks.sk1.defaultIncrementMetrics = false
agent1.sinks.sk1.backoff.4XX = false
agent1.sinks.sk1.rollback.4XX = false
agent1.sinks.sk1.incrementMetrics.4XX = true
agent1.sinks.sk1.backoff.200 = false
agent1.sinks.sk1.rollback.200 = false
agent1.sinks.sk1.incrementMetrics.200 = true
15. Custom sink
A custom sink is our own implementation of the Sink interface. The custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent.
Some of the properties for the custom sink are:
Property Name | Default Value | Description |
channel | – | Specify the channels through which an event travels. |
type | – | It specifies the component type. It must be the FQCN of the custom sink class, for example org.example.MySink. |
Example for agent named agent1, sink sk1, and channel ch1:
agent1.channels = ch1
agent1.sinks = sk1
agent1.sinks.sk1.type = org.example.MySink
agent1.sinks.sk1.channel = ch1
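For illustration, a minimal sketch of what a class like org.example.MySink might look like is shown below, following the AbstractSink pattern from the Flume developer guide; printing the event body is only a placeholder for real delivery logic, and the property name myProp is an assumed example.
package org.example;

import org.apache.flume.Channel;
import org.apache.flume.Context;
import org.apache.flume.Event;
import org.apache.flume.EventDeliveryException;
import org.apache.flume.Transaction;
import org.apache.flume.conf.Configurable;
import org.apache.flume.sink.AbstractSink;

public class MySink extends AbstractSink implements Configurable {

  @Override
  public void configure(Context context) {
    // Read any custom properties set on the sink, e.g. agent1.sinks.sk1.myProp (assumed example)
    String myProp = context.getString("myProp", "defaultValue");
  }

  @Override
  public Status process() throws EventDeliveryException {
    Channel channel = getChannel();
    Transaction txn = channel.getTransaction();
    txn.begin();
    try {
      Event event = channel.take();
      if (event == null) {
        // Nothing available right now; commit the empty transaction and back off
        txn.commit();
        return Status.BACKOFF;
      }
      // Placeholder for real delivery logic: write event.getBody() to the target system
      System.out.println(new String(event.getBody()));
      txn.commit();
      return Status.READY;
    } catch (Throwable t) {
      // Roll back so the events stay in the channel and are replayed later
      txn.rollback();
      throw new EventDeliveryException("Failed to deliver event", t);
    } finally {
      txn.close();
    }
  }
}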
Summary
I hope this article clearly explained the different sinks supported by Flume. The Flume sink is the component of the Flume agent that consumes data from the Flume channel and pushes it to a centralized store. The article explained the different Flume sinks along with their properties and examples.