Apache Flume Sink – Types of Sink in Flume

1. Apache Flume Sink

Basically, we use Flume Sink components to store data into centralized stores. In this Apache Flume tutorial, "Apache Flume Sink", we will learn what a Flume Sink is. Also, we will see the types of Sink in Flume: HDFS Sink, Hive Sink, Logger Sink, Avro Sink, Thrift Sink, IRC Sink, File Roll Sink, Null Sink, HBase Sink, MorphlineSolrSink, ElasticSearchSink, Kite Dataset Sink, Kafka Sink, HTTP Sink, and Custom Sink. Moreover, we will cover Apache Flume Sink examples, including an Apache Flume Avro Sink example, to understand each sink in depth.

[Figure: Introduction – Apache Flume Sink]

2. What is Apache Flume Sink

When it comes to storing data into centralized stores such as HBase and HDFS, we use the Flume Sink component. It consumes events from the channel and then delivers them to the destination. A sink's destination may be another agent or a central store.
For example − the HDFS Flume sink.
In addition, it is important to note that a Flume agent can have any number of sources, sinks, and channels, as sketched below.
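For instance, here is a minimal sketch of a single agent that fans events out to two channels, each drained by its own sink (the agent name a1, the netcat port, and the file_roll directory below are illustrative values, not part of any standard setup):
a1.sources = r1
a1.channels = c1 c2
a1.sinks = k1 k2
# The source replicates each event to both channels
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444
a1.sources.r1.channels = c1 c2
a1.channels.c1.type = memory
a1.channels.c2.type = memory
# Sink k1 logs events; sink k2 rolls them to the local filesystem
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
a1.sinks.k2.type = file_roll
a1.sinks.k2.sink.directory = /var/log/flume
a1.sinks.k2.channel = c2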
Let's discuss the types of Apache Flume Sink:
[Figure: Apache Flume Sink]

3. Types of Apache Flume Sink

Below we discuss several types of Sink in Flume in detail:

i. HDFS Sink: Apache Flume Sink

When we need to write events into the Hadoop Distributed File System (HDFS), we use the HDFS Sink in Flume. Currently, it supports creating text and sequence files, and it supports compression in both file types. Moreover, we can roll the files periodically based on elapsed time, size of data, or number of events. Rolling a file means closing the current file and creating a new one. Also, note that using this sink requires a version of Hadoop that supports the sync() call.
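For instance, here is a minimal sketch of the roll-related properties (the agent name a1, the path, and the values shown are illustrative, not defaults; the property names are described in the table further below): the file is rolled every 10 minutes, or when it reaches roughly 128 MB, or after 100000 events, whichever comes first.
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events
# Roll on elapsed time (seconds), file size (bytes), or event count; 0 disables that trigger
a1.sinks.k1.hdfs.rollInterval = 600
a1.sinks.k1.hdfs.rollSize = 134217728
a1.sinks.k1.hdfs.rollCount = 100000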
The HDFS Flume Sink supports the following escape sequences:

Alias | Description
%{host} | Substitute value of event header named "host". Arbitrary header names are supported.
%t | Unix time in milliseconds
%a | locale's short weekday name (Mon, Tue, ...)
%A | locale's full weekday name (Monday, Tuesday, ...)
%b | locale's short month name (Jan, Feb, ...)
%B | locale's long month name (January, February, ...)
%c | locale's date and time (Thu Mar 3 23:05:25 2005)
%d | day of month (01)
%e | day of month without padding (1)
%D | date; same as %m/%d/%y
%H | hour (00..23)
%I | hour (01..12)
%j | day of year (001..366)
%k | hour (0..23)
%m | month (01..12)
%n | month without padding (1..12)
%M | minute (00..59)
%p | locale's equivalent of am or pm
%s | seconds since 1970-01-01 00:00:00 UTC
%S | second (00..59)
%y | last two digits of year (00..99)
%Y | year (2010)
%z | +hhmm numeric timezone (for example, -0400)
%[localhost] | Substitute the hostname of the host where the agent is running
%[IP] | Substitute the IP address of the host where the agent is running
%[FQDN] | Substitute the canonical hostname of the host where the agent is running

However, it is important to note that the escape strings %[localhost], %[IP] and %[FQDN] all rely on Java's ability to obtain the hostname, which may fail in some networking environments.
Read more about Apache Flume Architecture
The following table lists the configuration properties of the HDFS Flume Sink:

Name | Default | Description
channel | – | 
type | – | The component type name, needs to be hdfs
hdfs.path | – | HDFS directory path (eg hdfs://namenode/flume/webdata/)
hdfs.fileSuffix | – | Suffix to append to file (eg .avro – NOTE: period is not automatically added)
hdfs.inUsePrefix | – | Prefix that is used for temporal files that flume actively writes into
hdfs.inUseSuffix | .tmp | Suffix that is used for temporal files that flume actively writes into
hdfs.rollInterval | 30 | Number of seconds to wait before rolling current file (0 = never roll based on time interval)
hdfs.rollSize | 1024 | File size to trigger roll, in bytes (0 = never roll based on file size)
hdfs.rollCount | 10 | Number of events written to file before it is rolled (0 = never roll based on number of events)
hdfs.idleTimeout | 0 | Timeout after which inactive files get closed (0 = disable automatic closing of idle files)
hdfs.batchSize | 100 | Number of events written to file before it is flushed to HDFS
hdfs.codeC | – | Compression codec, one of the following: gzip, bzip2, lzo, lzop, snappy
hdfs.fileType | SequenceFile | File format: currently SequenceFile, DataStream or CompressedStream. (1) DataStream will not compress the output file, so don't set codeC. (2) CompressedStream requires hdfs.codeC to be set to an available codec.
hdfs.maxOpenFiles | 5000 | Allow only this number of open files. If this number is exceeded, the oldest file is closed.
hdfs.minBlockReplicas | – | Specify minimum number of replicas per HDFS block. If not specified, it comes from the default Hadoop config in the classpath.
hdfs.writeFormat | Writable | Format for sequence file records. One of Text or Writable.
hdfs.callTimeout | 10000 | Number of milliseconds allowed for HDFS operations, such as open, write, flush, close. This number should be increased if many HDFS timeout operations are occurring.
hdfs.threadsPoolSize | 10 | Number of threads per HDFS Flume sink for HDFS IO ops (open, write, etc.)
hdfs.rollTimerPoolSize | 1 | Number of threads per HDFS Flume sink for scheduling timed file rolling
hdfs.kerberosPrincipal | – | Kerberos user principal for accessing secure HDFS
hdfs.kerberosKeytab | – | Kerberos keytab for accessing secure HDFS
hdfs.proxyUser | – | 
hdfs.round | false | Should the timestamp be rounded down (if true, affects all time based escape sequences except %t)
hdfs.roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hdfs.roundUnit), less than current time
hdfs.roundUnit | second | The unit of the round down value – second, minute or hour
hdfs.timeZone | Local Time | Name of the timezone that should be used for resolving the directory path, e.g. America/Los_Angeles
hdfs.useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences
hdfs.closeTries | 0 | Number of times the sink must try renaming a file after initiating a close attempt. If set to 1, the sink will not re-try a failed rename (due to, for example, NameNode or DataNode failure) and may leave the file in an open state with a .tmp extension.
hdfs.retryInterval | 180 | Time in seconds between consecutive attempts to close a file. Each close call costs multiple RPC round-trips to the Namenode, so setting this too low can cause a lot of load on the name node. If set to 0 or less, the sink will not attempt to close the file if the first attempt fails, and may leave the file open or with a ".tmp" extension.
serializer | TEXT | Other possible options include avro_event or the fully-qualified class name of an implementation of the EventSerializer.Builder interface.
serializer.* | – | 

So, let's see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hdfs
a1.sinks.k1.channel = c1
a1.sinks.k1.hdfs.path = /flume/events/%y-%m-%d/%H%M/%S
a1.sinks.k1.hdfs.filePrefix = events-
a1.sinks.k1.hdfs.round = true
a1.sinks.k1.hdfs.roundValue = 10
a1.sinks.k1.hdfs.roundUnit = minute
Read more about Best Books to Learn Apache Flume

ii. Hive Sink: Apache Flume Sink

Basically, the Hive sink streams events containing delimited text or JSON data directly into a Hive table or partition. Events are written using Hive transactions. As soon as a set of events is committed to Hive, they become immediately visible to Hive queries. The table below shows the names and descriptions of the Hive Flume Sink properties.

Name | Default | Description
channel | – | 
type | – | The component type name, needs to be hive
hive.metastore | – | Hive metastore URI (eg thrift://a.b.com:9083)
hive.database | – | Hive database name
hive.table | – | Hive table name
hive.partition | – | Comma separated list of partition values identifying the partition to write to. May contain escape sequences. E.g: if the table is partitioned by (continent: string, country: string, time: string), then 'Asia,India,2014-02-26-01-21' will indicate continent=Asia, country=India, time=2014-02-26-01-21
hive.txnsPerBatchAsk | 100 | Hive grants a batch of transactions instead of single transactions to streaming clients like Flume. This setting configures the number of desired transactions per Transaction Batch. Data from all transactions in a single batch end up in a single file. Flume will write a maximum of batchSize events in each transaction in the batch. This setting, in conjunction with batchSize, provides control over the size of each file. Note that eventually Hive will transparently compact these files into larger files.
heartBeatInterval | 240 | (In seconds) Interval between consecutive heartbeats sent to Hive to keep unused transactions from expiring. Set this value to 0 to disable heartbeats.
autoCreatePartitions | true | Flume will automatically create the necessary Hive partitions to stream to
batchSize | 15000 | Max number of events written to Hive in a single Hive transaction
maxOpenConnections | 500 | Allow only this number of open connections. If this number is exceeded, the least recently used connection is closed.
callTimeout | 10000 | (In milliseconds) Timeout for Hive & HDFS I/O operations, such as openTxn, write, commit, abort.
serializer | – | The serializer is responsible for parsing out fields from the event and mapping them to columns in the Hive table. Choice of serializer depends upon the format of the data in the event. Supported serializers: DELIMITED and JSON
roundUnit | minute | The unit of the round down value – second, minute or hour
roundValue | 1 | Rounded down to the highest multiple of this (in the unit configured using hive.roundUnit), less than current time
timeZone | Local Time | Name of the timezone that should be used for resolving the escape sequences in partition, e.g. America/Los_Angeles
useLocalTimeStamp | false | Use the local time (instead of the timestamp from the event header) while replacing the escape sequences

So, let's see an example Hive table:
create table weblogs ( id int , msg string )
partitioned by (continent string, country string, time string)
clustered by (id) into 5 buckets
stored as orc;
Also, see an example for agent a1:
a1.channels = c1
a1.channels.c1.type = memory
a1.sinks = k1
a1.sinks.k1.type = hive
a1.sinks.k1.channel = c1
a1.sinks.k1.hive.metastore = thrift://127.0.0.1:9083
a1.sinks.k1.hive.database = logsdb
a1.sinks.k1.hive.table = weblogs
a1.sinks.k1.hive.partition = asia,%{country},%y-%m-%d-%H-%M
a1.sinks.k1.useLocalTimeStamp = false
a1.sinks.k1.round = true
a1.sinks.k1.roundValue = 10
a1.sinks.k1.roundUnit = minute
a1.sinks.k1.serializer = DELIMITED
a1.sinks.k1.serializer.delimiter = "\t"
a1.sinks.k1.serializer.serdeSeparator = '\t'
a1.sinks.k1.serializer.fieldnames =id,,msg
Read more Features of Apache Flume

iii. Logger Sink: Apache Flume Sink

This sink logs events at the INFO level. Typically, we use it for testing/debugging purposes. It is the only exception that doesn't require the extra configuration explained in the Logging raw data section. The following table states the property names and descriptions of the Logger Flume Sink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be logger
maxBytesToLog | 16 | Maximum number of bytes of the Event body to log

So, let's see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = logger
a1.sinks.k1.channel = c1
Read about Flume Troubleshooting – Flume Known Issues 

iv. Avro Sink: Apache Flume Sink

The Flume Avro Sink forms one half of Flume's tiered collection support. Events sent to this sink are taken from the configured channel in batches of the configured batch size.
Setting the reset-connection-interval property forces the Avro Flume Sink to reconnect to the next hop. This allows the sink to connect to hosts behind a hardware load balancer when new hosts are added, without having to restart the agent.
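As a rough sketch (the values below are illustrative, not defaults), reconnection and compression for the Avro sink can be configured like this:
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
# Drop and re-establish the connection to the next hop every 600 seconds
a1.sinks.k1.reset-connection-interval = 600
# Must match the compression-type of the receiving AvroSource
a1.sinks.k1.compression-type = deflate
a1.sinks.k1.compression-level = 6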

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be avro.
hostname | – | The hostname or IP address to bind to.
port | – | The port # to listen on.
batch-size | 100 | Number of events to batch together for send.
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request.
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first.
reset-connection-interval | none | Amount of time (s) before the connection to the next hop is reset.
compression-type | none | This can be "none" or "deflate". The compression-type must match the compression-type of the matching AvroSource.
compression-level | 6 | The level of compression to compress events. 0 = no compression, and 1-9 is compression. The higher the number, the more compression.
ssl | false | Set to true to enable SSL for this Flume AvroSink. When configuring SSL, you can optionally set a "truststore", "truststore-password", "truststore-type", and specify whether to "trust-all-certs".
trust-all-certs | false | If this is set to true, SSL server certificates for remote servers (Avro Sources) will not be checked.
truststore | – | The path to a custom Java truststore file.
truststore-password | – | The password for the specified truststore.
truststore-type | JKS | The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
maxIoWorkers | 2 * the number of available processors in the machine | The maximum number of I/O worker threads. This is configured on the NettyAvroRpcClient NioClientSocketChannelFactory.

So, let's see an Apache Flume Avro Sink example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = avro
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Read more about Apache Flume Channel Selectors

v. Thrift Sink: Apache Flume Sink

The Thrift Sink forms the other half of Flume's tiered collection support. Events sent to this sink are taken from the configured channel in batches of the configured batch size.
The Thrift Flume Sink can be configured to start in secure mode by enabling Kerberos authentication. To communicate with a Thrift source started in secure mode, the Thrift sink should also operate in secure mode.
Setting the connection-reset-interval property forces the Thrift Sink in Flume to reconnect to the next hop. This allows the sink to connect to hosts behind a hardware load balancer when new hosts are added, without having to restart the agent.
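As a rough sketch (the principals and keytab path below are made-up placeholders), Kerberos authentication for the Thrift sink can be configured like this:
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
# Enable Kerberos; client-principal/client-keytab authenticate this sink to the KDC
a1.sinks.k1.kerberos = true
a1.sinks.k1.client-principal = flume/client.example.org@EXAMPLE.ORG
a1.sinks.k1.client-keytab = /etc/security/keytabs/flume.keytab
# Principal of the secure Thrift Source this sink connects to
a1.sinks.k1.server-principal = flume/server.example.org@EXAMPLE.ORG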

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be thrift.
hostname | – | The hostname or IP address to bind to.
port | – | The port # to listen on.
batch-size | 100 | Number of events to batch together for send.
connect-timeout | 20000 | Amount of time (ms) to allow for the first (handshake) request.
request-timeout | 20000 | Amount of time (ms) to allow for requests after the first.
connection-reset-interval | none | Amount of time (s) before the connection to the next hop is reset.
ssl | false | Set to true to enable SSL for this ThriftSink in Flume. When configuring SSL, you can optionally set a "truststore", "truststore-password" and "truststore-type".
truststore | – | The path to a custom Java truststore file. Flume uses the certificate authority information in this file to determine whether the remote Thrift Source's SSL authentication credentials should be trusted. If not specified, the default Java JSSE certificate authority files (typically "jssecacerts" or "cacerts" in the Oracle JRE) will be used.
truststore-password | – | The password for the specified truststore.
truststore-type | JKS | The type of the Java truststore. This can be "JKS" or other supported Java truststore type.
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude
kerberos | false | Set to true to enable Kerberos authentication. In Kerberos mode, client-principal, client-keytab, and server-principal are required for successful authentication and communication to a Kerberos-enabled Thrift Source.
client-principal | – | The Kerberos principal used by the Thrift Sink to authenticate to the Kerberos KDC.
client-keytab | – | The keytab location used by the Thrift Sink in combination with the client-principal to authenticate to the Kerberos KDC.
server-principal | – | The Kerberos principal of the Thrift Source to which the Thrift Sink is configured to connect.

So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = thrift
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = 10.10.10.10
a1.sinks.k1.port = 4545
Read more about Flume Event in Detail

vi. IRC Sink: Apache Flume Sink

Basically, the IRC Flume sink takes messages from the attached channel and relays them to the configured IRC destinations. The table below shows the property names and descriptions of the IRC Flume Sink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be irc
hostname | – | The hostname or IP address to connect to
port | 6667 | The port number of the remote host to connect to
nick | – | Nick name
user | – | User name
password | – | User password
chan | – | Channel
name | – | 
splitlines | – | (boolean)
splitchars | n | Line separator (if you were to enter the default value into the config file, then you would need to escape the backslash, like this: "\n")

So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = irc
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = irc.yourdomain.com
a1.sinks.k1.nick = flume
a1.sinks.k1.chan = #flume
Read more Apache Flume Data Flow in Detail

vii. File Roll Sink: Apache Flume Sink

Basically, the File Roll Flume Sink stores events on the local filesystem. The table below shows the property names and descriptions of the File Roll Flume Sink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be file_roll.
sink.directory | – | The directory where files will be stored
sink.pathManager | DEFAULT | The PathManager implementation to use.
sink.pathManager.extension | – | The file extension if the default PathManager is used.
sink.pathManager.prefix | – | A character string to add to the beginning of the file name if the default PathManager is used
sink.rollInterval | 30 | Roll the file every 30 seconds. Specifying 0 will disable rolling and cause all events to be written to a single file.
sink.serializer | TEXT | Other possible options include avro_event or the FQCN of an implementation of the EventSerializer.Builder interface.
batchSize | 100 | 

So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = file_roll
a1.sinks.k1.channel = c1
a1.sinks.k1.sink.directory = /var/log/flume
Let’s read some important interview questions of Apache Flume with answers

viii. Null Sink: Apache Flume Sink

The Null Flume Sink discards all events it receives from the channel. The table below shows the property names and descriptions of the Null Flume Sink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be null.
batchSize | 100 | 

So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = null
a1.sinks.k1.channel = c1

ix. HBase Sink: Apache Flume Sink

Basically, the HBase Sink writes data to HBase. The HBase configuration is picked up from the first hbase-site.xml encountered in the classpath. A class implementing HbaseEventSerializer, specified by the configuration, is used to convert the events into HBase puts and/or increments. If HBase fails to write certain events, the sink will replay all events in that transaction. The table below shows the property names and descriptions of the HBase Flume Sink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be hbase
table | – | The name of the table in HBase to write to.
columnFamily | – | The column family in HBase to write to.
zookeeperQuorum | – | The quorum spec. This is the value for the property hbase.zookeeper.quorum in hbase-site.xml
znodeParent | /hbase | The base path for the znode for the -ROOT- region. Value of zookeeper.znode.parent in hbase-site.xml
batchSize | 100 | Number of events to be written per txn.

So, let’s see an example for agent named a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = hbase
a1.sinks.k1.table = foo_table
a1.sinks.k1.columnFamily = bar_cf
a1.sinks.k1.serializer = org.apache.flume.sink.hbase.RegexHbaseEventSerializer
a1.sinks.k1.channel = c1
Let’s know about Apache Flume interceptors

x. MorphlineSolrSink: Apache Flume Sink

We use this sink when we need to extract data from Flume events, transform it, and load it in near-real-time into Apache Solr servers, which in turn serve queries to end users or search applications.
In addition, it can process arbitrary heterogeneous raw data from disparate data sources and turn it into a data model that is useful to search applications. The following table shows the properties of the MorphlineSolrSink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be org.apache.flume.sink.solr.morphline.MorphlineSolrSink
morphlineFile | – | The relative or absolute path on the local file system to the morphline configuration file. Example: /etc/flume-ng/conf/morphline.conf
morphlineId | null | Optional name used to identify a morphline if there are multiple morphlines in a morphline config file
batchSize | 1000 | The maximum number of events to take per flume transaction.
batchDurationMillis | 1000 | The maximum duration per flume transaction (ms). The transaction commits after this duration or when batchSize is exceeded, whichever comes first.
handlerClass | org.apache.flume.sink.solr.morphline.MorphlineHandlerImpl | The FQCN of a class implementing org.apache.flume.sink.solr.morphline.MorphlineHandler
isProductionMode | false | This flag should be enabled for mission-critical, large-scale online production systems that need to make progress without downtime when unrecoverable exceptions occur. Corrupt or malformed parser input data, parser bugs, and errors related to unknown Solr schema fields produce unrecoverable exceptions.
recoverableExceptionClasses | org.apache.solr.client.solrj.SolrServerException | Comma separated list of recoverable exceptions that tend to be transient, in which case the corresponding task can be retried. Examples include network connection errors, timeouts, etc. When the production mode flag is set to true, the recoverable exceptions configured using this parameter will not be ignored and hence will lead to retries.
isIgnoringRecoverableExceptions | false | This flag should be enabled if an unrecoverable exception is accidentally misclassified as recoverable. This enables the sink to make progress and avoid retrying an event forever.

So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.solr.morphline.MorphlineSolrSink
a1.sinks.k1.channel = c1
a1.sinks.k1.morphlineFile = /etc/flume-ng/conf/morphline.conf
# a1.sinks.k1.morphlineId = morphline1
# a1.sinks.k1.batchSize = 1000
# a1.sinks.k1.batchDurationMillis = 1000
Let’s study Apache Flume Use Cases – Future Scope

xi. ElasticSearchSink: Apache Flume Sink

Basically, this Apache Flume sink writes data to an Elasticsearch cluster. By default, events are written so that the Kibana graphical interface can display them – just as if logstash wrote them.
It is also worth noting that header substitution is handy for using the value of an event header to dynamically decide the indexName and indexType to use when storing the event. The table below states the property names and descriptions of the ElasticSearchSink.
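For instance, a minimal sketch of header substitution (the header name logType is a made-up example) that derives the index name and type from an event header might look like this:
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200
# %{logType} is replaced with the value of the event header named "logType"
a1.sinks.k1.indexName = flume-%{logType}
a1.sinks.k1.indexType = %{logType}
a1.sinks.k1.channel = c1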

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be org.apache.flume.sink.elasticsearch.ElasticSearchSink
hostNames | – | Comma separated list of hostname:port; if the port is not present, the default port '9300' will be used
indexName | flume | The name of the index which the date will be appended to. Example: 'flume' -> 'flume-yyyy-MM-dd'. Arbitrary header substitution is supported, eg. %{header} replaces with the value of the named event header
indexType | logs | The type to index the document to, defaults to 'log'. Arbitrary header substitution is supported, eg. %{header} replaces with the value of the named event header
clusterName | elasticsearch | Name of the ElasticSearch cluster to connect to
batchSize | 100 | Number of events to be written per txn
ttl | – | TTL in days; when set, will cause the expired documents to be deleted automatically; if not set, documents will never be automatically deleted. TTL is accepted both in the earlier form of integer only, e.g. a1.sinks.k1.ttl = 5, and also with a qualifier ms (millisecond), s (second), m (minute), h (hour), d (day) and w (week). Example: a1.sinks.k1.ttl = 5d will set TTL to 5 days.
serializer | org.apache.flume.sink.elasticsearch.ElasticSearchLogStashEventSerializer | The ElasticSearchIndexRequestBuilderFactory or ElasticSearchEventSerializer to use. Implementations of either class are accepted, but ElasticSearchIndexRequestBuilderFactory is preferred.
serializer.* | – | Properties to be passed to the serializer.

So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = elasticsearch
a1.sinks.k1.hostNames = 127.0.0.1:9200,127.0.0.2:9300
a1.sinks.k1.indexName = foo_index
a1.sinks.k1.indexType = bar_type
a1.sinks.k1.clusterName = foobar_cluster
a1.sinks.k1.batchSize = 500
a1.sinks.k1.ttl = 5d
a1.sinks.k1.serializer = org.apache.flume.sink.elasticsearch.ElasticSearchDynamicSerializer
a1.sinks.k1.channel = c1
How to install Apache Flume in your System

xii. Kite Dataset Sink: Apache Flume Sink

The Kite Dataset Flume Sink is an experimental sink that writes events to a Kite Dataset. It deserializes the body of each incoming event and stores the resulting record in a Kite Dataset, determining the target Dataset by loading it by URI. The table below shows the property names and descriptions of the Kite Dataset Sink in Flume; a sample configuration follows the table.

Property Name | Default | Description
channel | – | 
type | – | Must be org.apache.flume.sink.kite.DatasetSink
kite.dataset.uri | – | URI of the dataset to open
kite.repo.uri | – | URI of the repository to open (deprecated; use kite.dataset.uri instead)
kite.dataset.namespace | – | Namespace of the Dataset where records will be written
kite.dataset.name | – | Name of the Dataset where records will be written (deprecated; use kite.dataset.uri instead)
kite.batchSize | 100 | Number of records to process in each batch
auth.kerberosKeytab | – | Kerberos keytab location (local FS) for the principal
auth.proxyUser | – | The effective user for HDFS actions, if different from the kerberos principal
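So, let's see a rough example sketch for agent a1 (the dataset URI below is a made-up placeholder, not a real dataset):
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kite.DatasetSink
a1.sinks.k1.channel = c1
# URI of the Kite dataset to open (placeholder value)
a1.sinks.k1.kite.dataset.uri = dataset:hdfs://namenode/flume/weblogs
a1.sinks.k1.kite.batchSize = 100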

xiii. Kafka Sink: Apache Flume Sink

Basically, the Kafka Sink is a Flume Sink implementation that can publish data to a Kafka topic. One of its objectives is to integrate Flume with Kafka so that pull-based processing systems can process the data coming from various Flume sources. The table below shows the property names and descriptions of the Kafka Sink in Flume; a sample configuration follows the table.
Note: it currently supports the Kafka 0.9.x series of releases. This version of Flume no longer supports the older (0.8.x) Kafka releases.

Property Name | Default | Description
type | – | Must be set to org.apache.flume.sink.kafka.KafkaSink
kafka.bootstrap.servers | – | List of brokers the Flume Kafka Sink will connect to in order to get the list of topic partitions. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma separated list of hostname:port
kafka.topic | default-flume-topic | The topic in Kafka to which the messages will be published.
Other Kafka Producer Properties | – | These properties are used to configure the Kafka Producer. Any producer property supported by Kafka can be used. The only requirement is to prepend the property name with the prefix kafka.producer. For example: kafka.producer.linger.ms
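So, let's see a rough example sketch for agent a1 (the broker addresses and topic name below are made-up placeholders):
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.channel = c1
# Comma separated list of hostname:port brokers; at least two are recommended for HA
a1.sinks.k1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.sinks.k1.kafka.topic = flume-events
# Any Kafka producer property can be passed by prefixing it with kafka.producer.
a1.sinks.k1.kafka.producer.linger.ms = 5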

xiv. HTTP Sink: Apache Flume Sink

Basically, the HTTP Flume Sink takes events from the channel and sends them to a remote server using an HTTP POST request, with the event content sent as the POST body. The table below shows the property names and descriptions of the HTTP Flume Sink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be http
endpoint | – | The fully qualified URL endpoint to POST to
connectTimeout | 5000 | The socket connection timeout in milliseconds
requestTimeout | 5000 | The maximum request processing time in milliseconds
contentTypeHeader | text/plain | The HTTP Content-Type header
acceptHeader | text/plain | The HTTP Accept header value
defaultBackoff | true | Whether to backoff by default on receiving all HTTP status codes
defaultRollback | true | Whether to rollback by default on receiving all HTTP status codes
defaultIncrementMetrics | false | Whether to increment metrics by default on receiving all HTTP status codes
backoff.CODE | – | Configures a specific backoff for an individual (i.e. 200) code or a group (i.e. 2XX) code
rollback.CODE | – | Configures a specific rollback for an individual (i.e. 200) code or a group (i.e. 2XX) code
incrementMetrics.CODE | – | Configures a specific metrics increment for an individual (i.e. 200) code or a group (i.e. 2XX) code

Note that any empty or null events are consumed without making a request to the HTTP endpoint.
So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = http
a1.sinks.k1.channel = c1
a1.sinks.k1.endpoint = http://localhost:8080/someuri
a1.sinks.k1.connectTimeout = 2000
a1.sinks.k1.requestTimeout = 2000
a1.sinks.k1.acceptHeader = application/json
a1.sinks.k1.contentTypeHeader = application/json
a1.sinks.k1.defaultBackoff = true
a1.sinks.k1.defaultRollback = true
a1.sinks.k1.defaultIncrementMetrics = false
a1.sinks.k1.backoff.4XX = false
a1.sinks.k1.rollback.4XX = false
a1.sinks.k1.incrementMetrics.4XX = true
a1.sinks.k1.backoff.200 = false
a1.sinks.k1.rollback.200 = false
a1.sinks.k1.incrementMetrics.200 = true

xv. Custom Sink: Apache Flume Sink

A Custom Sink is our own implementation of the Flume Sink interface. A custom sink's class and its dependencies must be included in the agent's classpath when starting the Flume agent. The type of the custom sink is its FQCN (fully qualified class name). The table below shows the property names and descriptions of a Custom Flume Sink.

Property Name | Default | Description
channel | – | 
type | – | The component type name, needs to be your FQCN

So, let’s see an example for agent a1:
a1.channels = c1
a1.sinks = k1
a1.sinks.k1.type = org.example.MySink
a1.sinks.k1.channel = c1
Follow this link to know how to transfer From Flume to HDFS

4. Conclusion

As a result of this Apache Flume tutorial, we have covered all the content about Flume Sinks. Moreover, we have seen all the types of Flume Sink: HDFS Sink, Hive Sink, Logger Sink, Avro Sink, Thrift Sink, IRC Sink, File Roll Sink, Null Sink, HBase Sink, MorphlineSolrSink, ElasticSearchSink, Kite Dataset Sink, Kafka Sink, HTTP Sink, and Custom Sink. Also, we have seen Apache Flume Sink examples, including an Apache Flume Avro Sink example. Still, if you have any query, feel free to ask in the comment section.
