Apache Flume Source – Types of Flume Source

1. Apache Flume Source – Objective

In Apache Flume, a source is the component that receives data from data generators and transfers it to one or more channels as Flume events. Many sources are available in Apache Flume, so in this blog post we will learn the different types of Flume sources: Avro Source, Thrift Source, Exec Source, JMS Source, Spooling Directory Source, Kafka Source, NetCat TCP Source, NetCat UDP Source, Sequence Generator Source, and Syslog Sources.
So, let’s start discussing each Apache Flume source with examples in detail.

2. Different Types of Flume Source

Several sources are available in Apache Flume. Let’s discuss each of them in detail.

a. Avro Source: Flume Source

An Avro source listens on the Avro port and receives events from external Avro client streams. When paired with the built-in Avro sink on another (previous-hop) Flume agent, it can create tiered collection topologies. The required properties are channels, type, bind, and port.
The table below lists the Avro source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be avro
bind | – | Hostname or IP address to listen on
port | – | Port # to bind to
threads | – | Maximum number of worker threads to spawn
selector.type | |
selector.* | |
interceptors | – | Space-separated list of interceptors
interceptors.* | |
compression-type | none | This can be “none” or “deflate”. The compression-type must match the compression-type of the matching AvroSink.
ssl | false | Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.
keystore | – | The path to a Java keystore file. Required for SSL.
keystore-password | – | The password for the Java keystore. Required for SSL.
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”.
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
ipFilter | false | Set this to true to enable ipFiltering for netty.
ipFilterRules | – | Defines N netty ipFilter pattern rules with this config.

Let’s see an example for an agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = avro
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
Example of ipFilterRules
ipFilterRules defines N netty ipFilters separated by commas. A pattern rule must be in this format:
<’allow’ or ’deny’>:<’ip’ or ’name’ for computer name>:<pattern>
Example: ipFilterRules=allow:ip:127.*,allow:name:localhost,deny:ip:*
It is most important to note that the first rule to match applies, as the examples below show for a client on the localhost:
“allow:name:localhost,deny:ip:*” allows the client on localhost and denies clients from any other IP.
“deny:name:localhost,allow:ip:*” denies the client on localhost and allows clients from any other IP.
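Similarly, the SSL properties from the table above can be layered onto the same Avro source. A minimal sketch, assuming a hypothetical keystore path and password:
# enable SSL for the Avro source r1 defined above
a1.sources.r1.ssl = true
# hypothetical keystore path and password
a1.sources.r1.keystore = /etc/flume/keystore.jks
a1.sources.r1.keystore-password = changeit
a1.sources.r1.keystore-type = JKS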

b. Thrift Source: Flume Source

A Thrift source is an Apache Flume source which listens on a Thrift port and receives events from external Thrift client streams. When paired with the built-in Thrift sink on another (previous-hop) Flume agent, it can create tiered collection topologies. The Thrift source can be configured to start in secure mode by enabling Kerberos authentication; it uses the agent-principal and agent-keytab properties to authenticate to the Kerberos KDC.
The table below lists the Thrift source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be thrift
bind | – | Hostname or IP address to listen on
port | – | Port # to bind to
threads | – | Maximum number of worker threads to spawn
selector.type | |
selector.* | |
interceptors | – | Space-separated list of interceptors
interceptors.* | |
ssl | false | Set this to true to enable SSL encryption. You must also specify a “keystore” and a “keystore-password”.
keystore | – | The path to a Java keystore file. Required for SSL.
keystore-password | – | The password for the Java keystore. Required for SSL.
keystore-type | JKS | The type of the Java keystore. This can be “JKS” or “PKCS12”.
exclude-protocols | SSLv3 | Space-separated list of SSL/TLS protocols to exclude. SSLv3 will always be excluded in addition to the protocols specified.
kerberos | false | Set to true to enable Kerberos authentication. In Kerberos mode, agent-principal and agent-keytab are required for successful authentication. The Thrift source in secure mode will accept connections only from Thrift clients that have Kerberos enabled and are successfully authenticated to the Kerberos KDC.
agent-principal | – | The Kerberos principal used by the Thrift source to authenticate to the Kerberos KDC.
agent-keytab | – | The keytab location used by the Thrift source in combination with the agent-principal to authenticate to the Kerberos KDC.

Now, let’s see an example for an agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = thrift
a1.sources.r1.channels = c1
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 4141
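To run the Thrift source in secure mode, the Kerberos properties from the table above can be added to the same source. A minimal sketch, assuming a hypothetical principal and keytab location:
# enable Kerberos authentication for the Thrift source r1 defined above
a1.sources.r1.kerberos = true
# hypothetical principal and keytab location
a1.sources.r1.agent-principal = flume/example.com@EXAMPLE.COM
a1.sources.r1.agent-keytab = /etc/flume/conf/flume.keytab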

c. Exec Source: Flume Source

This Apache Flume source runs a given Unix command on start-up and expects that process to continuously produce data on standard out. Unless the logStdErr property is set to true, stderr is simply discarded. If the process exits for any reason, the Exec source also exits and will produce no further data. This means that configurations such as cat [named pipe] or tail -F [file] will produce the desired results, whereas date probably will not: the former two commands produce continuous streams of data, while the latter produces a single event and exits.
Let’s see an example for an agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = exec
a1.sources.r1.command = tail -F /var/log/secure
a1.sources.r1.channels = c1
Here, the ‘shell’ config is used to invoke the ‘command’ through a command shell, such as Bash or PowerShell. The ‘command’ is passed as an argument to ‘shell’ for execution. This allows the ‘command’ to use features from the shell such as wildcards, backticks, pipes, loops, and conditionals. In the absence of the ‘shell’ config, the ‘command’ will be invoked directly. Common values for ‘shell’ are ‘/bin/sh -c’, ‘/bin/ksh -c’, ‘cmd /c’, and ‘powershell -Command’.
For example,
a1.sources.tailsource-1.type = exec
a1.sources.tailsource-1.shell = /bin/bash -c
a1.sources.tailsource-1.command = for i in /path/*.txt; do cat $i; done

d. JMS Source: Flume Source

The JMS source reads messages from a JMS destination such as a queue or topic. Note that it has only been tested with ActiveMQ, although being a JMS application it should work with any JMS provider. It offers a configurable batch size, message selector, user/pass, and message-to-Flume-event converter. Note that the vendor-provided JMS jars should be included in the Flume classpath using the plugins.d directory (preferred), via --classpath on the command line, or via the FLUME_CLASSPATH variable in flume-env.sh.
The table below lists the JMS source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be jms
initialContextFactory | – | Initial Context Factory, e.g. org.apache.activemq.jndi.ActiveMQInitialContextFactory
connectionFactory | – | The JNDI name the connection factory should appear as
providerURL | – | The JMS provider URL
destinationName | – | Destination name
destinationType | – | Destination type (queue or topic)
messageSelector | – | Message selector to use when creating the consumer
userName | – | Username for the destination/provider
passwordFile | – | File containing the password for the destination/provider
batchSize | 100 | Number of messages to consume in one batch
converter.type | DEFAULT | Class to use to convert messages to Flume events.
converter.* | – | Converter properties
converter.charset | UTF-8 | Default converter only. Charset to use when converting JMS TextMessages to byte arrays.
createDurableSubscription | false | Whether to create a durable subscription. A durable subscription can only be used with destinationType topic. If true, “clientId” and “durableSubscriptionName” have to be specified.
clientId | – | JMS client identifier set on Connection right after it is created.
durableSubscriptionName | – | Name used to identify the durable subscription. Required for durable subscriptions.
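Let’s see a minimal sketch for an agent named a1, assuming a hypothetical ActiveMQ broker at mqserver:61616 and a queue named BUSINESS_DATA:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = jms
a1.sources.r1.channels = c1
a1.sources.r1.initialContextFactory = org.apache.activemq.jndi.ActiveMQInitialContextFactory
a1.sources.r1.connectionFactory = GenericConnectionFactory
a1.sources.r1.providerURL = tcp://mqserver:61616
a1.sources.r1.destinationName = BUSINESS_DATA
a1.sources.r1.destinationType = QUEUE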

e. Spooling Directory Source: Flume Source

With this source we can ingest data by placing files to be ingested into a “spooling” directory on disk. The source watches the specified directory for new files and parses events out of new files as they appear. The event parsing logic is pluggable. After a given file has been fully read into the channel, it is renamed to indicate completion.
Unlike the Exec source, the Spooling Directory source is reliable and will not miss data, even if Flume is restarted or killed. In exchange for this reliability, only immutable, uniquely-named files may be dropped into the spooling directory. Flume tries to detect these problem conditions and will fail loudly if they are violated:

  • If a file is written to after being placed into the spooling directory, Flume will print an error to its log file and stop processing.
  • If a file name is reused at a later time, Flume will print an error to its log file and stop processing.

To avoid the above issues, it may be useful to add a unique identifier (such as a timestamp) to log file names when they are moved into the spooling directory, as the sketch below shows.
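A hedged shell sketch of that naming convention, reusing the spool directory from the example further below and a hypothetical source file:
# move a finished log into the spooling directory under a unique,
# timestamped name so that file names are never reused
mv /var/log/apache/access.log \
   "/var/log/apache/flumeSpool/access.$(date +%s).log"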
For the consumeOrder values oldest and youngest, the last modified time of the files is used to compare them; in case of a tie, the file with the smallest lexicographical order is consumed first. The decodeErrorPolicy property controls what happens when a non-decodable character is encountered in the input file: FAIL throws an exception and fails to parse the file; REPLACE replaces the unparseable character with the “replacement character”, typically Unicode U+FFFD; IGNORE drops the unparseable character sequence.
The table below lists the Spooling Directory source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be spooldir
spoolDir | – | The directory from which to read files
fileSuffix | .COMPLETED | Suffix to append to completely ingested files
deletePolicy | never | When to delete completed files: never or immediate
fileHeader | false | Whether to add a header storing the absolute path filename.
fileHeaderKey | file | Header key to use when appending the absolute path filename to the event header.
basenameHeader | false | Whether to add a header storing the basename of the file.
basenameHeaderKey | basename | Header key to use when appending the basename of the file to the event header.
includePattern | ^.*$ | Regular expression specifying which files to include. It can be used together with ignorePattern. If a file matches both ignorePattern and includePattern, the file is ignored.
ignorePattern | ^$ | Regular expression specifying which files to ignore (skip). It can be used together with includePattern. If a file matches both ignorePattern and includePattern, the file is ignored.
trackerDir | .flumespool | Directory to store metadata related to the processing of files. If this path is not absolute, it is interpreted as relative to the spoolDir.
consumeOrder | oldest | The order in which files in the spooling directory will be consumed: oldest, youngest, or random.
pollDelay | 500 | Delay (in milliseconds) used when polling for new files.
recursiveDirectorySearch | false | Whether to monitor subdirectories for new files to read.
maxBackoff | 4000 | The maximum time (in millis) to wait between consecutive attempts to write to the channel(s) if the channel is full. The source starts at a low backoff and increases it exponentially each time the channel throws a ChannelException, up to the value specified by this parameter.
batchSize | 100 | Granularity at which to batch transfer to the channel
inputCharset | UTF-8 | Character set used by deserializers that treat the input file as text.
decodeErrorPolicy | FAIL | What to do when we see a non-decodable character in the input file.
deserializer | LINE | The deserializer used to parse the file into events. Defaults to parsing each line as an event. The class specified must implement EventDeserializer.Builder.
deserializer.* | – | Varies per event deserializer.
bufferMaxLines | – | (Obsolete) This option is now ignored.
bufferMaxLineLength | 5000 | (Deprecated) Maximum length of a line in the commit buffer. Use deserializer.maxLineLength instead.
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | |

Let’s see an example for an agent named a1:
a1.channels = ch-1
a1.sources = src-1
a1.sources.src-1.type = spooldir
a1.sources.src-1.channels = ch-1
a1.sources.src-1.spoolDir = /var/log/apache/flumeSpool
a1.sources.src-1.fileHeader = true
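Building on that example, a sketch of the optional ordering and decoding properties discussed above:
# optional: consume the oldest files first, replace undecodable
# characters, and parse each line as one event
a1.sources.src-1.consumeOrder = oldest
a1.sources.src-1.decodeErrorPolicy = REPLACE
a1.sources.src-1.deserializer = LINE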

f. Kafka Source: Flume Source

The Kafka source is an Apache Kafka consumer that reads messages from Kafka topics. If you have multiple Kafka sources running, you can configure them with the same consumer group so that each will read a unique set of partitions for the topics.
The table below lists the Kafka source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be org.apache.flume.source.kafka.KafkaSource
kafka.bootstrap.servers | – | List of brokers in the Kafka cluster used by the source
kafka.consumer.group.id | flume | Unique identifier of the consumer group. Setting the same id in multiple sources or agents indicates that they are part of the same consumer group.
kafka.topics | – | Comma-separated list of topics the Kafka consumer will read messages from.
kafka.topics.regex | – | Regex that defines the set of topics the source is subscribed to.
setTopicHeader | true | When set to true, stores the topic of the retrieved message into a header, defined by the topicHeader property.
topicHeader | topic | Defines the name of the header in which to store the name of the topic the message was received from, if the setTopicHeader property is set to true.
kafka.consumer.security.protocol | PLAINTEXT | Set to SASL_PLAINTEXT, SASL_SSL or SSL if writing to Kafka using some level of security. See below for additional info on secure setup.
more consumer security props | – | If using SASL_PLAINTEXT, SASL_SSL or SSL, refer to Kafka security for additional properties that need to be set on the consumer.

It is important to note that the Kafka source overrides two Kafka consumer parameters: auto.commit.enable is set to “false” by the source, and the source commits every batch. The Kafka source guarantees an at-least-once strategy of message retrieval, so duplicates can be present when the source starts. The Kafka source also provides defaults for key.deserializer (org.apache.kafka.common.serialization.StringDeserializer) and value.deserializer (org.apache.kafka.common.serialization.ByteArrayDeserializer).
Deprecated Properties

Property Name | Default | Description
topic | – | Use kafka.topics
groupId | flume | Use kafka.consumer.group.id
zookeeperConnect | – | No longer supported by the Kafka consumer client since 0.9.x. Use kafka.bootstrap.servers to establish a connection with the Kafka cluster.

Now, let’s see an example of topic subscription by a comma-separated topic list:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.batchSize = 5000
tier1.sources.source1.batchDurationMillis = 2000
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics = test1, test2
tier1.sources.source1.kafka.consumer.group.id = custom.g.id
And here is an example of topic subscription by regex:
tier1.sources.source1.type = org.apache.flume.source.kafka.KafkaSource
tier1.sources.source1.channels = channel1
tier1.sources.source1.kafka.bootstrap.servers = localhost:9092
tier1.sources.source1.kafka.topics.regex = ^topic[0-9]$
# the default uses kafka.consumer.group.id=flume
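For a secure setup, the kafka.consumer.security.protocol property from the table above can be combined with the standard Kafka consumer security properties, which Flume forwards to the consumer under the kafka.consumer. prefix. A minimal SSL sketch, assuming a hypothetical truststore path and password:
# enable SSL between the source's consumer and the Kafka brokers
tier1.sources.source1.kafka.consumer.security.protocol = SSL
# hypothetical truststore holding the brokers' CA certificate
tier1.sources.source1.kafka.consumer.ssl.truststore.location = /etc/flume/truststore.jks
tier1.sources.source1.kafka.consumer.ssl.truststore.password = changeit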

g. NetCat TCP Source: Flume Source

The NetCat TCP source is an Apache Flume source that listens on a given port and turns each line of text into an event. It acts like nc -k -l [host] [port]; in other words, it opens a specified port and listens for data. The expectation is that the supplied data is newline-separated text. Each line of text is turned into a Flume event and sent via the connected channel.
The table below lists the NetCat TCP source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be netcat
bind | – | Hostname or IP address to bind to
port | – | Port # to bind to
max-line-length | 512 | Max line length per event body (in bytes)
ack-every-event | true | Respond with an “OK” for every event received
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | |

Now, let’s see an example for an agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcat
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

h. NetCat UDP Source: Flume Source

Like the NetCat TCP source, the NetCat UDP source is an Apache Flume source that listens on a given port, turns each line of text into an event, and sends it via the connected channel. It acts like nc -u -k -l [host] [port].
The table below lists the NetCat UDP source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be netcatudp
bind | – | Hostname or IP address to bind to
port | – | Port # to bind to
remoteAddressHeader | – |
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | |

Let’s see an example for an agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = netcatudp
a1.sources.r1.bind = 0.0.0.0
a1.sources.r1.port = 6666
a1.sources.r1.channels = c1

i. Sequence Generator Source: Flume Source

A simple sequence generator continuously generates events with a counter that starts from 0, increments by 1, and stops at totalEvents. It retries when it can’t send events to the channel, and it is useful mainly for testing. During retries it keeps the body of the retry messages the same as before, so that the number of unique events, after de-duplication at the destination, is expected to equal the specified totalEvents.
The table below lists the Sequence Generator source properties (name, default, description).

Property Name | Default | Description
channels | – |
type | – | The component type name; needs to be seq
selector.type | replicating | replicating or multiplexing
selector.* | – | Depends on the selector.type value
interceptors | – | Space-separated list of interceptors
interceptors.* | |
batchSize | 1 | Number of events to attempt to process per request loop
totalEvents | Long.MAX_VALUE | Number of unique events sent by the source.

Let’s see an example for an agent named a1:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = seq
a1.sources.r1.channels = c1
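For testing, the counter can be bounded using the optional properties from the table above; the values here are arbitrary:
# stop after 10,000 unique events, handing them to the channel 10 at a time
a1.sources.r1.totalEvents = 10000
a1.sources.r1.batchSize = 10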

j. Syslog Sources: Flume Source

Syslog sources read syslog data and generate Flume events from it. The UDP source treats an entire message as a single event, whereas the TCP sources create a new event for each string of characters separated by a newline (‘\n’).
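As a minimal sketch, here is a syslog TCP source for an agent named a1, assuming port 5140 on localhost:
a1.sources = r1
a1.channels = c1
a1.sources.r1.type = syslogtcp
a1.sources.r1.port = 5140
a1.sources.r1.host = localhost
a1.sources.r1.channels = c1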

3. Conclusion

Hence, in this Apache Flume tutorial, we have seen the types of sources available in Apache Flume: Avro Source, Thrift Source, Exec Source, JMS Source, Spooling Directory Source, Kafka Source, NetCat TCP Source, NetCat UDP Source, Sequence Generator Source, and Syslog Sources, each with Apache Flume examples. Furthermore, if you have any query regarding Flume sources, feel free to ask in the comment section.
See also: How to Install Apache Flume
