Apache Flume Source – Types of Flume Source
In this article, you will explore different types of Flume sources. The article explains different Flume sources like Avro source, Thrift source, Exec source, JMS source, Spooling Directory source, Kafka source, NetCat source, and some more. The article also explains the properties associated with each source.
Introduction to Flume Source
Apache Flume source is the component of the Flume agent that receives data from external sources and passes it on to one or more channels. It consumes data from an external source, such as a web server.
The external data source sends data to Apache Flume in a format that is recognizable by the target Flume source.
There are many sources available in Apache Flume. Let us now explore different Flume sources along with the properties and examples.
1. Avro Source
Avro source receives events from an external Avro client stream, listening on the configured port. When an Avro source is paired with the built-in Avro sink on another Flume agent, it can create tiered collection topologies.
Some of the properties for Avro source are:
Property Name | Default value | Description |
channels (Required) | – | Specify the channels through which an event travels. |
type (Required) | – | The component type name must be avro |
bind (Required) | – | Specify the hostname or IP address to listen on. |
port (Required) | – | Specify the port # to bind to. |
threads (Optional) | – | Specify the maximum number of worker threads to spawn |
selector.type (Optional) | – | Specify the channel selector type. |
interceptors (Optional) | – | Space-separated list of interceptors |
compression-type (Optional) | none | It can be “none” or “deflate”. The compression-type should match the compression-type of the matching AvroSource. |
Example for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = avro
agent1.sources.src.channels = ch1
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 4141
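To test an Avro source, we can use the avro-client mode of the flume-ng command to send the contents of a file to the source. A minimal sketch, assuming the agent above is running on localhost and /tmp/events.log is a hypothetical input file:
flume-ng avro-client --host localhost --port 4141 --filename /tmp/events.log
Each line of the file is delivered to the source as one Avro event.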
2. Thrift Source
Thrift source receives events from an external Thrift client, listening on the configured port. When a Thrift source is paired with the built-in Thrift sink on another Flume agent, it can create tiered collection topologies.
We can configure the Thrift source to start in secure mode by enabling Kerberos authentication.
The Thrift source uses two properties, agent-principal and agent-keytab, to authenticate to the Kerberos KDC.
Some of the properties of Thrift source are:
Property Name | Default value | Description |
channels (Required) | – | Specify the channels through which an event travels. |
type (Required) | – | The component type name must be thrift |
bind (Required) | – | Specify the hostname or IP address to listen on. |
port (Required) | – | Specify the port # to bind to. |
kerberos (Optional) | false | It specifies whether to enable Kerberos security. Set it to true to enable Kerberos authentication. In Kerberos mode, agent-principal and agent-keytab are required for successful authentication. In secure mode, the Thrift source will accept connections only from Thrift clients that have Kerberos enabled and have successfully authenticated to the Kerberos KDC. |
agent-principal (Optional) | – | It specifies the Kerberos principal used by the Thrift Source to authenticate to the Kerberos KDC. |
agent-keytab (Optional) | – | It specifies the keytab location used by the Thrift Source in combination with the agent-principal to authenticate to the Kerberos KDC. |
Example for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = thrift
agent1.sources.src.channels = ch1
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 4141
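To run this source in the secure mode described above, the Kerberos-related properties from the table can be added to the same configuration. A minimal sketch; the principal and keytab path are hypothetical placeholders that must match your KDC setup:
agent1.sources.src.kerberos = true
agent1.sources.src.agent-principal = flume/flumehost@EXAMPLE.COM
agent1.sources.src.agent-keytab = /etc/security/keytabs/flume.keytab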
3. Exec Source
This Apache Flume source, Exec, runs a given Unix command on start-up. It expects that process to continuously produce data on stdout.
Unless the property logStdErr is set to true, stderr is simply discarded.
If for any reason the process exits, then the source also exits and will not produce any further data.
Thus, configurations such as cat [named pipe] or tail -F [file] produce the desired results, because these commands emit continuous streams of data.
On the other hand, date will probably not produce the desired result, because it produces a single event and exits.
Some properties for the Exec Source are:
Property | Default value | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be exec |
command | – | It specifies the command to be executed |
shell | – | A shell invocation used to run the command. For example, /bin/sh -c. It is required only for commands relying on shell features like wildcards, back ticks, pipes, etc. |
restart | false | It specifies whether to restart the executed command if it dies. |
batchSize | 20 | It specifies the max number of lines to read and send to the channel at a time. |
restartThrottle | 10000 | It specifies the amount of time (in millis) to wait before attempting a restart. |
batchTimeout | 3000 | It specifies the amount of time (in milliseconds) to wait if the buffer size was not reached before data is pushed downstream. |
The problem with ExecSource is that it provides zero guarantees for event delivery.
Example for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = exec
agent1.sources.src.command = tail -F /var/log/secure
agent1.sources.src.channels = ch1
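The agent itself is started with the flume-ng command. A minimal sketch, assuming the configuration above is saved in a hypothetical file named agent1.conf:
flume-ng agent --conf ./conf --conf-file agent1.conf --name agent1 -Dflume.root.logger=INFO,console
The --name argument must match the agent name used in the property keys (agent1 here).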
4. JMS Source
This Apache Flume source reads messages from a JMS destination (queue or topic). Being a JMS application, it should work with any JMS provider, but it has only been tested with ActiveMQ.
The JMS source provides a configurable batch size, message selector, user/pass, and message-to-Flume-event converter.
Note: The JMS jar provided by the vendor should be included in the Flume classpath. It can be included in any of three ways:
a) the plugins.d directory (preferred; see the sketch below)
b) --classpath on the command line
c) the FLUME_CLASSPATH variable in flume-env.sh
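As a sketch of the preferred plugins.d approach, assuming ActiveMQ, a hypothetical jar name, and that $FLUME_HOME points at the Flume installation; Flume picks up jars under plugins.d/<plugin-name>/lib on start-up:
mkdir -p $FLUME_HOME/plugins.d/activemq/lib
cp activemq-all-5.15.0.jar $FLUME_HOME/plugins.d/activemq/lib/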
Some of the properties for the JMS source are:
Property | Default Value | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be jms |
initialContextFactory | – | Initial Context Factory, e.g: org.apache.activemq.jndi.ActiveMQInitialContextFactory |
connectionFactory | – | The JNDI name the connection factory should appear as |
providerURL | – | The JMS provider URL |
destinationName | – | Destination name |
destinationType | – | Destination type (queue or topic) |
batchSize | 100 | Number of messages to consume in one batch. |
Example for agent named agent1, source src, channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = jms
agent1.sources.src.channels = ch1
agent1.sources.src.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
agent1.sources.src.connectionFactory = GenericConnectionFactory
agent1.sources.src.providerURL = tcp://mqserver:61616
agent1.sources.src.destinationName = BUSINESS_DATA
agent1.sources.src.destinationType = QUEUE
5. Spooling Directory Source
This Apache Flume source lets us ingest data by placing files to be ingested into a “spooling” directory on disk. The Spooling Directory source watches the specified directory for new files.
This source parses events out of new files as they appear. The data parsing logic is pluggable. After a given file has been fully read into the channel, completion is by default indicated by renaming the file, or the file can be deleted; the trackerDir is used to keep track of processed files.
The Spooling Directory source is reliable and will not lose data, even if Flume is restarted or killed. It fails under the following conditions:
a) If a file is written to after it has been placed into the spooling directory, Flume will print an error to its log file and stop processing.
b) If a file name is reused at a later time, Flume will print an error to its log file and stop processing.
These issues can be avoided by adding a unique identifier, such as a timestamp, to log file names when moving them into the spooling directory, as sketched below.
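A minimal shell sketch of this pattern, with hypothetical paths; the file is first staged in a subdirectory on the same filesystem so that the final mv into the spooling directory is atomic and the source never sees a partially written file:
TS=$(date +%s)
cp /var/log/apache/access.log /var/log/apache/flumeSpool/.stage/access.log.$TS
mv /var/log/apache/flumeSpool/.stage/access.log.$TS /var/log/apache/flumeSpool/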
Some of the properties for Spooling Directory Source:
Property | Default Values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be spooldir |
spoolDir | – | The directory from which to read files. |
deletePolicy | never | It specifies when to delete completed files: never or immediate |
fileHeader | false | It specifies whether to add a header storing the absolute path filename. |
includePattern | ^.*$ | It specifies which files to include, using a regular expression. It can be used together with ignorePattern. If a file matches both the ignorePattern and includePattern regex, the file is ignored. |
ignorePattern | ^$ | It specifies which files to ignore (skip), using a regular expression. It can be used together with includePattern. If a file matches both the ignorePattern and includePattern regex, the file is ignored. |
trackerDir | .flumespool | It specifies the directory to store metadata related to the processing of files. If this path is not absolute, then it is interpreted as relative to the spoolDir. |
Example for an agent named agent1, source src, channel ch1:
agent1.channels = ch1
agent1.sources = src
agent1.sources.src.type = spooldir
agent1.sources.src.channels = ch1
agent1.sources.src.spoolDir = /var/log/apache/flumeSpool
agent1.sources.src.fileHeader = true
6. Kafka Source
The Kafka source is an Apache Kafka consumer that reads messages from Kafka topics.
If we have multiple Kafka sources, we can configure them with the same consumer group. This ensures that each source reads a unique set of partitions for the topics.
It supports Kafka 0.10.1.0 or higher releases. Testing was done up to Kafka 2.0.1, which was the highest released version at the time of release.
Some properties for Kafka source are:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be org.apache.flume.source.kafka.KafkaSource |
kafka.bootstrap.servers | – | It specifies the list of brokers in the Kafka cluster which is used by the source |
kafka.consumer.group.id | flume | It specifies the unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group. |
kafka.topics | – | It specifies the comma-separated list of topics from which the Kafka consumer will read messages. |
kafka.topics.regex | – | It specifies the Regex which defines the set of topics the source is subscribed on. It has a higher priority than kafka.topics and overrides kafka.topics if it exists. |
batchSize | 1000 | It specifies the maximum number of messages that can be written to Channel in one batch |
Example of topic subscription through the comma-separated topic list, agent agent1, source src, and channel ch1:
agent1.sources.src.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.src.channels = ch1
agent1.sources.src.batchSize = 5000
agent1.sources.src.batchDurationMillis = 2000
agent1.sources.src.kafka.bootstrap.servers = localhost:9092
agent1.sources.src.kafka.topics = test1, test2
agent1.sources.src.kafka.consumer.group.id = custom.g.id
Example of topic subscription by regex:
agent1.sources.src.type = org.apache.flume.source.kafka.KafkaSource
agent1.sources.src.channels = ch1
agent1.sources.src.kafka.bootstrap.servers = localhost:9092
agent1.sources.src.kafka.topics.regex = ^topic[0-9]$
# the default kafka.consumer.group.id=flume is used
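To feed test messages into the source, we can use the console producer that ships with Kafka. A sketch, assuming the broker from the examples above; on older Kafka versions the flag is --broker-list instead of --bootstrap-server:
kafka-console-producer.sh --bootstrap-server localhost:9092 --topic test1
Each line typed at the prompt becomes one message, and hence one Flume event.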
7. NetCat TCP Source
This Apache Flume source listens on a given port and turns each line of text into a Flume event. It acts like nc -k -l [host] [port].
In simple words, it opens the specified port and listens for newline-separated data. Each line of text becomes a Flume event that is sent through the connected channel.
Some properties for NetCat TCP source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be netcat |
bind | – | It specifies the hostname or IP address to bind to. |
port | – | It specifies the Port # to bind to |
max-line-length | 512 | It specifies the max line length per event body (in bytes). |
ack-every-event | true | Respond with an “OK” for every event received. |
Example for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = netcat
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1
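We can test this source from a shell with the nc utility it is modeled on. A sketch, assuming the configuration above is running locally:
echo "hello flume" | nc localhost 6666
Because ack-every-event defaults to true, the source responds with an “OK” for the event.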
8. NetCat UDP Source
NetCat UDP source is similar to the NetCat TCP source. It listens on a given port, turns each line of text into a Flume event, and sends it through the connected channel. It acts like nc -u -k -l [host] [port].
Some properties for NetCat UDP source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be netcatudp |
bind | – | It specifies the hostname or IP address to bind to. |
port | – | It specifies the Port # to bind to |
Example for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = netcatudp
agent1.sources.src.bind = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1
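The UDP variant can be exercised the same way, adding -u to nc (and -w1 so the command exits after sending). A sketch assuming the configuration above:
echo "hello flume" | nc -u -w1 localhost 6666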
9. Sequence Generator Source
A simple sequence generator source continuously generates events with a counter that starts at 0, increments by 1, and stops at totalEvents.
The source retries when it cannot send events to the channel. It is useful mainly for testing.
During retries it keeps the body of the retried message the same as before, so that the number of unique events delivered equals the specified totalEvents.
Some properties for Sequence Generator source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be seq |
batchSize | 1 | It specifies the number of events to attempt to process per request loop. |
totalEvents | Long.MAX_VALUE | The total number of unique events sent by the source. |
Example for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = seq
agent1.sources.src.channels = ch1
10. Syslog Source
Syslog source reads syslog data and generates Flume events.
- The Syslog UDP source treats an entire message as a single event.
- The Syslog TCP sources create a new event for each string of characters separated by a newline (‘\n’).
1. Syslog TCP Source
Some of the properties for the original, tried-and-true Syslog TCP source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be syslogtcp |
host | – | It specifies the hostname or IP address to bind to. |
port | – | It specifies the Port # to bind to |
Example for a syslog TCP source for agent named agent1, source src, channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = syslogtcp
agent1.sources.src.port = 5140
agent1.sources.src.host = localhost
agent1.sources.src.channels = ch1
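We can send a test message to this source with the logger utility from util-linux, which supports remote syslog over TCP. A sketch assuming the configuration above; flag support varies by logger version:
logger -n localhost -P 5140 -T "test syslog message"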
2. Multiport Syslog TCP Source
This is a newer, faster, multi-port capable version of the Syslog TCP source, meaning it can efficiently listen on multiple ports at once.
Note: The port configuration setting has been replaced by ports.
Some of the properties for the Multiport Syslog TCP source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be multiport_syslogtcp |
host | – | It specifies the hostname or IP address to bind to. |
ports | – | Space-separated list of (one or more) ports to bind to. |
Example for a multiport syslog TCP source for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = multiport_syslogtcp
agent1.sources.src.channels = ch1
agent1.sources.src.host = 0.0.0.0
agent1.sources.src.ports = 10001 10002 10003
agent1.sources.src.portHeader = port
3. Syslog UDP Source
Some of the properties for the original, tried-and-true Syslog UDP source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be syslogudp |
host | – | It specifies the hostname or IP address to bind to. |
port | – | It specifies the Port # to bind to |
Example for a Syslog UDP source for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = syslogudp
agent1.sources.src.port = 5140
agent1.sources.src.host = localhost
agent1.sources.src.channels = ch1
11. HTTP Source
An HTTP source is a Flume source that receives Flume events via HTTP POST and GET.
A pluggable “handler”, which implements the HTTPSourceHandler interface, converts HTTP requests into Flume events. It takes an HttpServletRequest and returns a list of Flume events.
All events produced from one HTTP request are committed to the channel in one transaction, which increases channel efficiency.
If the pluggable handler throws an exception, the HTTP source returns an HTTP status of 400.
If the channel is full, or the HTTP source is unable to append events to the channel, the source returns an HTTP 503 (Temporarily Unavailable) status.
All events sent in one POST request are treated as one batch and inserted into the channel in one transaction.
HTTP source is based on Jetty 9.4. It offers the ability to set additional Jetty-specific parameters. These parameters will be passed directly to the Jetty components.
Some properties for HTTP source:
Property Name | Default values | Description |
type | – | The component type name must be http |
port | – | The port the source should bind to. |
bind | 0.0.0.0 | The hostname or IP address to listen on |
handler | org.apache.flume.source.http.JSONHandler | The FQCN of the handler class. |
Example for an HTTP source for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = http
agent1.sources.src.port = 5140
agent1.sources.src.channels = ch1
agent1.sources.src.handler = org.example.rest.RestHandler
agent1.sources.src.handler.nickname = random props
agent1.sources.src.HttpConfiguration.sendServerVersion = false
agent1.sources.src.ServerConnector.idleTimeout = 300
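We can test an HTTP source from a shell with curl. The sketch below assumes the default JSONHandler (rather than the custom handler shown above), which expects a JSON array of events, each with headers and body fields:
curl -X POST -H "Content-Type: application/json" \
  -d '[{"headers": {"origin": "curl"}, "body": "hello flume"}]' \
  http://localhost:5140
All events in the array are inserted into the channel in a single transaction.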
12. Legacy Source
The legacy sources allow a Flume 1.x agent to receive events from Flume 0.9.4 agents.
These sources accept events in the Flume 0.9.4 format, convert them to the Flume 1.0 format, and store them in the connected channel.
The 0.9.4 event properties, such as pri, timestamp, nanos, and host, are converted to 1.x event header attributes.
The legacy sources support both Avro and Thrift RPC connections. To use legacy sources, we start a Flume 1.x agent with either the avroLegacy or thriftLegacy source.
The Flume 0.9.4 agent should have an agent sink pointing to the host/port of the Flume 1.x agent.
1. Avro Legacy Source
Some of the properties for the Avro legacy source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be org.apache.flume.source.avroLegacy.AvroLegacySource |
host | – | It specifies the hostname or IP address to bind to. |
port | – | It specifies the Port # to bind to |
Example for an Avro legacy source for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.apache.flume.source.avroLegacy.AvroLegacySource
agent1.sources.src.host = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1
2. Thrift Legacy Source
Some of the properties for the Thrift legacy source:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be org.apache.flume.source.thriftLegacy.ThriftLegacySource |
host | – | It specifies the hostname or IP address to bind to. |
port | – | It specifies the Port # to bind to |
Example for a Thrift legacy source for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.apache.flume.source.thriftLegacy.ThriftLegacySource
agent1.sources.src.host = 0.0.0.0
agent1.sources.src.port = 6666
agent1.sources.src.channels = ch1
13. Custom Source
A custom source is our own implementation of the Source interface. We must include the custom source’s class and its dependencies in the agent’s classpath while starting the Flume agent. The type for the custom source is its FQCN.
Some of the properties for a custom source are:
Property Name | Default values | Description |
channels | – | Specify the channels through which an event travels. |
type | – | The component type name must be the FQCN of the custom source class |
Example for a Custom source for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.example.MySource
agent1.sources.src.channels = ch1
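One way to put the custom source's class and its dependencies on the agent's classpath is the --classpath option of the flume-ng command. A sketch, with a hypothetical jar path and configuration file name:
flume-ng agent --conf ./conf --conf-file agent1.conf --name agent1 --classpath /opt/flume-plugins/my-source.jar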
14. Scribe Source
Scribe is another type of ingest system. To adopt an existing Scribe ingest system, Flume should use the ScribeSource, which is based on Thrift with a compatible transfer protocol.
Some of the properties for the Scribe source are:
Property Name | Default values | Description |
type | – | The component type name must be org.apache.flume.source.scribe.ScribeSource |
port | 1499 | It specifies the port to which Scribe should connect. |
maxReadBufferBytes | 16384000 | The default Thrift FrameBuffer size. |
workerThreads | 5 | The number of handler threads in Thrift. |
Example for a Scribe source for agent named agent1, source src, and channel ch1:
agent1.sources = src
agent1.channels = ch1
agent1.sources.src.type = org.apache.flume.source.scribe.ScribeSource
agent1.sources.src.port = 1463
agent1.sources.src.workerThreads = 5
agent1.sources.src.channels = ch1
Summary
I hope that after reading this article you clearly understand the different types of Flume sources available. The article explained Flume sources like Avro, Thrift, HTTP, Kafka, and NetCat, as well as the legacy, custom, and Scribe sources.