Apache Flume Sink Processors and Their Types

The sink processor is another important component of Apache Flume. This article explains what a sink processor is, describes the types of sink processors available in Apache Flume, and lists the properties of each type along with an example configuration.

Let us now start with the introduction to Apache Flume Sink Processors.

Introduction to Flume Sink Processors

Before learning sink processors, we should know what a sink group is.

Sink group

Sink groups allow users to group multiple Flume sinks into one entity.

A sink processor is the Flume component that decides which sink from its assigned sink group is invoked. Sink processors can provide load balancing over all the Flume sinks inside the sink group. They can also fail over from one sink to another when a sink fails temporarily.

In simple words, sink processors offer:

  • Load balancing over all the Flume sinks inside the sink group.
  • Failover, by maintaining an ordered list of the sinks inside the sink group.
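To show where a sink group sits inside an agent, here is a minimal sketch of a complete agent configuration. The names src1, ch1, and group1, the collector hostnames, and the port numbers are placeholders chosen for illustration, and the netcat source, memory channel, and Avro sinks are only one possible combination. The load_balance processor type is one of the types described later in this article.

```properties
# Hypothetical component names; hostnames and ports are placeholders.
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sk1 sk2
agent1.sinkgroups = group1

# Source: reads text lines from a local TCP port
agent1.sources.src1.type = netcat
agent1.sources.src1.bind = localhost
agent1.sources.src1.port = 44444
agent1.sources.src1.channels = ch1

# Channel: in-memory buffer between the source and the sinks
agent1.channels.ch1.type = memory

# Two Avro sinks draining the same channel
agent1.sinks.sk1.type = avro
agent1.sinks.sk1.hostname = collector1.example.com
agent1.sinks.sk1.port = 4545
agent1.sinks.sk1.channel = ch1

agent1.sinks.sk2.type = avro
agent1.sinks.sk2.hostname = collector2.example.com
agent1.sinks.sk2.port = 4545
agent1.sinks.sk2.channel = ch1

# The sink group ties sk1 and sk2 together; the processor type
# decides how one of them is picked for each invocation
agent1.sinkgroups.group1.sinks = sk1 sk2
agent1.sinkgroups.group1.processor.type = load_balance
```

Both sinks read from the same channel, so whichever sink the processor selects takes the next batch of events from ch1.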

Types of Flume Sink Processors

The different types of sink processors are:

1. Default Sink Processor

This is the processor Flume uses when no other processor is configured. It accepts only a single sink. Users are not required to create a processor (sink group) for a single sink. They can simply follow the source – channel – sink pattern.
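As an illustration, the sketch below wires a single sink with no sink group at all, so the default sink processor is used implicitly. The component names (src1, ch1, sk1) and the netcat, memory, and logger component types are assumptions made for this example.

```properties
# Plain source - channel - sink pattern; names are hypothetical.
agent1.sources = src1
agent1.channels = ch1
agent1.sinks = sk1

agent1.sources.src1.type = netcat
agent1.sources.src1.bind = localhost
agent1.sources.src1.port = 44444
agent1.sources.src1.channels = ch1

agent1.channels.ch1.type = memory

# Logger sink: writes each event to the agent's log
agent1.sinks.sk1.type = logger
agent1.sinks.sk1.channel = ch1

# No agent1.sinkgroups entries are needed for a single sink.
```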

Some of the properties required for the default sink processor definition are:

Property name | Default value | Description
sinks | (none) | Space-separated list of sinks participating in the group.
processor.type | default | The component type name; must be default.

Example for an agent named agent1 with a single sink sk1 (the default sink processor accepts only one sink):

agent1.sinkgroups = group1
agent1.sinkgroups.group1.sinks = sk1
agent1.sinkgroups.group1.processor.type = default

2. Failover Sink Processor

This sink processor maintains a prioritized list of sinks and guarantees that events are delivered as long as at least one sink is available. It works by moving failed sinks to a pool, where they are assigned a cool-down period. The cool-down period grows with each sequential failure before the sink is retried.

A sink that successfully sends an event is restored to the live pool. Every sink has an associated priority: the larger the number, the higher the priority. If a sink fails while sending an event, the sink with the next highest priority is tried.

For example, a sink with priority 100 is activated before a sink with priority 80. If priorities are not assigned, the priority is determined by the order in which the sinks are specified in the configuration.

Some of the properties required for the failover sink processor definition are:

Property name | Default value | Description
sinks | (none) | Space-separated list of sinks participating in the group.
processor.type | default | The component type name; must be failover.
processor.priority.<sinkName> | (none) | Priority value. <sinkName> must be one of the sink instances associated with the current sink group. The sink with the higher priority value is activated first; a larger absolute value indicates higher priority.
processor.maxpenalty | 30000 | Maximum backoff period (in milliseconds) for a failed sink.

Example for an agent named agent1 with sinks sk1 and sk2:

agent1.sinkgroups = group1
agent1.sinkgroups.group1.sinks = sk1 sk2
agent1.sinkgroups.group1.processor.type = failover
agent1.sinkgroups.group1.processor.priority.sk1 = 5
agent1.sinkgroups.group1.processor.priority.sk2 = 10
agent1.sinkgroups.group1.processor.maxpenalty = 10000
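As a further sketch, the configuration below uses three sinks to make the failover order explicit. The sink name sk3 and the priority values are hypothetical. The values 100 and 80 mirror the example given in the text above.

```properties
agent1.sinkgroups = group1
agent1.sinkgroups.group1.sinks = sk1 sk2 sk3
agent1.sinkgroups.group1.processor.type = failover

# Highest priority: sk1 receives events while it is healthy
agent1.sinkgroups.group1.processor.priority.sk1 = 100
# Tried next if sk1 fails
agent1.sinkgroups.group1.processor.priority.sk2 = 80
# Last resort when both sk1 and sk2 have failed
agent1.sinkgroups.group1.processor.priority.sk3 = 50

# Upper bound (in milliseconds) on the backoff period of a failed sink
agent1.sinkgroups.group1.processor.maxpenalty = 10000
```

Giving every sink a distinct priority keeps the order unambiguous, rather than relying on the order in which the sinks are listed.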

3. Load Balancing Sink Processor

This sink processor load-balances the flow over multiple Flume sinks. It maintains an indexed list of the active sinks, over which the load is distributed.

The load is distributed using either the round_robin or the random selection mechanism. The default is round_robin, which can be overridden through configuration. Custom selection mechanisms are also supported, through custom classes that inherit from AbstractSinkSelector.

When invoked, the selector picks the next sink using its configured selection mechanism and invokes it.

With round_robin and random, if the selected sink fails to deliver the event, the processor picks the next available sink through its configured selection mechanism.

By default, the processor does not blacklist failing sinks. Instead, it continues to optimistically attempt every available sink.

If all sink invocations result in failure, the selector propagates the failure to the sink runner.

If backoff is enabled, the sink processor blacklists the sinks that fail, removing them from selection for a given timeout.

If a sink is still unresponsive when the timeout ends, the timeout is increased exponentially. This avoids getting stuck in long waits on an unresponsive sink.

If backoff is disabled, then in round_robin mode the load of every failed sink is passed to the next sink in line, so the load is not evenly balanced.

Some of the properties required for Load balancing sink processor definition are:

Property name | Default value | Description
sinks | (none) | Space-separated list of sinks participating in the group.
processor.type | default | The component type name; must be load_balance.
processor.backoff | false | Whether failed sinks should be backed off exponentially.
processor.selector | round_robin | Selection mechanism. Must be round_robin, random, or the FQCN of a custom class that inherits from AbstractSinkSelector.
processor.selector.maxTimeOut | 30000 | Used by backoff selectors to limit exponential backoff (in milliseconds).

Example for an agent named agent1 with sinks sk1 and sk2:

agent1.sinkgroups = group1
agent1.sinkgroups.group1.sinks = sk1 sk2
agent1.sinkgroups.group1.processor.type = load_balance
agent1.sinkgroups.group1.processor.backoff = true
agent1.sinkgroups.group1.processor.selector = random
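A variant of the example above, sketched with the round_robin selector and a cap on the backoff time. The 10000 ms value is an illustrative choice, not a recommendation.

```properties
agent1.sinkgroups = group1
agent1.sinkgroups.group1.sinks = sk1 sk2
agent1.sinkgroups.group1.processor.type = load_balance

# Temporarily remove failing sinks from selection
agent1.sinkgroups.group1.processor.backoff = true
# round_robin is the default; it is stated explicitly here for clarity
agent1.sinkgroups.group1.processor.selector = round_robin
# Limit the exponentially growing backoff to 10 seconds
agent1.sinkgroups.group1.processor.selector.maxTimeOut = 10000
```

With backoff enabled, a failing sink is skipped for the timeout period instead of passing its share of the load to the next sink in line on every round.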

4. Custom Sink Processor

At present, Flume does not support custom sink processors.

Summary

This article described the sink processors supported by Apache Flume. Sink processors provide load balancing and failover across the sinks inside a sink group. Apache Flume has three types of sink processors.

They are the default sink processor, the failover sink processor, and the load balancing sink processor. The article explained each of them along with its properties and an example.
