Apache Flume Channel | Types of Channels in Flume

1. Apache Flume Channel 

In this Apache Flume tutorial, we look at channels in Flume. A Flume channel is a transient store that receives events from a source and buffers them until a sink consumes them. The types of channel covered here are the Memory channel, JDBC channel, Kafka channel, File channel, Spillable Memory channel, Pseudo Transaction channel, and Custom channel.

So, let’s start with the Apache Flume Channel.

2. Introduction – Apache Flume Channel

A Flume channel is a transient store that receives events from a source and buffers them until a sink consumes them. In simple terms, a channel acts as a bridge between the sources and the sinks in Flume.

A channel can work with any number of sources and sinks, and channels are fully transactional. Examples include the JDBC channel, the File channel, and the Memory channel.

To be more specific, channels are the repositories where events are staged on an agent: a source adds events to the channel and a sink removes them.
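
To make the bridging role concrete, here is a minimal sketch of a complete agent definition in which one source writes to a channel and one sink reads from it. The component names r1, k1, and c1, the netcat source, the logger sink, and the port number are illustrative choices, not something taken from the text above.

```properties
# Name the components of agent a1 (the names are arbitrary labels)
a1.sources = r1
a1.sinks = k1
a1.channels = c1

# Illustrative source: listens for lines of text on a TCP port
a1.sources.r1.type = netcat
a1.sources.r1.bind = localhost
a1.sources.r1.port = 44444

# Illustrative sink: logs each event it takes from the channel
a1.sinks.k1.type = logger

# The channel that buffers events between the two
a1.channels.c1.type = memory

# A source can write to several channels (plural key),
# while a sink reads from exactly one (singular key)
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
```

The last two lines are what actually attach the source and the sink to the channel; without them the channel is defined but unused.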

3. Types of Channel in Flume 

There are seven types of Flume Channel:

i. Memory Channel: Apache Flume Channel

The Memory channel stores all events in an in-memory queue with a configurable maximum size. It is ideal for flows that need higher throughput and are prepared to lose the staged data if the agent fails.

The properties of the Memory channel are listed below, with default values in parentheses:

  • type – The component type name; must be memory.
  • capacity (100) – The maximum number of events stored in the Flume channel.
  • transactionCapacity (100) – The maximum number of events the channel will take from a source or give to a sink per transaction.
  • keep-alive (3) – Timeout in seconds for adding or removing an event.
  • byteCapacityBufferPercentage (20) – Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers.
  • byteCapacity – Maximum total bytes of memory allowed as a sum of all events in this channel. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line). 

So, let’s see an example for agent a1:

a1.channels = c1
a1.channels.c1.type = memory
a1.channels.c1.capacity = 10000
a1.channels.c1.transactionCapacity = 10000
a1.channels.c1.byteCapacityBufferPercentage = 20
a1.channels.c1.byteCapacity = 800000

ii. JDBC Channel: Apache Flume Channel

The JDBC channel stores all events in persistent storage backed by a database. Currently it supports only embedded Derby. It is a durable channel that is ideal for flows where recoverability is important.

The properties of the JDBC channel are listed below, with default values in parentheses:

  • type – The component type name, needs to be jdbc.
  • db.type (DERBY) – Database vendor, needs to be DERBY.
  • driver.class (org.apache.derby.jdbc.EmbeddedDriver) – Class for vendor’s JDBC driver.
  • driver.url (constructed from other properties) – JDBC connection URL.
  • db.username (“sa”) – User id for db connection.
  • db.password – Password for db connection.
  • connection.properties.file – JDBC connection property file path.
  • create.schema (true) – If true, creates the db schema if it does not exist.
  • create.index (true) – Create indexes to speed up lookups.
  • create.foreignkey (true)
  • transaction.isolation (“READ_COMMITTED”) – Isolation level for the db session: READ_UNCOMMITTED, READ_COMMITTED, SERIALIZABLE, or REPEATABLE_READ.
  • maximum.connections (10) – Max connections allowed to db.
  • maximum.capacity (0, unlimited) – Max number of events in the channel.
  • sysprop.* – DB Vendor specific properties.
  • sysprop.user.home – Home path to store embedded Derby database.

Also, see an example for agent a1:

a1.channels = c1
a1.channels.c1.type = jdbc
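
The example above relies entirely on defaults. As a hedged sketch, a few of the properties from the list could be set explicitly like this; the directory path and the capacity value are assumptions chosen purely for illustration.

```properties
a1.channels = c1
a1.channels.c1.type = jdbc
# Cap the channel at an assumed 100000 events (0 would mean unlimited)
a1.channels.c1.maximum.capacity = 100000
# Assumed location for the embedded Derby database files
a1.channels.c1.sysprop.user.home = /var/lib/flume/jdbc
# Create the schema on first start if it does not exist (this is the default)
a1.channels.c1.create.schema = true
```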

iii. Kafka Channel: Apache Flume Channel

Here, all events are stored in a Kafka cluster, which must be installed separately. Kafka offers high availability and replication, so if an agent or a Kafka broker crashes, the events are immediately available to other sinks.

The Kafka channel can be used in several scenarios:

  • With Flume source and sink – It offers a reliable and highly available channel for events.
  • With Flume source and interceptor but no sink – It allows writing Flume events into a Kafka topic for use by other applications.
  • With Flume sink, but no source – It is a low-latency, fault-tolerant way to send events from Kafka to Flume sinks such as HDFS, HBase or Solr.

The configuration parameters are organized as follows:

  • Configuration values related to the channel generally are applied at the channel config level, such as a1.channels.k1.type.
  • Configuration values related to Kafka or to how the channel operates are prefixed with “kafka.” (these are analogous to CommonClient Configs), such as a1.channels.k1.kafka.topic and a1.channels.k1.kafka.bootstrap.servers.
  • Properties specific to the producer or consumer are prefixed with kafka.producer or kafka.consumer.
  • Where possible, the Kafka parameter names are used, such as bootstrap.servers and acks.

This version of Flume is backward-compatible with previous versions. However, deprecated properties are indicated in the list further below, and a warning message is logged on startup when they are present in the configuration file.

The properties of the Kafka channel are listed below, with default values in parentheses:

  • type – The component type name, needs to be org.apache.flume.channel.kafka.KafkaChannel.
  • kafka.bootstrap.servers – List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port.
  • kafka.topic (flume-channel) – Kafka topic which the channel will use.
  • kafka.consumer.group.id (flume) – Consumer group ID the channel uses to register with Kafka. Multiple channels must use the same topic and group to ensure that when one agent fails another can get the data. Note that having non-channel consumers with the same ID can lead to data loss.
  • parseAsFlumeEvent (true) – Expecting Avro datums with FlumeEvent schema in the channel. This should be true if Flume source is writing to the channel and false if other producers are writing into the topic that the channel is using. Flume source messages to Kafka can be parsed outside of Flume by using org.apache.flume.source.avro.AvroFlumeEvent provided by the flume-ng-sdk artifact.
  • migrateZookeeperOffsets (true) – When no Kafka stored offset is found, look up the offsets in Zookeeper and commit them to Kafka. This should be true to support seamless Kafka client migration from older versions of Flume. Once migrated this can be set to false, though that should generally not be required. If no Zookeeper offset is found the kafka.consumer.auto.offset.reset configuration defines how offsets are handled.
  • pollTimeout (500) – The amount of time(in milliseconds) to wait in the “poll()” call of the consumer.
  • defaultPartitionId – Specifies a Kafka partition ID (integer) for all events in this channel to be sent to, unless overridden by partitionIdHeader. By default, if this property is not set, events will be distributed by the Kafka Producer’s partitioner, including by key if specified (or by a partitioner specified by kafka.partitioner.class).
  • partitionIdHeader – When set, the producer will take the value of the field named using the value of this property from the event header and send the message to the specified partition of the topic. If the value represents an invalid partition the event will not be accepted into the channel. If the header value is present then this setting overrides defaultPartitionId.
  • kafka.consumer.auto.offset.reset (latest) – What to do when there is no initial offset in Kafka or if the current offset does not exist any more on the server (e.g. because that data has been deleted). earliest: automatically reset the offset to the earliest offset; latest: automatically reset the offset to the latest offset; none: throw an exception to the consumer if no previous offset is found for the consumer’s group; anything else: throw an exception to the consumer.
  • kafka.producer.security.protocol (PLAINTEXT) – Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security.
  • kafka.consumer.security.protocol (PLAINTEXT) – Same as kafka.producer.security.protocol but for reading/consuming from Kafka.
  • more producer/consumer security props – If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to the Kafka security documentation for additional properties that need to be set on the producer/consumer.

Deprecated Properties

  • brokerList – List of brokers in the Kafka cluster used by the channel. This can be a partial list of brokers, but we recommend at least two for HA. The format is a comma-separated list of hostname:port. Use kafka.bootstrap.servers.
  • topic (flume-channel) – Use kafka.topic.
  • groupId (flume) – Use kafka.consumer.group.id.
  • readSmallestOffset (false) – Use kafka.consumer.auto.offset.reset.

Note that, because of the way the channel is load balanced, there may be duplicate events when the agent first starts up.

So, let’s see an example for agent a1:

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092,kafka-3:9092
a1.channels.channel1.kafka.topic = channel1
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
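
For the scenario with a sink but no source, where another application produces messages into the topic, a configuration might look like the sketch below. The broker addresses, the topic name, and the HDFS path are placeholders, and the sink settings are kept to the bare minimum; treat this as an outline under those assumptions rather than a tested setup.

```properties
a1.channels = channel1
a1.sinks = sink1

a1.channels.channel1.type = org.apache.flume.channel.kafka.KafkaChannel
a1.channels.channel1.kafka.bootstrap.servers = kafka-1:9092,kafka-2:9092
a1.channels.channel1.kafka.topic = app-events
a1.channels.channel1.kafka.consumer.group.id = flume-consumer
# Messages come from a non-Flume producer, so they are not Avro FlumeEvents
a1.channels.channel1.parseAsFlumeEvent = false

# No source is defined; the sink drains the topic through the channel
a1.sinks.sink1.type = hdfs
a1.sinks.sink1.hdfs.path = /flume/events
a1.sinks.sink1.channel = channel1
```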

iv. File Channel: Apache Flume Channel

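The File channel stores events in files on the local disk, so staged events survive an agent restart. The properties of the File channel are listed below, with default values in parentheses: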

  • type – The component type name, needs to be file.
  • checkpointDir (~/.flume/file-channel/checkpoint) – The directory where checkpoint file will be stored.
  • useDualCheckpoints (false) – Backup the checkpoint. If this is set to true, backupCheckpointDir must be set.
  • backupCheckpointDir – The directory where the checkpoint is backed up to. This directory must not be the same as the data directories or the checkpoint directory.
  • dataDirs (~/.flume/file-channel/data) – Comma-separated list of directories for storing log files. Using multiple directories on separate disks can improve file channel performance.
  • transactionCapacity (10000) – The maximum size of transaction supported by the channel.
  • checkpointInterval (30000) – Amount of time (in millis) between checkpoints.
  • maxFileSize (2146435071) – Max size (in bytes) of a single log file.
  • minimumRequiredSpace (524288000) – Minimum Required free space (in bytes). To avoid data corruption, File Channel stops accepting take/put requests when free space drops below this value.
  • capacity (1000000) – Maximum capacity of the channel.
  • keep-alive (3) – Amount of time (in sec) to wait for a put operation.
  • use-log-replay-v1 (false) – Expert: Use old replay logic.
  • use-fast-replay (false) – Expert: Replay without using queue.
  • checkpointOnClose (true) – Controls if a checkpoint is created when the channel is closed. Creating a checkpoint on close speeds up subsequent startup of the file channel by avoiding replay.
  • encryption.activeKey – Key name used to encrypt new data.
  • encryption.cipherProvider – Cipher provider type, supported types: AESCTRNOPADDING.
  • encryption.keyProvider – Key provider type, supported types: JCEKSFILE.
  • encryption.keyProvider.keyStoreFile – Path to the keystore file.
  • encryption.keyProvider.keyStorePasswordFile – Path to the keystore password file.
  • encryption.keyProvider.keys – List of all keys (e.g. history of the activeKey setting).
  • encryption.keyProvider.keys.*.passwordFile – Path to the optional key password file.
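
As a rough sketch of how the encryption properties fit together, a file channel with a single key might be configured as follows. The key name key-0 and both file paths are hypothetical placeholders, and the keystore itself is assumed to have been created beforehand.

```properties
a1.channels.c1.type = file
# Name of the key used to encrypt newly written data (hypothetical name)
a1.channels.c1.encryption.activeKey = key-0
a1.channels.c1.encryption.cipherProvider = AESCTRNOPADDING
a1.channels.c1.encryption.keyProvider = JCEKSFILE
# Placeholder paths to the keystore and its password file
a1.channels.c1.encryption.keyProvider.keyStoreFile = /path/to/my.keystore
a1.channels.c1.encryption.keyProvider.keyStorePasswordFile = /path/to/my.keystore.password
# Every key that has ever been active should stay listed here
a1.channels.c1.encryption.keyProvider.keys = key-0
```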

Note that, by default, the file channel uses paths for the checkpoint and data directories that are within the user's home directory. If more than one file channel instance is active within the agent, only one will be able to lock the directories, and the other channels will fail to initialize. Hence, it is essential to provide explicit paths for all the configured channels, preferably on different disks.

So, let’s see an example for agent a1:

a1.channels = c1
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
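
When an agent runs more than one file channel, each channel needs its own checkpoint and data directories so that they do not compete for the same locks. A minimal sketch, assuming two channels named c1 and c2 and two hypothetical mount points on separate disks:

```properties
a1.channels = c1 c2

# First file channel, on an assumed first disk
a1.channels.c1.type = file
a1.channels.c1.checkpointDir = /mnt/disk1/flume/c1/checkpoint
a1.channels.c1.dataDirs = /mnt/disk1/flume/c1/data

# Second file channel, on an assumed second disk
a1.channels.c2.type = file
a1.channels.c2.checkpointDir = /mnt/disk2/flume/c2/checkpoint
a1.channels.c2.dataDirs = /mnt/disk2/flume/c2/data
```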

v. Spillable Memory Channel: Apache Flume Channel

Here, events are stored in an in-memory queue and on disk. The in-memory queue serves as the primary store and the disk as overflow. The disk store is managed using an embedded File channel: when the in-memory queue is full, additional incoming events are stored in the file channel.

This channel is ideal for flows that need the high throughput of the memory channel during normal operation but, at the same time, need the larger capacity of the file channel for better tolerance of intermittent sink-side outages or drops in drain rates. During such abnormal situations, throughput falls to approximately file channel speeds.

In case of an agent crash or restart, only the events stored on disk are recovered when the agent comes online. This channel is currently experimental and is not recommended for use in production.

The properties of the Spillable Memory channel are listed below, with default values in parentheses:

  • type – The component type name, needs to be SPILLABLEMEMORY
  • memoryCapacity (10000) – Maximum number of events stored in memory queue. To disable use of in-memory queue, set this to zero.
  • overflowCapacity (100000000) – Maximum number of events stored in overflow disk (i.e. File channel). To disable use of overflow, set this to zero.
  • overflowTimeout (3) – The number of seconds to wait before enabling disk overflow when memory fills up.
  • byteCapacityBufferPercentage (20) – Defines the percent of buffer between byteCapacity and the estimated total size of all events in the channel, to account for data in headers.
  • byteCapacity – Maximum bytes of memory allowed as a sum of all events in the memory queue. The implementation only counts the Event body, which is the reason for providing the byteCapacityBufferPercentage configuration parameter as well. Defaults to a computed value equal to 80% of the maximum memory available to the JVM (i.e. 80% of the -Xmx value passed on the command line).
  • avgEventSize (500) – The estimated average size of events, in bytes, going into the channel.
  • <file channel properties> (see file channel) – Any file channel property can be used, with the exception of ‘keep-alive’ and ‘capacity’. The keep-alive of the file channel is managed by the Spillable Memory channel. Use ‘overflowCapacity’ to set the file channel’s capacity.

The in-memory queue is considered full when it reaches either the memoryCapacity or the byteCapacity limit.

Also, see an example for agent a1:

a1.channels = c1
a1.channels.c1.type = SPILLABLEMEMORY
a1.channels.c1.memoryCapacity = 10000
a1.channels.c1.overflowCapacity = 1000000
a1.channels.c1.byteCapacity = 800000
a1.channels.c1.checkpointDir = /mnt/flume/checkpoint
a1.channels.c1.dataDirs = /mnt/flume/data
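
Because setting either capacity to zero disables that half of the channel, the same channel type can be made to behave like a file channel or like a pure memory channel. The sketch below shows both variants side by side; the channel names c2 and c3, the directory paths, and the capacity values are illustrative assumptions.

```properties
a1.channels = c2 c3

# c2: in-memory queue disabled, so every event goes to the disk overflow
a1.channels.c2.type = SPILLABLEMEMORY
a1.channels.c2.memoryCapacity = 0
a1.channels.c2.overflowCapacity = 1000000
a1.channels.c2.checkpointDir = /mnt/flume/c2/checkpoint
a1.channels.c2.dataDirs = /mnt/flume/c2/data

# c3: disk overflow disabled, so the channel is purely in-memory
a1.channels.c3.type = SPILLABLEMEMORY
a1.channels.c3.memoryCapacity = 100000
a1.channels.c3.overflowCapacity = 0
```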

vi. Pseudo Transaction Channel: Apache Flume Channel

This channel is intended only for unit testing and must not be used in production.

The properties of the Pseudo Transaction channel are listed below, with default values in parentheses:

  • type – The component type name; must be org.apache.flume.channel.PseudoTxnMemoryChannel.
  • capacity (50) – The max number of events stored in the channel.
  • keep-alive (3) – Timeout in seconds for adding or removing an event.
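
For a test setup, the properties above could be combined as in this short sketch, which simply spells out the listed defaults; the channel name c1 is an arbitrary label.

```properties
a1.channels = c1
# Unit-testing channel only; not for production flows
a1.channels.c1.type = org.apache.flume.channel.PseudoTxnMemoryChannel
a1.channels.c1.capacity = 50
a1.channels.c1.keep-alive = 3
```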

vii. Custom Channel: Apache Flume Channel

A custom channel is your own implementation of the Channel interface. The custom channel’s class and its dependencies must be included in the agent’s classpath when starting the Flume agent. The type of the custom channel is its fully qualified class name (FQCN).

The Custom channel has a single property:

  • type – The component type name; must be an FQCN.

So, let’s see an example for agent a1:

a1.channels = c1
a1.channels.c1.type = org.example.MyChannel

So, this was all about the Apache Flume Channel. We hope you like our explanation.

4. Apache Flume Channel – Conclusion

In this tutorial, we have seen what an Apache Flume channel is and the types of Flume channel: Memory, JDBC, Kafka, File, Spillable Memory, Pseudo Transaction, and Custom. We have also looked at the properties of each and at example configurations. Still, if any query arises, feel free to ask in the comment section.