Apache Flume Source – Types of Flume Source

In this article, you will explore the different types of Flume sources. It covers sources such as Avro, Thrift, Exec, JMS, Spooling Directory, Kafka, and NetCat, and explains the properties associated with each source.

Introduction to Flume Source

An Apache Flume source is the component of a Flume agent that receives data from external sources and passes it on to one or more channels. It consumes data from an external source such as a web server.

The external data source sends data to Apache Flume in a format that is recognizable by the target Flume source.

There are many sources available in Apache Flume. Let us now explore different Flume sources along with the properties and examples.
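Before diving in, here is a minimal sketch of how a source is wired into an agent alongside a channel and a sink. It uses the built-in netcat source, memory channel, and logger sink; the names agent1, src, ch1, and snk are illustrative:

agent1.sources = src
agent1.channels = ch1
agent1.sinks = snk

agent1.sources.src.type = netcat
agent1.sources.src.bind = localhost
agent1.sources.src.port = 44444
agent1.sources.src.channels = ch1

agent1.channels.ch1.type = memory

agent1.sinks.snk.type = logger
agent1.sinks.snk.channel = ch1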

1. Avro Source

The Avro source receives events from external Avro client streams, listening on the configured Avro port. When an Avro source is paired with the built-in Avro sink on another Flume agent, it can create tiered collection topologies (see the sketch after the configuration example below).

Some of the properties for Avro source are:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | (Required) Specify the channels through which an event travels. |
| type | – | (Required) The component type name; must be avro. |
| bind | – | (Required) The hostname or IP address to listen on. |
| port | – | (Required) The port number to bind to. |
| threads | – | (Optional) The maximum number of worker threads to spawn. |
| selector.type | – | (Optional) The channel selector type. |
| interceptors | – | (Optional) Space-separated list of interceptors. |
| compression-type | none | (Optional) Can be "none" or "deflate"; it should match the compression-type of the matching Avro sink on the sending agent. |

Example for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = avro
agent1.sources.src.channels = ch1
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 4141
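To illustrate such a tiered topology, here is a minimal sketch of an upstream agent whose Avro sink forwards events to the Avro source configured above; the agent name agent0, the channel ch0, and the hostname collector-host are illustrative assumptions:

agent0.sinks = avroSink
agent0.sinks.avroSink.type = avro
agent0.sinks.avroSink.channel = ch0
agent0.sinks.avroSink.hostname = collector-host
agent0.sinks.avroSink.port = 4141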

2. Thrift Source

The Thrift source receives events from external Thrift clients, listening on the configured Thrift port. When a Thrift source is paired with the built-in Thrift sink on another Flume agent, it can create tiered collection topologies.

We can configure the Thrift source to start in secure mode by enabling Kerberos authentication.

The Thrift source uses two properties, agent-principal and agent-keytab, to authenticate to the Kerberos KDC.
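For example, a Kerberos-enabled Thrift source might look like the following sketch; the principal name and keytab path are placeholders that depend on your KDC setup:

agent1.sources.src.type = thrift
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 4141
agent1.sources.src.kerberos = true
agent1.sources.src.agent-principal = flume/collector-host@EXAMPLE.COM
agent1.sources.src.agent-keytab = /etc/security/keytabs/flume.keytab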

Some of the properties of Thrift source are:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | (Required) Specify the channels through which an event travels. |
| type | – | (Required) The component type name; must be thrift. |
| bind | – | (Required) The hostname or IP address to listen on. |
| port | – | (Required) The port number to bind to. |
| kerberos | false | (Optional) Set to true to enable Kerberos authentication. In Kerberos mode, agent-principal and agent-keytab are required for successful authentication. In secure mode, the Thrift source accepts connections only from Thrift clients that have Kerberos enabled and have successfully authenticated to the Kerberos KDC. |
| agent-principal | – | (Optional) The Kerberos principal used by the Thrift source to authenticate to the Kerberos KDC. |
| agent-keytab | – | (Optional) The keytab location used by the Thrift source, in combination with agent-principal, to authenticate to the Kerberos KDC. |

Example for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = thrift
agent1.sources.src.channels = ch1
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 4141



3. Exec Source

The Exec source runs a given Unix command on start-up and expects that process to continuously produce data on stdout.

Unless the property logStdErr is set to true, stderr is simply discarded.

If the process exits for any reason, the source also exits and produces no further data. Thus configurations such as cat [named pipe] or tail -F [file] produce the desired results, since both commands produce continuous streams of data.

A command such as date, on the other hand, probably will not, because it produces a single event and exits.

Some properties for the Exec Source are:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be exec. |
| command | – | The command to execute. |
| shell | – | A shell invocation used to run the command, e.g. /bin/sh -c. Required only for commands relying on shell features such as wildcards, backticks, or pipes. |
| restart | false | Whether to restart the executed command if it dies. |
| batchSize | 20 | The maximum number of lines to read and send to the channel at a time. |
| restartThrottle | 10000 | The amount of time (in milliseconds) to wait before attempting a restart. |
| batchTimeout | 3000 | The amount of time (in milliseconds) to wait, if the buffer size has not been reached, before data is pushed downstream. |

The problem with ExecSource is that it provides zero guarantees for event delivery.

Example for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = exec
agent1.sources.src.command = tail -F /var/log/secure
agent1.sources.src.channels = ch1
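If the command relies on shell features such as pipes, the shell property can be set; the following sketch (the log path and grep filter are illustrative) also enables restart so the pipeline is relaunched if it dies:

agent1.sources.src.type = exec
agent1.sources.src.shell = /bin/bash -c
agent1.sources.src.command = tail -F /var/log/app.log | grep -i error
agent1.sources.src.restart = true
agent1.sources.src.restartThrottle = 10000
agent1.sources.src.channels = ch1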

4. JMS source

This Apache Flume source reads messages from a JMS destination such as a queue or topic. Being a JMS application, it should work with any JMS provider, but it has only been tested with ActiveMQ.

The JMS source provides a configurable username/password, batch size, message selector, and message-to-Flume-event converter.

Note: The JMS jar provided by the vendor should be included in the Flume classpath, using any of three ways:
a) the plugins.d directory (preferred; see the layout sketch below)
b) --classpath on the command line
c) the FLUME_CLASSPATH variable in flume-env.sh
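For option (a), a plugin directory might be laid out as in the sketch below; the plugin name and jar version are illustrative, and Flume picks up the plugin's own jars from lib and its dependencies from libext:

$FLUME_HOME/plugins.d/
  activemq/
    lib/activemq-all-5.15.0.jar
    libext/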

Some of the properties for the JMS source are:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be jms. |
| initialContextFactory | – | The initial context factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory. |
| connectionFactory | – | The JNDI name the connection factory should appear as. |
| providerURL | – | The JMS provider URL. |
| destinationName | – | The destination name. |
| destinationType | – | The destination type (queue or topic). |
| batchSize | 100 | The number of messages to consume in one batch. |

Example for agent named agent1, source src, channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = jms
agent1.sources.src.channels = ch1
agent1.sources.src.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
agent1.sources.src.connectionFactory = GenericConnectionFactory
agent1.sources.src.providerURL = tcp://mqserver:61616
agent1.sources.src.destinationName = BUSINESS_DATA
agent1.sources.src.destinationType = QUEUE

5. Spooling Directory Source

This Apache Flume source lets us ingest data by placing the files to be ingested into a "spooling" directory on disk. The Spooling Directory source watches the specified directory for new files.

This source parses data out of new files as they appear; the parsing logic is pluggable. When a given file has been fully read into the channel, completion is by default indicated by renaming or deleting the file. The trackerDir is used to keep track of processed files.

The Spooling Directory source is reliable and will not lose data even if Flume is restarted or killed. However, it stops processing under certain conditions:

a) If a file is written to after it has been placed into the spooling directory, Flume prints an error to its log file and stops processing.

b) If a file name is reused at a later time, Flume prints an error to its log file and stops processing.

These issues can be avoided by adding a unique identifier, such as a timestamp, to log file names when moving them into the spooling directory, as shown below.
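For example, a file can be renamed with a timestamp while being moved into the spooling directory used in the configuration example further below; this is a sketch assuming a shell environment and an illustrative source path:

mv /var/log/apache/access.log /var/log/apache/flumeSpool/access.$(date +%Y%m%d%H%M%S).log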

Some of the properties for Spooling Directory Source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be spooldir. |
| spoolDir | – | The directory from which to read files. |
| deletePolicy | never | When to delete completed files: never or immediate. |
| fileHeader | false | Whether to add a header storing the absolute path filename. |
| includePattern | ^.*$ | A regular expression specifying which files to include. Can be used together with ignorePattern; if a file matches both ignorePattern and includePattern, the file is ignored. |
| ignorePattern | ^$ | A regular expression specifying which files to ignore (skip). Can be used together with includePattern; if a file matches both, the file is ignored. |
| trackerDir | .flumespool | The directory in which to store metadata related to the processing of files. If this path is not absolute, it is interpreted as relative to spoolDir. |

Example for an agent named agent1, source src, channel ch1:

agent1.channels = ch1
agent1.sources = src
agent1.sources.src.type = spooldir
agent1.sources.src.channels = ch1
agent1.sources.src.spoolDir = /var/log/apache/flumeSpool
agent1.sources.src.fileHeader = true

6. Kafka source

This Apache Flume source is an Apache Kafka consumer that reads messages from Kafka topics. If we have multiple Kafka sources, we can configure them with the same consumer group; this ensures that each source reads a unique set of partitions for the topics.

It supports Kafka server release 0.10.1.0 or higher. Testing was done up to Kafka 2.0.1, which was the highest released version at the time.

Some properties for Kafka source are:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be org.apache.flume.source.kafka.KafkaSource. |
| kafka.bootstrap.servers | – | The list of brokers in the Kafka cluster used by the source. |
| kafka.consumer.group.id | flume | The unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group. |
| kafka.topics | – | The comma-separated list of topics from which the Kafka consumer reads messages. |
| kafka.topics.regex | – | A regex defining the set of topics the source subscribes to. It has higher priority than kafka.topics and overrides kafka.topics if both exist. |
| batchSize | 1000 | The maximum number of messages that can be written to the channel in one batch. |

Example of topic subscription through the comma-separated topic list, agent agent1, source src, and channel ch1:

agent1.sources.src.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.src.channels = ch1
agent1.sources.src.batchSize = 5000
agent1.sources.src.batchDurationMillis = 2000
agent1.sources.src.kafka.bootstrap.servers = localhost:9092
agent1.sources.src.kafka.topics = test1, test2
agent1.sources.src.kafka.consumer.group.id = custom.g.id

Example of topic subscription by regex

agent1.sources.src.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.src.channels = ch1
agent1.sources.src.kafka.bootstrap.servers = localhost:9092
agent1.sources.src.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
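Any other Kafka consumer property can be passed through by prefixing it with kafka.consumer. For instance, the following sketch (the offset-reset policy is only an illustration) controls where a brand-new consumer group starts reading:

agent1.sources.src.kafka.consumer.auto.offset.reset = earliest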

7. NetCat TCP source

This Apache Flume source listens on a given port and turns each newline-separated line of text into a Flume event, which it sends through the connected channel. It acts like nc -k -l [host] [port].

Some properties for NetCat TCP source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be netcat. |
| bind | – | The hostname or IP address to bind to. |
| port | – | The port number to bind to. |
| max-line-length | 512 | The maximum line length per event body (in bytes). |
| ack-every-event | true | Whether to respond with an "OK" for every event received. |

Example for agent named agent1, sources src, channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = netcat
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1
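Once the agent is running, the source can be exercised with the nc utility, assuming it is installed; each newline-terminated line becomes one Flume event:

echo "hello flume" | nc localhost 6666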

8. NetCat UDP Source

The NetCat UDP source is similar to the NetCat TCP source. It listens on a given port, turns each line of text into a Flume event, and sends it through the connected channel. It acts like nc -u -k -l [host] [port].

Some properties for NetCat UDP source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be netcatudp. |
| bind | – | The hostname or IP address to bind to. |
| port | – | The port number to bind to. |

Example for agent named agent1, sources src, channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = netcatudp
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1

9. Sequence Generator Source

A simple sequence generator source continuously generates events with a counter. The counter starts from 0, increments by 1, and stops at totalEvents.

The source retries when it cannot send events to the channel, and it keeps the body of a retried message unchanged, so the number of unique events equals the specified totalEvents.

This source is useful mainly for testing.

Some properties for Sequence Generator source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be seq. |
| batchSize | 1 | The number of events to attempt to process per request loop. |
| totalEvents | Long.MAX_VALUE | The number of unique events to be sent by the source. |

Example for agent named agent1, sources src, channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = seq
agent1.sources.src.channels = ch1
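For a bounded smoke test, totalEvents and batchSize can be added so the source stops after a fixed number of events; the values below are arbitrary:

agent1.sources.src.totalEvents = 1000
agent1.sources.src.batchSize = 10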

10. Syslog Source

Syslog source reads syslog data and generates Flume events.

  • The Syslog UDP source considers an entire message as a single event.
  • The Syslog TCP sources create a new event for each string of characters separated by a newline ('\n').

1. Syslog TCP source

Some of the properties for the original, tried-and-true Syslog TCP source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be syslogtcp. |
| host | – | The hostname or IP address to bind to. |
| port | – | The port number to bind to. |

Example for a syslog TCP source for agent named agent1, source src, channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = syslogtcp
agent1.sources.src.port = 5140
agent1.sources.src.host = localhost
agent1.sources.src.channels = ch1
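The source can be exercised with the logger utility from util-linux, assuming it is available; the flags below send one test message over TCP to the port configured above:

logger --server localhost --port 5140 --tcp "test syslog event"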

2. Multiport Syslog TCP Source

This is a newer, faster, multi-port-capable version of the Syslog TCP source, meaning it can listen on many ports at once efficiently.
Note: The port configuration setting has been replaced by ports.

Some of the properties for the Multiport Syslog TCP source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be multiport_syslogtcp. |
| host | – | The hostname or IP address to bind to. |
| ports | – | A space-separated list of (one or more) ports to bind to. |

Example for a multiport syslog TCP source for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = multiport_syslogtcp
agent1.sources.src.channels = ch1
agent1.sources.src.host = 0.0.0.0
agent1.sources.src.ports = 10001 10002 10003
agent1.sources.src.portHeader = port

3. Syslog UDP source

Some of the properties for the original, tried-and-true Syslog UDP source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be syslogudp. |
| host | – | The hostname or IP address to bind to. |
| port | – | The port number to bind to. |

Example for a Syslog UDP source for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = syslogudp
agent1.sources.src.port = 5140
agent1.sources.src.host = localhost
agent1.sources.src.channels = ch1
 

11. HTTP Source

An HTTP source is a Flume source that receives Flume events via HTTP POST and GET.

A pluggable "handler" implementing the HTTPSourceHandler interface converts HTTP requests into Flume events: it takes an HttpServletRequest and returns a list of Flume events.

All the events handled by one HTTP request are committed to the channel in one transaction, which increases channel efficiency.

If the pluggable handler throws an exception, the HTTP source returns an HTTP status of 400.

If the channel is full, or if the HTTP source is otherwise unable to append events to the channel, the source returns an HTTP 503 (Temporarily Unavailable) status.

Events sent in one POST request are treated as one batch and inserted into the channel in one transaction.

The HTTP source is based on Jetty 9.4 and offers the ability to set additional Jetty-specific parameters, which are passed directly to the Jetty components.

Some properties for HTTP source:

| Property Name | Default Value | Description |
|---|---|---|
| type | – | The component type name; must be http. |
| port | – | The port the source should bind to. |
| bind | 0.0.0.0 | The hostname or IP address to listen on. |
| handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |

Example for an HTTP source for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = http
agent1.sources.src.port = 5140
agent1.sources.src.channels = ch1
agent1.sources.src.handler = org.example.rest.RestHandler
agent1.sources.src.handler.nickname = random props
agent1.sources.src.HttpConfiguration.sendServerVersion = false
agent1.sources.src.ServerConnector.idleTimeout = 300
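With the default JSONHandler (rather than the custom handler configured above), the source expects a JSON array of events, each with headers and body fields; it can be exercised with curl, assuming the agent listens on port 5140:

curl -X POST http://localhost:5140 -H 'Content-Type: application/json' -d '[{"headers": {"host": "web01"}, "body": "random_body"}]'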
 

12. Legacy Source

The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents.

These sources accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel.

The 0.9.4 event properties, such as pri, timestamp, nanos, and host, are converted to 1.x event header attributes.

The legacy sources support Avro and Thrift RPC connections. To use them, we start a Flume 1.x agent with the avroLegacy or thriftLegacy source.

The Flume 0.9.4 agent should have an agent sink pointing to the host/port of the Flume 1.x agent.

1. Avro legacy source

Some of the properties for the Avro legacy source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be org.apache.flume.source.avroLegacy.AvroLegacySource. |
| host | – | The hostname or IP address to bind to. |
| port | – | The port number to bind to. |

Example for an Avro legacy source for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.apache.flume.source.avroLegacy.AvroLegacySource
agent1.sources.src.host = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1

2. Thrift legacy source

Some of the properties for the Thrift legacy source:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be org.apache.flume.source.thriftLegacy.ThriftLegacySource. |
| host | – | The hostname or IP address to bind to. |
| port | – | The port number to bind to. |

Example for a Thrift legacy source for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
agent1.sources.src.host = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1

13. Custom Source

A custom source is our own implementation of the Source interface. We must include the custom source's class and its dependencies in the agent's classpath when starting the Flume agent. The type of a custom source is its FQCN (fully qualified class name).

Some of the properties for a custom source are:

| Property Name | Default Value | Description |
|---|---|---|
| channels | – | Specify the channels through which an event travels. |
| type | – | The component type name; must be the FQCN of the custom source class. |

Example for a Custom source for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.example.MySource
agent1.sources.src.channels = ch1

14. Scribe Source

Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume uses the ScribeSource, which is based on Thrift with a compatible transfer protocol.

Some of the properties for the Scribe source are:

| Property Name | Default Value | Description |
|---|---|---|
| type | – | The component type name; must be org.apache.flume.source.scribe.ScribeSource. |
| port | 1499 | The port to which Scribe clients connect. |
| maxReadBufferBytes | 16384000 | The Thrift default frame buffer size. |
| workerThreads | 5 | The number of handler threads in Thrift. |

Example for a Scribe source for agent named agent1, source src, and channel ch1:

agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.apache.flume.source.scribe.ScribeSource
agent1.sources.src.port = 1463
agent1.sources.src.workerThreads = 5
agent1.sources.src.channels = ch1

Summary

I hope that after reading this article you clearly understand the different types of Flume sources available. The article explained sources such as Avro, Thrift, HTTP, Kafka, and NetCat, as well as the legacy, custom, and Scribe sources.
