

{"id":14722,"date":"2018-04-28T13:25:53","date_gmt":"2018-04-28T13:25:53","guid":{"rendered":"https:\/\/data-flair.training\/blogs\/?p=14722"},"modified":"2018-04-28T13:25:53","modified_gmt":"2018-04-28T13:25:53","slug":"kafka-consumer","status":"publish","type":"post","link":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/","title":{"rendered":"Apache Kafka Consumer | Kafka Consumer Group"},"content":{"rendered":"<p>In our last article, we discussed <strong>Kafka Producer<\/strong>. Today, we will discuss Kafka Consumer. Firstly, we will see what is Kafka Consumer and example of Kafka Consumer. Afterward, we will learn Kafka Consumer Group.<\/p>\n<p>Moreover, we will see Consumer record API and configurations setting for Kafka Consumer.<br \/>\n<span style=\"font-weight: 400\">After creating a Kafka Producer to send messages to <strong>Apache Kafka<\/strong> cluster. Now, we are creating a Kafka Consumer to consume messages from the Kafka cluster.<\/span><\/p>\n<p>So, let&#8217;s discuss Kafka Consumer in detail.<\/p>\n<h2><span style=\"font-weight: 400\">What is Kafka Consumer?<\/span><\/h2>\n<p><span style=\"font-weight: 400\">An application that reads data from <strong>Kafka Topics<\/strong> is what we call a Consumer. Basically, Kafka Consumer subscribes to one or more topics in the Kafka cluster then further feeds on tokens or messages from the Kafka Topics.\u00a0<\/span><\/p>\n<p>In addition, using Heartbeat we can know the connectivity of Consumer to <strong>Kafka Cluster<\/strong>. However, let\u2019s define Heartbeat. It is set up at Consumer to let Zookeeper or Broker Coordinator know if the Consumer is still connected to the Cluster.<\/p>\n<p>So, Kafka Consumer is no longer connected to the Cluster, if the heartbeat is absent. In that case, the Broker Coordinator has to re-balance the load.<\/p>\n<p>Moreover, Heartbeat is an overhead to the cluster. Also, by keeping the data throughput and overhead in consideration, we can configure the interval at which the heartbeat is at Consumer.<\/p>\n<div id=\"attachment_14881\" style=\"width: 665px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-1.png\"><img loading=\"lazy\" decoding=\"async\" aria-describedby=\"caption-attachment-14881\" class=\"wp-image-14881 size-full\" src=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-1.png\" alt=\"Apache Kafka Consumer\" width=\"655\" height=\"628\" srcset=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-1.png 655w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-1-150x144.png 150w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-1-300x288.png 300w\" sizes=\"auto, (max-width: 655px) 100vw, 655px\" \/><\/a><p id=\"caption-attachment-14881\" class=\"wp-caption-text\">What is Apache Kafka Consumer<\/p><\/div>\n<p><span style=\"font-weight: 400\">Moreover, we can group the consumers, and the consumers in the Consumer Group in Kafka could share the partitions of the Kafka Topics they subscribed to. T<\/span><\/p>\n<p><span style=\"font-weight: 400\">o understand see, if there are N partitions in a Topic, N consumers in the Kafka Consumer Group and the group has subscribed to a Topic, each consumer would read data from a partition of the topic. <\/span><\/p>\n<p><span style=\"font-weight: 400\">Hence, we can say, this is just a heads up that Consumers could be in groups.<\/span><\/p>\n<p><span style=\"font-weight: 400\">To be specific, to connect to the Kafka cluster and consume the data streams, the Consumer API from Kafka helps.<\/span><\/p>\n<p><span style=\"font-weight: 400\">Below is the picture showing Apache Kafka Consumer:<\/span><\/p>\n<div id=\"attachment_14776\" style=\"width: 1210px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Kafka-Cluster.png\"><img loading=\"lazy\" decoding=\"async\" aria-describedby=\"caption-attachment-14776\" class=\"wp-image-14776 size-full\" src=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Kafka-Cluster.png\" alt=\"Working of Apache Kafka Consumer\" width=\"1200\" height=\"628\" srcset=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Kafka-Cluster.png 1200w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Kafka-Cluster-150x79.png 150w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Kafka-Cluster-300x157.png 300w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Kafka-Cluster-768x402.png 768w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Kafka-Cluster-1024x536.png 1024w\" sizes=\"auto, (max-width: 1200px) 100vw, 1200px\" \/><\/a><p id=\"caption-attachment-14776\" class=\"wp-caption-text\">Working of Apache Kafka Consumer<\/p><\/div>\n<p><span style=\"font-weight: 400\">To subscribe to one or more topics and process the stream of records produced to them in an application, we use this Kafka Consumer API. In order words, we use KafkaConsumer API to consume messages from the Kafka cluster. Moreover, below see the KafkaConsumer class constructor.<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">public KafkaConsumer(java.util.Map&lt;java.lang.String,java.lang.Object&gt; configs)<\/pre>\n<ul>\n<li><b>configs <\/b><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">Return a map of consumer configs.<\/span><br \/>\n<span style=\"font-weight: 400\">There are \u00a0following significant methods of KafkaConsumer class:<\/span><br \/>\n<strong>1.\u00a0public java.util.Set&lt;TopicPar-tition&gt; assignment()<\/strong><br \/>\n<span style=\"font-weight: 400\">To get the set of partitions currently assigned by the consumer.<\/span><\/p>\n<p><strong>2.\u00a0public string subscription()<\/strong><br \/>\n<span style=\"font-weight: 400\">In order to subscribe to the given list of topics to get dynamically assigned partitions.<\/span><\/p>\n<p><strong>3.\u00a0public void sub-scribe(java.util.List&lt;java.lang.String&gt; topics, ConsumerRe-balanceListener listener)<\/strong><br \/>\n<span style=\"font-weight: 400\">Further, to subscribe to the given list of topics to get dynamically assigned partitions.<\/span><\/p>\n<p><strong>4<\/strong>.\u00a0<strong>public void unsubscribe()<\/strong><br \/>\n<span style=\"font-weight: 400\">Now, to unsubscribe the topics from the given list of partitions.<\/span><\/p>\n<p><strong>5.\u00a0public void sub-scribe(java.util.List&lt;java.lang.String&gt; topics)<\/strong><br \/>\n<span style=\"font-weight: 400\">In order to subscribe to the given list of topics to get dynamically assigned partitions. If the given list of topics is empty, it is treated the same as unsubscribe().<\/span><\/p>\n<p><span style=\"font-weight: 400\"><strong>6.\u00a0<\/strong><\/span><strong>public void subscribe(java.util.regex.Pattern pattern, ConsumerRebalanceLis-tener listener)<\/strong><br \/>\nHere, argument pattern refers to the subscribing pattern in the format of regular expression and the listener argument gets notifications from the subscribing pattern.<\/p>\n<p><strong>7. public void as-sign(java.util.List&lt;TopicParti-tion&gt; partitions)<\/strong><br \/>\n<span style=\"font-weight: 400\">To manually assign a list of partitions to the customer. <\/span><\/p>\n<p><strong>8.\u00a0<span style=\"font-family: Verdana, Geneva, sans-serif\">poll()<\/span><\/strong><br \/>\n<span style=\"font-weight: 400\">Fetch data for the topics or partitions specified using one of the subscribe\/assign APIs. This will return the error if the topics are not subscribed before the polling for data. <\/span><\/p>\n<p><strong>9. public void commitSync()<\/strong><br \/>\n<span style=\"font-weight: 400\">In order to commit offsets returned in the last poll() for all the subscribed list of topics and partitions. The same operation is applied to commitAsyn(). <\/span><\/p>\n<p><span style=\"font-weight: 400\"><strong>10.\u00a0<\/strong><\/span><strong><span style=\"font-family: Verdana, Geneva, sans-serif\">public void seek(TopicPartition partition, long offset)<\/span><\/strong><br \/>\n<span style=\"font-weight: 400\">To fetch the current offset value that consumer will use in the next poll() method.<\/span><\/p>\n<p><strong>11. public void resume()<\/strong><br \/>\n<span style=\"font-weight: 400\">In order to resume the paused partitions. <\/span><\/p>\n<p><strong>12. public void wakeup()<\/strong><br \/>\n<span style=\"font-weight: 400\">To wake Up the consumer.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">ConsumerRecord API<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Basically, to receive records from the Kafka cluster, we use the ConsumerRecord API. It includes a topic name, partition number, from which the record is being received also an offset that points to the record in a Kafka partition. <\/span><\/p>\n<p><span style=\"font-weight: 400\">Moreover, to create a consumer record with specific topic name, partition count and &lt;key, value&gt; pairs, we use consumerRecord class. Its <strong>signature<\/strong> is:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">public ConsumerRecord(string topic,int partition, long offset,K key, V value)<\/pre>\n<ul>\n<li style=\"font-weight: 400\"><strong>Topic <\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">The topic name for consumer record received from the Kafka cluster.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><strong>Partition <\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">Partition for the topic.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><strong>Key <\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">The key of the record, if no key exists null will be returned.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><strong>Value <\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">Record contents.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">ConsumerRecords API<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Basically, it is a container for ConsumerRecord. To keep the list of ConsumerRecord per partition for a particular topic we use this API. Its <strong>Constructor<\/strong> is:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">public ConsumerRecords(java.util.Map&lt;TopicPartition,java.util.List\n&lt;Consumer-Record&gt;K,V&gt;&gt;&gt; records)<\/pre>\n<ul>\n<li style=\"font-weight: 400\"><strong>TopicPartition <\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">To return a map of partition for a particular topic.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><strong>Records <\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">To return the list of ConsumerRecord.<\/span><br \/>\n<span style=\"font-weight: 400\">These are the following methods of a ConsumerRecords class:<\/span><\/p>\n<p><span style=\"font-weight: 400\"><strong>1.<\/strong>\u00a0<\/span><strong style=\"font-family: Verdana, Geneva, sans-serif\">public int count()<\/strong><br \/>\n<span style=\"font-weight: 400\">The number of records for all the topics.<\/span><br \/>\n<span style=\"font-weight: 400\"><strong>2.<\/strong>\u00a0<\/span><strong style=\"font-family: Verdana, Geneva, sans-serif\">public Set partitions()<\/strong><br \/>\n<span style=\"font-weight: 400\">The set of partitions with data in this recordset (if no data was returned then the set is empty).<\/span><br \/>\n<span style=\"font-weight: 400\"><strong>3.\u00a0<\/strong><\/span><strong style=\"font-family: Verdana, Geneva, sans-serif\">public Iterator iterator()<\/strong><br \/>\n<span style=\"font-weight: 400\">Generally, iterator enables you to cycle through a collection, obtaining or removing elements.<\/span><br \/>\n<span style=\"font-weight: 400\"><strong>4.<\/strong>\u00a0<\/span><strong style=\"font-family: Verdana, Geneva, sans-serif\">public List records()<\/strong><br \/>\n<span style=\"font-weight: 400\">Basically, get the list of records for the given partition.<\/span><\/p>\n<h2>ConsumerRecord API vs ConsumerRecords API<\/h2>\n<p><strong>a. ConsumerRecord API<\/strong><br \/>\nConsumerRecord API is a key\/value pair to be received from Kafka.\u00a0It contains a topic name and a partition number, from which the record is being received and an offset that points to the record in a Kafka partition.<\/p>\n<p><strong>b. ConsumerRecords API<\/strong><br \/>\nWhereas, ConsumerRecords API is a container that holds the list ConsumerRecord per partition for a particular topic. Basically, there is one ConsumerRecord list for every topic partition returned by a Consumer.poll(long) operation.<\/p>\n<h2><span style=\"font-weight: 400\">Configuration Settings<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Here, we are listing the configuration settings for the Consumer client API \u2212<\/span><br \/>\n<span style=\"font-weight: 400\"><strong>1.\u00a0<\/strong><\/span><strong><span style=\"font-family: Verdana, Geneva, sans-serif\">bootstrap.servers<\/span><\/strong><br \/>\n<span style=\"font-weight: 400\">It bootstraps list of brokers.<\/span><br \/>\n<strong>2.\u00a0<span style=\"font-family: Verdana, Geneva, sans-serif\">group.id<\/span><\/strong><br \/>\n<span style=\"font-weight: 400\">To assign an individual consumer to a group.<\/span><br \/>\n<span style=\"font-weight: 400\"><strong>3.\u00a0<\/strong><\/span><strong><span style=\"font-family: Verdana, Geneva, sans-serif\">enable.auto.commit<\/span><\/strong><br \/>\n<span style=\"font-weight: 400\">Basically, it enables auto-commit for offsets if the value is true, otherwise not committed.<\/span><br \/>\n<strong>4.\u00a0<span style=\"font-family: Verdana, Geneva, sans-serif\">auto.commit.interval.ms<\/span><\/strong><br \/>\n<span style=\"font-weight: 400\">Basically, it returns how often updated consumed offsets are written to <strong>ZooKeeper<\/strong>.<\/span><br \/>\n<strong>5.\u00a0<span style=\"font-family: Verdana, Geneva, sans-serif\">session.timeout.ms<\/span><\/strong><br \/>\n<span style=\"font-weight: 400\">It indicates how many milliseconds Kafka will wait for the ZooKeeper to respond to a request (read or write) before giving up and continuing to consume messages.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">SimpleConsumer Application<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Make sure the producer application steps remain the same here. Here also start your ZooKeeper and Kafka broker. Further, create a SimpleConsumer application with the <strong>java class<\/strong> named SimpleCon-sumer.java. Then type the following <strong>code<\/strong>:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">import java.util.Properties;\nimport java.util.Arrays;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\npublic class SimpleConsumer {\n  public static void main(String[] args) throws Exception {\n     if(args.length == 0){\n        System.out.println(\"Enter topic name\");\n        return;\n     }\n     \/\/Kafka consumer configuration settings\n     String topicName = args[0].toString();\n     Properties props = new Properties();\n     props.put(\"bootstrap.servers\", \"localhost:9092\");\n     props.put(\"group.id\", \"test\");\n     props.put(\"enable.auto.commit\", \"true\");\n     props.put(\"auto.commit.interval.ms\", \"1000\");\n     props.put(\"session.timeout.ms\", \"30000\");\n     props.put(\"key.deserializer\",\n        \"org.apache.kafka.common.serializa-tion.StringDeserializer\");\n     props.put(\"value.deserializer\",\n        \"org.apache.kafka.common.serializa-tion.StringDeserializer\");\n     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer\n        &lt;String, String&gt;(props);\n     \/\/Kafka Consumer subscribes list of topics here.\n     consumer.subscribe(Arrays.asList(topicName))\n     \/\/print the topic name\n     System.out.println(\"Subscribed to topic \" + topicName);\n     int i = 0;\n     while (true) {\n        ConsumerRecords&lt;String, String&gt; records = con-sumer.poll(100);\n        for (ConsumerRecord&lt;String, String&gt; record : records)\n        \/\/ print the offset,key and value for the consumer records.\n        System.out.printf(\"offset = %d, key = %s, value = %s\\n\",\n           record.offset(), record.key(), record.value());\n     }\n  }\n}<\/pre>\n<h3>\u00a0a. Compilation<\/h3>\n<p><span style=\"font-weight: 400\">By using the following <strong>command<\/strong> we can compile the application.<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">javac -cp \u201c\/path\/to\/kafka\/kafka_2.11-0.9.0.0\/lib\/*\u201d *.java<\/pre>\n<h3>\u00a0b. Execution<\/h3>\n<p><span style=\"font-weight: 400\">Moreover, using the following <strong>command<\/strong> we can execute the application.<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">java -cp \u201c\/path\/to\/kafka\/kafka_2.11-0.9.0.0\/lib\/*\u201d:. SimpleConsumer &lt;topic-name&gt;<\/pre>\n<h3>\u00a0c. Input<\/h3>\n<p><span style=\"font-weight: 400\">Further, open the producer CLI and send some messages to the topic. We can put the simple input as \u2018Hello Consumer\u2019.\u00a0 <\/span><\/p>\n<h3><span style=\"font-weight: 400\">d.\u00a0<\/span>Output<\/h3>\n<p><span style=\"font-weight: 400\">The <strong>output<\/strong> is<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">Subscribed to topic Hello-Kafka\noffset = 3, key = null, value = Hello Consumer<\/pre>\n<h2>Kafka Consumer Group<\/h2>\n<p><span style=\"font-weight: 400\">Basically, Consumer group in Kafka is a multi-threaded or multi-machine consumption from Kafka topics.<\/span><\/p>\n<div id=\"attachment_14783\" style=\"width: 1210px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Consumer-Group-1.png\"><img loading=\"lazy\" decoding=\"async\" aria-describedby=\"caption-attachment-14783\" class=\"wp-image-14783 size-full\" src=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Consumer-Group-1.png\" alt=\"Kafka Consumer- Kafka Consumer Group\" width=\"1200\" height=\"628\" srcset=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Consumer-Group-1.png 1200w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Consumer-Group-1-150x79.png 150w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Consumer-Group-1-300x157.png 300w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Consumer-Group-1-768x402.png 768w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/04\/Consumer-Group-1-1024x536.png 1024w\" sizes=\"auto, (max-width: 1200px) 100vw, 1200px\" \/><\/a><p id=\"caption-attachment-14783\" class=\"wp-caption-text\">Kafka Consumer- Kafka Consumer Group<\/p><\/div>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">By using the same group.id, Consumers can join a group.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">The maximum parallelism of a group is that the number of consumers in the group \u2190 numbers of partitions.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Moreover, Kafka assigns the partitions of a topic to the consumer in a group. Hence, each partition is consumed by exactly one consumer in the group.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Also, Kafka guarantees that a message is only ever read by a single consumer in the group.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Consumers can see the message in the order they were stored in the log.<\/span><\/li>\n<\/ul>\n<p><strong>a. Re-balancing of a Consumer<\/strong><\/p>\n<p><span style=\"font-weight: 400\">Basically, an addition of more processes\/threads will cause Kafka to re-balance. Basically, if somehow any consumer or broker fails to send heartbeat to ZooKeeper, then it can be re-configured via the Kafka cluster. Also, Kafka will assign available partitions to the available threads, possibly moving a partition to another process, during this re-balance.<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">import java.util.Properties;\nimport java.util.Arrays;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\npublic class ConsumerGroup {\n  public static void main(String[] args) throws Exception {\n     if(args.length &lt; 2){\n        System.out.println(\"Usage: consumer &lt;topic&gt; &lt;groupname&gt;\");\n        return;\n     }\n     String topic = args[0].toString();\n     String group = args[1].toString();\n     Properties props = new Properties();\n     props.put(\"bootstrap.servers\", \"localhost:9092\");\n     props.put(\"group.id\", group);\n     props.put(\"enable.auto.commit\", \"true\");\n     props.put(\"auto.commit.interval.ms\", \"1000\");\n     props.put(\"session.timeout.ms\", \"30000\");\n     props.put(\"key.deserializer\",\n        \"org.apache.kafka.common.serializa-tion.StringDeserializer\");\n     props.put(\"value.deserializer\",\n        \"org.apache.kafka.common.serializa-tion.StringDeserializer\");\n     KafkaConsumer&lt;String, String&gt; consumer = new KafkaConsumer&lt;String, String&gt;(props);\n     consumer.subscribe(Arrays.asList(topic));\n     System.out.println(\"Subscribed to topic \" + topic);\n     int i = 0;\n     while (true) {\n        ConsumerRecords&lt;String, String&gt; records = con-sumer.poll(100);\n           for (ConsumerRecord&lt;String, String&gt; record : records)\n              System.out.printf(\"offset = %d, key = %s, value = %s\\n\",\n              record.offset(), record.key(), record.value());\n     }\n  }\n}\nii. Compilation\njavac -cp \u201c\/path\/to\/kafka\/kafka_2.11-0.9.0.0\/libs\/*\" ConsumerGroup.java\niii. Execution\n&gt;&gt;java -cp \u201c\/path\/to\/kafka\/kafka_2.11-0.9.0.0\/libs\/*\":.\nConsumerGroup &lt;topic-name&gt; my-group\n&gt;&gt;java -cp \"\/home\/bala\/Workspace\/kafka\/kafka_2.11-0.9.0.0\/libs\/*\":.\nConsumerGroup &lt;topic-name&gt; my-group<\/pre>\n<p>Hence, we can see the sample group we have created the name, my-group with two consumers.<\/p>\n<p><strong>b. Input<\/strong><\/p>\n<p><span style=\"font-weight: 400\">Now, after opening producer CLI, send some messages like-<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">Test consumer group 01\nTest consumer group 02<\/pre>\n<p><strong>c. The output of the First Process<\/strong><\/p>\n<pre class=\"EnlighterJSRAW\">Subscribed to topic Hello-kafka\noffset = 3, key = null, value = Test consumer group 01<\/pre>\n<p><strong>d. Further, the output of the Second Process<\/strong><\/p>\n<pre class=\"EnlighterJSRAW\">Subscribed to topic Hello-kafka\noffset = 3, key = null, value = Test consumer group 02<\/pre>\n<p>So, this was all about Apache Kafka Consumer and Consumer group in Kafka with examples. Hope you like our explanation.<\/p>\n<h2><span style=\"font-weight: 400\">Conclusion: Kafka Consumer<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Hence, we have seen Kafka Consumer and ConsumerGroup by using the Java client demo in detail. Also, by this, we have an idea about how to send and receive messages using a Java client. <\/span><\/p>\n<p><span style=\"font-weight: 400\">Moreover, we discussed Kafka Consumer record API and Consumer Records API and also the comparison of both. In addition, we have learned configuration setting for Kafka Consumer client API. However, if any doubt occurs, feel free to ask in the comment section.<\/span><\/p>\n","protected":false},"excerpt":{"rendered":"<p>In our last article, we discussed Kafka Producer. Today, we will discuss Kafka Consumer. Firstly, we will see what is Kafka Consumer and example of Kafka Consumer. Afterward, we will learn Kafka Consumer Group.&#46;&#46;&#46;<\/p>\n","protected":false},"author":5,"featured_media":15530,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[9],"tags":[833,2873,3008,7871,7872,7874,8692,11420,15800],"class_list":["post-14722","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-kafka","tag-apache-kafka-consumer","tag-configuration-setting","tag-cosumer-class","tag-kafka-consumer","tag-kafka-consumer-class","tag-kafka-consumer-example","tag-methods-of-kafka-consumer-class","tag-rebalancing-of-consumer","tag-what-is-kafka-consumer"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.4 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>Apache Kafka Consumer | Kafka Consumer Group - DataFlair<\/title>\n<meta name=\"description\" content=\"Apache Kafka Consumer with example,Kafka Consumer Class,Comparison of Consumer Record API-Consumer Records API,configuration setting,Consumer Group in Kafka\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/data-flair.training\/blogs\/kafka-consumer\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Apache Kafka Consumer | Kafka Consumer Group - DataFlair\" \/>\n<meta property=\"og:description\" content=\"Apache Kafka Consumer with example,Kafka Consumer Class,Comparison of Consumer Record API-Consumer Records API,configuration setting,Consumer Group in Kafka\" \/>\n<meta property=\"og:url\" content=\"https:\/\/data-flair.training\/blogs\/kafka-consumer\/\" \/>\n<meta property=\"og:site_name\" content=\"DataFlair\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/DataFlairWS\/\" \/>\n<meta property=\"article:published_time\" content=\"2018-04-28T13:25:53+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-01.jpg\" \/>\n\t<meta property=\"og:image:width\" content=\"1200\" \/>\n\t<meta property=\"og:image:height\" content=\"628\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/jpeg\" \/>\n<meta name=\"author\" content=\"DataFlair Team\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@DataFlairWS\" \/>\n<meta name=\"twitter:site\" content=\"@DataFlairWS\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"DataFlair Team\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"9 minutes\" \/>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Apache Kafka Consumer | Kafka Consumer Group - DataFlair","description":"Apache Kafka Consumer with example,Kafka Consumer Class,Comparison of Consumer Record API-Consumer Records API,configuration setting,Consumer Group in Kafka","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/","og_locale":"en_US","og_type":"article","og_title":"Apache Kafka Consumer | Kafka Consumer Group - DataFlair","og_description":"Apache Kafka Consumer with example,Kafka Consumer Class,Comparison of Consumer Record API-Consumer Records API,configuration setting,Consumer Group in Kafka","og_url":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/","og_site_name":"DataFlair","article_publisher":"https:\/\/www.facebook.com\/DataFlairWS\/","article_published_time":"2018-04-28T13:25:53+00:00","og_image":[{"width":1200,"height":628,"url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-01.jpg","type":"image\/jpeg"}],"author":"DataFlair Team","twitter_card":"summary_large_image","twitter_creator":"@DataFlairWS","twitter_site":"@DataFlairWS","twitter_misc":{"Written by":"DataFlair Team","Est. reading time":"9 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/#article","isPartOf":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/"},"author":{"name":"DataFlair Team","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/person\/7f83c342f5d1632d6f7b4b0b0f447823"},"headline":"Apache Kafka Consumer | Kafka Consumer Group","datePublished":"2018-04-28T13:25:53+00:00","mainEntityOfPage":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/"},"wordCount":1495,"commentCount":0,"publisher":{"@id":"https:\/\/data-flair.training\/blogs\/#organization"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/#primaryimage"},"thumbnailUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-01.jpg","keywords":["Apache Kafka Consumer","Configuration Setting","Cosumer class","Kafka consumer","Kafka Consumer Class","Kafka Consumer Example","methods of kafka consumer class","Rebalancing of Consumer","what is kafka Consumer"],"articleSection":["Apache Kafka Tutorials"],"inLanguage":"en-US","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/data-flair.training\/blogs\/kafka-consumer\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/","url":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/","name":"Apache Kafka Consumer | Kafka Consumer Group - DataFlair","isPartOf":{"@id":"https:\/\/data-flair.training\/blogs\/#website"},"primaryImageOfPage":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/#primaryimage"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/#primaryimage"},"thumbnailUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-01.jpg","datePublished":"2018-04-28T13:25:53+00:00","description":"Apache Kafka Consumer with example,Kafka Consumer Class,Comparison of Consumer Record API-Consumer Records API,configuration setting,Consumer Group in Kafka","breadcrumb":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/data-flair.training\/blogs\/kafka-consumer\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/#primaryimage","url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-01.jpg","contentUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Apache-Kafka-Consumer-01.jpg","width":1200,"height":628,"caption":"Apache Kafka Consumer"},{"@type":"BreadcrumbList","@id":"https:\/\/data-flair.training\/blogs\/kafka-consumer\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Blog Home","item":"https:\/\/data-flair.training\/blogs\/"},{"@type":"ListItem","position":2,"name":"Apache Kafka Tutorials","item":"https:\/\/data-flair.training\/blogs\/category\/kafka\/"},{"@type":"ListItem","position":3,"name":"Apache Kafka Consumer | Kafka Consumer Group"}]},{"@type":"WebSite","@id":"https:\/\/data-flair.training\/blogs\/#website","url":"https:\/\/data-flair.training\/blogs\/","name":"DataFlair","description":"Learn Today. Lead Tomorrow.","publisher":{"@id":"https:\/\/data-flair.training\/blogs\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/data-flair.training\/blogs\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/data-flair.training\/blogs\/#organization","name":"DataFlair","url":"https:\/\/data-flair.training\/blogs\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/logo\/image\/","url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2016\/07\/Data-Flair.png","contentUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2016\/07\/Data-Flair.png","width":106,"height":48,"caption":"DataFlair"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/DataFlairWS\/","https:\/\/x.com\/DataFlairWS","https:\/\/www.linkedin.com\/company\/dataflair-web-services-pvt-ltd\/","https:\/\/www.youtube.com\/user\/DataFlairWS"]},{"@type":"Person","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/person\/7f83c342f5d1632d6f7b4b0b0f447823","name":"DataFlair Team","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","url":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","caption":"DataFlair Team"},"description":"DataFlair Team creates expert-level guides on programming, Java, Python, C++, DSA, AI, ML, data Science, Android, Flutter, MERN, Web Development, and technology. Our goal is to empower learners with easy-to-understand content. Explore our resources for career growth and practical learning.","url":"https:\/\/data-flair.training\/blogs\/author\/dfteam1\/"}]}},"amp_enabled":true,"_links":{"self":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts\/14722","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/users\/5"}],"replies":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/comments?post=14722"}],"version-history":[{"count":0,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts\/14722\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/media\/15530"}],"wp:attachment":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/media?parent=14722"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/categories?post=14722"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/tags?post=14722"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}