

{"id":16345,"date":"2018-06-05T06:00:42","date_gmt":"2018-06-05T06:00:42","guid":{"rendered":"https:\/\/data-flair.training\/blogs\/?p=16345"},"modified":"2018-06-05T06:00:42","modified_gmt":"2018-06-05T06:00:42","slug":"kafka-clients","status":"publish","type":"post","link":"https:\/\/data-flair.training\/blogs\/kafka-clients\/","title":{"rendered":"How to Create Kafka Clients: Avro Producer &amp; Consumer Client"},"content":{"rendered":"<p><span style=\"font-weight: 400\">In this article of Kafka clients, we will learn to create <strong>Apache Kafka<\/strong> clients by using Kafka API. There are several ways of creating Kafka clients such as at-most-once, at-least-once, and exactly-once message processing needs. <\/span><\/p>\n<p><span style=\"font-weight: 400\">So, in this Kafka Clients tutorial, we&#8217;ll learn\u00a0the detailed description of all three ways. Moreover, we will see how to use the Avro client in detail.<\/span><\/p>\n<p>So, let&#8217;s start Kafka Client Tutorial.<\/p>\n<h2><span style=\"font-weight: 400\">What are Kafka Clients?<\/span><\/h2>\n<ul>\n<li style=\"font-weight: 400\"><strong>Prerequisites to create\u00a0Kafka clients<\/strong><\/li>\n<\/ul>\n<ol>\n<li><span style=\"font-weight: 400\">Initially, for creating Kafka Clients, we have to setup Apache Kafka middleware on our local machine.\u00a0<\/span><\/li>\n<li><span style=\"font-weight: 400\">Moreover, before starting to create Kafka clients, a locally installed single node Kafka instance must run\u00a0on our local machine along with a r<\/span>unning Zookeeper and a r<span style=\"font-weight: 400\">unning Kafka node.<\/span><\/li>\n<\/ol>\n<p><span style=\"font-weight: 400\">Further, in Kafka Clients to create a topic named normal-topic with two partitions the command is:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">bin\/kafka-topics --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1<\/pre>\n<p><span style=\"font-weight: 400\">Further, 
execute the following command to check the status of the created topic:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">bin\/kafka-topics --describe --topic normal-topic --zookeeper localhost:2181<\/pre>\n<p><span style=\"font-weight: 400\">Also, if the topic needs more partitions, alter it with the following command, which raises the partition count from two to three:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">bin\/kafka-topics --alter --topic normal-topic --zookeeper localhost:2181 --partitions 3<\/pre>\n<h2><span style=\"font-weight: 400\">Kafka Producer Client<\/span><\/h2>\n<p><span style=\"font-weight: 400\">The following code implements a <strong>Kafka producer<\/strong> client that sends text messages. Adjust the loop bound to control how many messages are sent:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">import java.util.Properties;\nimport org.apache.kafka.clients.producer.KafkaProducer;\nimport org.apache.kafka.clients.producer.Producer;\nimport org.apache.kafka.clients.producer.ProducerRecord;\n\npublic class ProducerExample {\n   public static void main(String[] str) {\n           System.out.println(\"Starting ProducerExample ...\");\n           sendMessages();\n   }\n   private static void sendMessages() {\n           Producer&lt;String, String&gt; producer = createProducer();\n           sendMessages(producer);\n           \/\/ close() blocks until all previously sent messages have completed,\n           \/\/ so nothing is lost when the program exits.\n           producer.close();\n   }\n   private static Producer&lt;String, String&gt; createProducer() {\n       Properties props = new Properties();\n       props.put(\"bootstrap.servers\", \"localhost:9092\");\n       props.put(\"acks\", \"all\");\n       props.put(\"retries\", 0);\n       \/\/ Maximum number of bytes the sender batches per partition before publishing to Kafka.\n       props.put(\"batch.size\", 10);\n       props.put(\"linger.ms\", 1);\n       props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");\n       props.put(\"value.serializer\", 
\"org.apache.kafka.common.serialization.StringSerializer\");\n       return new KafkaProducer(props);\n   }\n   private static void sendMessages(Producer&lt;String, String&gt; producer) {\n       String topic = \"normal-topic\";\n       int partition = 0;\n       long record = 1;\n       for (int i = 1; i &lt;= 10; i++) {\n           producer.send(\n               new ProducerRecord&lt;String, String&gt;(topic, partition,                                 Long.toString(record),Long.toString(record++)));\n       }\n   }\n}<\/pre>\n<h2><span style=\"font-weight: 400\">Consumer Can Register With Kafka<\/span><\/h2>\n<p><span style=\"font-weight: 400\">At first, let\u2019s learn several ways, by which a Kafka consumer client can register with a <strong>Kafka broker<\/strong>. Specifically, there are two methods, either using the subscribe method call or using an assign method call. Let\u2019s learn both these Kafka Clients methods in detail.<\/span><\/p>\n<h3>a. Using the Subscribe Method Call<\/h3>\n<p><span style=\"font-weight: 400\">When using a subscribe method call, Kafka automatically rebalances the available consumers at the time of topic or partition gets added\/deleted, or at the time a consumer gets added or deleted.<\/span><\/p>\n<h3>b. Using the Assign Method Call.<\/h3>\n<p><span style=\"font-weight: 400\">However, Kafka clients do not offer an automatic re-balance of the consumers, when a consumer is registered with an assign method call.<\/span><\/p>\n<p><span style=\"font-weight: 400\">Either of the above registration options can be used by at-most-once, at-least-once or exactly-once consumers.<\/span><\/p>\n<p><strong>i. 
At-most-once Kafka Consumer (Zero or One Delivery)<\/strong><br \/>\n<span style=\"font-weight: 400\">This is the default behavior of a <strong>Kafka Consumer<\/strong>, because auto-commit is enabled by default.<\/span><br \/>\n<span style=\"font-weight: 400\">To configure this type of consumer, follow these steps:<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">First, set \u2018enable.auto.commit\u2019 to true.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Also, set \u2018auto.commit.interval.ms\u2019 to a low value.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Do not call consumer.commitSync() from the consumer. With this configuration, Kafka auto-commits the offset at the specified interval.<\/span><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">However, a consumer configured this way can exhibit either at-most-once or at-least-once behavior. We still call it an at-most-once consumer, because at-most-once is the weaker of the two guarantees. Let\u2019s look at both behaviors in detail:<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><strong>At-most-once scenario<\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">This scenario happens when the commit interval elapses, which triggers Kafka to automatically commit the last offset handed to the consumer, and the consumer then crashes before it has finished processing those messages. When the consumer restarts, it receives messages starting from the last committed offset. 
The messages that were committed but never processed are lost.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><strong>At-least-once scenario<\/strong><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">This scenario happens when the consumer processes a message, commits the result to its persistent store, and then crashes before the commit interval has elapsed, so Kafka never gets a chance to commit the offset to the broker.<\/span><\/p>\n<p><span style=\"font-weight: 400\">When the consumer restarts, it is redelivered a few already-processed messages, starting from the last committed offset.<\/span><\/p>\n<p><strong>Code for Kafka Consumer:<\/strong><\/p>\n<pre class=\"EnlighterJSRAW\">import java.util.Arrays;\nimport java.util.Properties;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\n\npublic class AtMostOnceConsumer {\n       public static void main(String[] str) throws InterruptedException {\n           System.out.println(\"Starting AtMostOnceConsumer ...\");\n           execute();\n       }\n       private static void execute() throws InterruptedException {\n               KafkaConsumer&lt;String, String&gt; consumer = createConsumer();\n               \/\/ Subscribe to all partitions in that topic. 
'assign' could be used here\n               \/\/ instead of 'subscribe' to read from a specific partition.\n               consumer.subscribe(Arrays.asList(\"normal-topic\"));\n               processRecords(consumer);\n       }\n       private static KafkaConsumer&lt;String, String&gt; createConsumer() {\n               Properties props = new Properties();\n               props.put(\"bootstrap.servers\", \"localhost:9092\");\n               String consumeGroup = \"cg1\";\n               props.put(\"group.id\", consumeGroup);\n               \/\/ Enable auto commit.\n               props.put(\"enable.auto.commit\", \"true\");\n               \/\/ Auto commit interval: Kafka commits the offset at this interval.\n               props.put(\"auto.commit.interval.ms\", \"101\");\n               \/\/ Limits the bytes fetched per partition, which indirectly limits the\n               \/\/ number of records returned by each poll.\n               props.put(\"max.partition.fetch.bytes\", \"135\");\n               \/\/ Set this to start from the earliest offset when the group has no committed offset.\n               \/\/ props.put(\"auto.offset.reset\", \"earliest\");\n               props.put(\"heartbeat.interval.ms\", \"3000\");\n               props.put(\"session.timeout.ms\", \"6001\");\n               props.put(\"key.deserializer\",\n                       \"org.apache.kafka.common.serialization.StringDeserializer\");\n               props.put(\"value.deserializer\",\n                       \"org.apache.kafka.common.serialization.StringDeserializer\");\n               return new KafkaConsumer&lt;String, String&gt;(props);\n       }\n       \/\/ Declares InterruptedException because process() sleeps.\n       private static void processRecords(KafkaConsumer&lt;String, String&gt; consumer)\n                       throws InterruptedException {\n               while (true) {\n                       ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);\n                       long lastOffset = 0;\n                       for (ConsumerRecord&lt;String, String&gt; record : records) {\n                               System.out.printf(\"\\n\\roffset 
= %d, key = %s, value = %s\", record.offset(),\n                                       record.key(), record.value());\n                               lastOffset = record.offset();\n                       }\n                       System.out.println(\"lastOffset read: \" + lastOffset);\n                       process();\n               }\n       }\n       private static void process() throws InterruptedException {\n               \/\/ create some delay to simulate processing of the message.\n               Thread.sleep(20);\n       }\n}<\/pre>\n<p><strong>ii. At-least-once Kafka Consumer (One or More Message Deliveries, Duplicate Possible)<\/strong><br \/>\n<span style=\"font-weight: 400\">To configure this type of consumer, use one of these two settings:<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Either set \u2018enable.auto.commit\u2019 to false, or<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">set \u2018enable.auto.commit\u2019 to true with \u2018auto.commit.interval.ms\u2019 set to a very high value.<\/span><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">The consumer should then take control of the offset commits to Kafka by calling consumer.commitSync() after it has finished processing.<\/span><\/p>\n<p><span style=\"font-weight: 400\">In addition, implement \u2018idempotent\u2019 processing within the consumer, so that reprocessing a duplicate message is harmless. This matters especially for this type of consumer: if it crashes after processing a batch but before commitSync() completes, the same messages are delivered again on restart.<\/span><\/p>\n<p><strong>Code:<\/strong><\/p>\n<pre class=\"EnlighterJSRAW\">import java.util.Arrays;\nimport java.util.Properties;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\n\npublic class AtLeastOnceConsumer {\n   public static void main(String[] str) throws InterruptedException {\n           System.out.println(\"Starting AtLeastOnceConsumer ...\");\n           execute();\n    }\n   private static void execute() throws InterruptedException {\n           KafkaConsumer&lt;String, String&gt; consumer = 
createConsumer();\n           \/\/ Subscribe to all partitions in that topic. 'assign' could be used here\n           \/\/ instead of 'subscribe' to read from a specific partition.\n           consumer.subscribe(Arrays.asList(\"normal-topic\"));\n           processRecords(consumer);\n    }\n    private static KafkaConsumer&lt;String, String&gt; createConsumer() {\n           Properties props = new Properties();\n           props.put(\"bootstrap.servers\", \"localhost:9092\");\n           String consumeGroup = \"cg1\";\n           props.put(\"group.id\", consumeGroup);\n           \/\/ Turn auto commit off: the offset commit is controlled via\n           \/\/ consumer.commitSync() after the messages have been processed.\n           \/\/ (auto.commit.interval.ms is an int setting, so a value such as\n           \/\/ 999999999999 would be rejected as out of range.)\n           props.put(\"enable.auto.commit\", \"false\");\n           \/\/ Limits the bytes fetched per partition, which indirectly limits the\n           \/\/ number of messages returned by each poll.\n           props.put(\"max.partition.fetch.bytes\", \"135\");\n           props.put(\"heartbeat.interval.ms\", \"3000\");\n           props.put(\"session.timeout.ms\", \"6001\");\n           props.put(\"key.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");\n           props.put(\"value.deserializer\", \"org.apache.kafka.common.serialization.StringDeserializer\");\n           return new KafkaConsumer&lt;String, String&gt;(props);\n   }\n    private static void processRecords(KafkaConsumer&lt;String, String&gt; consumer)\n                   throws InterruptedException {\n           while (true) {\n                   ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);\n                   long lastOffset = 0;\n                   for (ConsumerRecord&lt;String, String&gt; record : records) {\n                       System.out.printf(\"\\n\\roffset = %d, key = %s, value = %s\", record.offset(),
                               record.key(), record.value());\n                       lastOffset = record.offset();\n                   }\n                   System.out.println(\"lastOffset read: \" + lastOffset);\n                   process();\n                   \/\/ The call below controls the offset commit. Make it only after the\n                   \/\/ business processing has finished.\n                   consumer.commitSync();\n           }\n   }\n   private static void process() throws InterruptedException {\n       \/\/ create some delay to simulate processing of the record.\n       Thread.sleep(20);\n   }\n}<\/pre>\n<p><strong>iii. Exactly-once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)<\/strong><br \/>\n<span style=\"font-weight: 400\">Here, the consumer registers with Kafka via the \u2018subscribe\u2019 registration method call (option a above).<\/span><\/p>\n<p><span style=\"font-weight: 400\">In this case the offset must be managed manually. To set up the exactly-once scenario, follow these steps:<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">First, set enable.auto.commit = false.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Do not call consumer.commitSync() after processing a message; the processed offset is saved in external storage instead.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Register the consumer to a topic by making a \u2018subscribe\u2019 call.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">To start reading from a specific offset of that topic\/partition, implement a ConsumerRebalanceListener. 
Inside the listener, call consumer.seek(topicPartition, offset).<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">As a safety net, make the message processing idempotent.<\/span><\/li>\n<\/ul>\n<p><strong>Code:<\/strong><\/p>\n<pre class=\"EnlighterJSRAW\">\/\/ Imports for all classes in this listing; each public class goes in its own file.\nimport java.io.BufferedWriter;\nimport java.io.FileWriter;\nimport java.nio.file.Files;\nimport java.nio.file.Paths;\nimport java.util.Arrays;\nimport java.util.Collection;\nimport java.util.Properties;\nimport java.util.stream.Collectors;\nimport java.util.stream.Stream;\nimport org.apache.kafka.clients.consumer.Consumer;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\nimport org.apache.kafka.common.TopicPartition;\n\npublic class ExactlyOnceDynamicConsumer {\n       private static OffsetManager offsetManager = new OffsetManager(\"storage2\");\n       public static void main(String[] str) throws InterruptedException {\n               System.out.println(\"Starting ExactlyOnceDynamicConsumer ...\");\n               readMessages();\n       }\n       private static void readMessages() throws InterruptedException {\n               KafkaConsumer&lt;String, String&gt; consumer = createConsumer();\n               \/\/ Offsets are controlled manually, but the consumer still subscribes to the topic\n               \/\/ so that partitions are assigned dynamically. MyConsumerRebalancerListener calls\n               \/\/ consumer.seek(topicPartition, offset) to choose where reading starts.\n               consumer.subscribe(Arrays.asList(\"normal-topic\"),\n                               new MyConsumerRebalancerListener(consumer));\n               processRecords(consumer);\n       }\n       private static KafkaConsumer&lt;String, String&gt; createConsumer() {\n               Properties props = new Properties();\n               props.put(\"bootstrap.servers\", \"localhost:9092\");\n               String consumeGroup = \"cg3\";\n               props.put(\"group.id\", consumeGroup);\n               \/\/ Key setting: turn off auto commit.\n               props.put(\"enable.auto.commit\", \"false\");\n               props.put(\"heartbeat.interval.ms\", \"2000\");\n               props.put(\"session.timeout.ms\", \"6001\");\n               \/\/ Control maximum data on each poll, make sure this value is bigger than the maximum\n               \/\/ single message size\n               
props.put(\"max.partition.fetch.bytes\", \"140\");\n               props.put(\"key.deserializer\",\n                       \"org.apache.kafka.common.serialization.StringDeserializer\");\n               props.put(\"value.deserializer\",\n                       \"org.apache.kafka.common.serialization.StringDeserializer\");\n               return new KafkaConsumer&lt;String, String&gt;(props);\n       }\n       private static void processRecords(KafkaConsumer&lt;String, String&gt; consumer) {\n           while (true) {\n                   ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);\n                   for (ConsumerRecord&lt;String, String&gt; record : records) {\n                           System.out.printf(\"offset = %d, key = %s, value = %s\\n\", record.offset(),\n                                   record.key(), record.value());\n                           \/\/ Save processed offset in external storage.\n                           offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),\n                                   record.offset());\n                   }\n           }\n       }\n}\npublic class MyConsumerRebalancerListener implements\n               org.apache.kafka.clients.consumer.ConsumerRebalanceListener {\n       private OffsetManager offsetManager = new OffsetManager(\"storage2\");\n       private Consumer&lt;String, String&gt; consumer;\n       public MyConsumerRebalancerListener(Consumer&lt;String, String&gt; consumer) {\n               this.consumer = consumer;\n       }\n       public void onPartitionsRevoked(Collection&lt;TopicPartition&gt; partitions) {\n               for (TopicPartition partition : partitions) {\n                   \/\/ position() is the offset of the next record to fetch, while the store keeps\n                   \/\/ the last processed offset (the reader adds 1), so save position - 1.\n                   offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),\n                           consumer.position(partition) - 1);\n               }\n       }\n       public void onPartitionsAssigned(Collection&lt;TopicPartition&gt; 
partitions) {\n               for (TopicPartition partition : partitions) {\n                       consumer.seek(partition,                             offsetManager.readOffsetFromExternalStore(partition.topic(),                             partition.partition()));\n               }\n       }\n}\n\/**\n* The partition offset are stored in an external storage. In this case in a local file system where\n* program runs.\n*\/\npublic class OffsetManager {\n       private String storagePrefix;\n       public OffsetManager(String storagePrefix) {\n               this.storagePrefix = storagePrefix;\n       }\n   \/**\n       * in an external storage, overwrite the offset for the topic.\n       *\n       * @param topic - Topic name.\n       * @param partition - Partition of the topic.\n   
     * @param offset - offset to be stored.\n       *\/\n       void saveOffsetInExternalStore(String topic, int partition, long offset) {\n           \/\/ try-with-resources flushes and closes the writer even if write() fails.\n           try (BufferedWriter bufferedWriter =\n                       new BufferedWriter(new FileWriter(storageName(topic, partition), false))) {\n               bufferedWriter.write(Long.toString(offset));\n           } catch (Exception e) {\n                   e.printStackTrace();\n                   throw new RuntimeException(e);\n           }\n       }\n       \/**\n           * @return the last offset + 1 for the provided topic and partition,\n           *         or 0 if no offset has been stored yet.\n       *\/\n       long readOffsetFromExternalStore(String topic, int partition) {\n               \/\/ try-with-resources closes the file handle opened by Files.lines.\n               try (Stream&lt;String&gt; stream = Files.lines(Paths.get(storageName(topic, partition)))) {\n                       return Long.parseLong(stream.collect(Collectors.toList()).get(0)) + 1;\n               } catch (Exception e) {\n                   e.printStackTrace();\n               }\n               return 0;\n       }\n       private String storageName(String topic, int partition) {\n           return storagePrefix + \"-\" + topic + \"-\" + partition;\n       }\n}<\/pre>\n<p><strong>iv. Exactly-Once Kafka Static Consumer via Assign (One and Only One Message Delivery)<\/strong><br \/>\n<span style=\"font-weight: 400\">Here, the consumer registers with Kafka via the \u2018assign\u2019 registration method call (option b above).<\/span><\/p>\n<p><span style=\"font-weight: 400\">In this case the offset must be managed manually. 
To set up the Exactly-once Kafka Static Consumer via Assign, follow these steps:<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">First, set enable.auto.commit = false.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Do not call consumer.commitSync() after processing a message.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Register the consumer to the specific partition by using the \u2018assign\u2019 call.<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">On startup of the consumer, seek to the specific message offset by calling consumer.seek(topicPartition, offset).<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Also, as a safety net, make the message processing idempotent.<\/span><\/li>\n<\/ul>\n<p><strong>Code:<\/strong><\/p>\n<pre class=\"EnlighterJSRAW\">import java.io.IOException;\nimport java.util.Arrays;\nimport java.util.List;\nimport java.util.Properties;\nimport org.apache.kafka.clients.consumer.ConsumerRecord;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\nimport org.apache.kafka.common.TopicPartition;\n\npublic class ExactlyOnceStaticConsumer {\n       private static OffsetManager offsetManager = new OffsetManager(\"storage1\");\n       public static void main(String[] str) throws InterruptedException, IOException {\n               System.out.println(\"Starting ExactlyOnceStaticConsumer ...\");\n               readMessages();\n       }\n       private static void readMessages() throws InterruptedException, IOException {\n               KafkaConsumer&lt;String, String&gt; consumer = createConsumer();\n               String topic = \"normal-topic\";\n               \/\/ Partition 0 is the partition that ProducerExample writes to.\n               int partition = 0;\n               TopicPartition topicPartition =\n                               registerConsumerToSpecificPartition(consumer, topic, partition);\n               \/\/ Read the offset for the topic and partition from external storage.\n               long offset = offsetManager.readOffsetFromExternalStore(topic, partition);\n               \/\/ Use seek and go to exact offset for that topic and partition.\n               consumer.seek(topicPartition, 
offset);\n               processRecords(consumer);\n       }\n       private static KafkaConsumer&lt;String, String&gt; createConsumer() {\n               Properties props = new Properties();\n               props.put(\"bootstrap.servers\", \"localhost:9092\");\n               String consumeGroup = \"cg2\";\n               props.put(\"group.id\", consumeGroup);\n               \/\/ To turn off the auto commit, below is a key setting.\n               props.put(\"enable.auto.commit\", \"false\");\n               props.put(\"heartbeat.interval.ms\", \"2000\");\n               props.put(\"session.timeout.ms\", \"6001\");\n               \/\/ control maximum data on each poll, make sure this value is bigger than the maximum                 \/\/ single message size\n               props.put(\"max.partition.fetch.bytes\", \"140\");\n               props.put(\"key.deserializer\",                                     \"org.apache.kafka.common.serialization.StringDeserializer\");\n               props.put(\"value.deserializer\",                                     \"org.apache.kafka.common.serialization.StringDeserializer\");\n               return new KafkaConsumer&lt;String, String&gt;(props);\n       }\n       \/**\n           * Manually listens for specific topic partition. 
For an example that listens to partitions\n           * dynamically while still controlling the offset manually, see\n           * ExactlyOnceDynamicConsumer.java\n           *\/\n        private static TopicPartition registerConsumerToSpecificPartition(\n                   KafkaConsumer&lt;String, String&gt; consumer, String topic, int partition) {\n                   TopicPartition topicPartition = new TopicPartition(topic, partition);\n                   List&lt;TopicPartition&gt; partitions = Arrays.asList(topicPartition);\n                   consumer.assign(partitions);\n                   return topicPartition;\n         }\n           \/**\n               * Process data and store offset in external store. Best practice is to do these operations\n               * atomically.\n               *\/\n           private static void processRecords(KafkaConsumer&lt;String, String&gt; consumer) {\n                   while (true) {\n                           ConsumerRecords&lt;String, String&gt; records = consumer.poll(100);\n                           for (ConsumerRecord&lt;String, String&gt; record : records) {\n                                   System.out.printf(\"offset = %d, key = %s, value = %s\\n\", record.offset(),\n                                           record.key(), record.value());\n                                   offsetManager.saveOffsetInExternalStore(record.topic(), record.partition(),\n                                           record.offset());\n                           }\n                   }\n           }\n}<\/pre>\n<h2><span style=\"font-weight: 400\">Avro Producer and Consumer<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Avro is an open source binary serialization format for exchanging messages. We use it to send compact messages across the wire, which reduces the network overhead. 
<\/span><\/p>\n<p><span style=\"font-weight: 400\">Moreover, for messages that can be defined using JSON, Avro can enforce a schema. By using these schemas, Avro can generate binding objects in various programming languages.<\/span><\/p>\n<p><span style=\"font-weight: 400\"> Using Avro with Kafka is natively supported as well as highly recommended.<\/span><\/p>\n<p><span style=\"font-weight: 400\">Below is a simple Avro consumer and producer.<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">public class AvroConsumerExample {\n       public static void main(String[] str) throws InterruptedException {\n               System.out.println(\"Starting AutoOffsetAvroConsumerExample ...\");\n               readMessages();\n       }\n       private static void readMessages() throws InterruptedException {\n               KafkaConsumer&lt;String, byte[]&gt; consumer = createConsumer();\n               \/\/ Assign to specific topic and partition.\n               consumer.assign(Arrays.asList(new TopicPartition(\"avro-topic\", 0)));\n               processRecords(consumer);\n         }\n         private static void processRecords(KafkaConsumer&lt;String, byte[]&gt; consumer) throws {\n               while (true) {\n                       ConsumerRecords&lt;String, byte[]&gt; records = consumer.poll(100);\n                       long lastOffset = 0;\n                       for (ConsumerRecord&lt;String, byte[]&gt; record : records) {\n                               GenericRecord genericRecord                                        = AvroSupport.byteArrayToData(AvroSupport.getSchema(),                                             record.value());\n                               String firstName = AvroSupport.getValue(genericRecord,                                             \"firstName\", String.class);\n                               System.out.printf(\"\\n\\roffset = %d, key = %s, value = %s\", record.offset(),                                             record.key(), firstName);\n   
                            lastOffset = record.offset();\n                       }\n                   System.out.println(\"lastOffset read: \" + lastOffset);\n                   consumer.commitSync();\n               }\n           }\n           private static KafkaConsumer&lt;String, byte[]&gt; createConsumer() {\n                       Properties props = new Properties();\n                       props.put(\"bootstrap.servers\", \"localhost:9092\");\n                       String consumeGroup = \"cg1\";\n                       props.put(\"group.id\", consumeGroup);\n                       props.put(\"enable.auto.commit\", \"true\");\n                       props.put(\"auto.offset.reset\", \"earliest\");\n                       props.put(\"auto.commit.interval.ms\", \"100\");\n                       props.put(\"heartbeat.interval.ms\", \"3000\");\n                       props.put(\"session.timeout.ms\", \"30000\");\n                       props.put(\"key.deserializer\",                                     \"org.apache.kafka.common.serialization.StringDeserializer\");\n                       props.put(\"value.deserializer\",                                     \"org.apache.kafka.common.serialization.ByteArrayDeserializer\");\n                   return new KafkaConsumer&lt;String, byte[]&gt;(props);\n           }\n}\npublic class AvroProducerExample {\n       public static void main(String[] str) throws InterruptedException, IOException {\n               System.out.println(\"Starting ProducerAvroExample ...\");\n               sendMessages();\n       }\n       private static void sendMessages() throws InterruptedException, IOException {\n               Producer&lt;String, byte[]&gt; producer = createProducer();\n               sendRecords(producer);\n       }\n       private static Producer&lt;String, byte[]&gt; createProducer() {\n                   Properties props = new Properties();\n                   props.put(\"bootstrap.servers\", \"localhost:9092\");\n      
             props.put(\"acks\", \"all\");\n                   props.put(\"retries\", 0);\n                   props.put(\"batch.size\", 16384);\n                   props.put(\"linger.ms\", 1);\n                   props.put(\"buffer.memory\", 33554432);\n                   props.put(\"key.serializer\", \"org.apache.kafka.common.serialization.StringSerializer\");\n                   props.put(\"value.serializer\",                                 \"org.apache.kafka.common.serialization.ByteArraySerializer\");\n               return new KafkaProducer(props);\n       }\n       private static void sendRecords(Producer&lt;String, byte[]&gt; producer) throws IOException, {\n               String topic = \"avro-topic\";\n               int partition = 0;\n               while (true) {\n                       for (int i = 1; i &lt; 100; i++)\n                           producer.send(new ProducerRecord&lt;String, byte[]&gt;(topic, partition,                                     Integer.toString(0), record(i + \"\")));\n               }\n        }\n        private static byte[] record(String name) throws IOException {\n                   GenericRecord record = new GenericData.Record(AvroSupport.getSchema());\n                   record.put(\"firstName\", name);\n                   return AvroSupport.dataToByteArray(AvroSupport.getSchema(), record);\n         }\n}<\/pre>\n<p>So, this was all about Kafka Clients. Hope you like our explanation of how to create Kafka Clients.<\/p>\n<h2><span style=\"font-weight: 400\">Conclusion<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Hence, we have seen all the ways in which we can create Kafka clients using Kafka API. Moreover, in this Kafka Clients tutorial, we discussed Kafka Producer Client, Kafka Consumer Client. 
<\/span><\/p>\n<p><span style=\"font-weight: 400\">Along with this, we also learned Avro Kafka Producer &amp; Consumer Kafka Clients.<\/span><span style=\"font-weight: 400\"> However, if any doubt occurs regarding\u00a0Kafka clients, feel free to ask through the comment section.\u00a0<\/span><\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this article of Kafka clients, we will learn to create Apache Kafka clients by using Kafka API. There are several ways of creating Kafka clients such as at-most-once, at-least-once, and exactly-once message processing&#46;&#46;&#46;<\/p>\n","protected":false},"author":5,"featured_media":16799,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[9],"tags":[1212,1214,1215,1297,2572,4235,6073,7856,7857,7873,7920],"class_list":["post-16345","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-kafka","tag-at-least-once","tag-at-most-once","tag-at-most-once-kafka-consumer","tag-avro-producer-and-consumer-client","tag-clients-in-kafka","tag-exactly-once","tag-how-to-create-kafka-clients","tag-kafka-client-example","tag-kafka-clients","tag-kafka-consumer-client","tag-kafka-producer-client"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.8 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>How to Create Kafka Clients: Avro Producer &amp; Consumer Client - DataFlair<\/title>\n<meta name=\"description\" content=\"Kafka clients tutorial:Kafka Producer client, Consumer Client:At-least-once,At-most once,exactly-once Kafka consumer,Avro Producer and Consumer in client\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/data-flair.training\/blogs\/kafka-clients\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" 
\/>\n<meta property=\"og:title\" content=\"How to Create Kafka Clients: Avro Producer &amp; Consumer Client - DataFlair\" \/>\n<meta property=\"og:description\" content=\"Kafka clients tutorial:Kafka Producer client, Consumer Client:At-least-once,At-most once,exactly-once Kafka consumer,Avro Producer and Consumer in client\" \/>\n<meta property=\"og:url\" content=\"https:\/\/data-flair.training\/blogs\/kafka-clients\/\" \/>\n<meta property=\"og:site_name\" content=\"DataFlair\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/DataFlairWS\/\" \/>\n<meta property=\"article:published_time\" content=\"2018-06-05T06:00:42+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/How-to-create-Kafka-Clients-01.jpg\" \/>\n\t<meta property=\"og:image:width\" content=\"1200\" \/>\n\t<meta property=\"og:image:height\" content=\"628\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/jpeg\" \/>\n<meta name=\"author\" content=\"DataFlair Team\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@DataFlairWS\" \/>\n<meta name=\"twitter:site\" content=\"@DataFlairWS\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"DataFlair Team\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"13 minutes\" \/>\n<!-- \/ Yoast SEO plugin. 
-->","yoast_head_json":{"title":"How to Create Kafka Clients: Avro Producer &amp; Consumer Client - DataFlair","description":"Kafka clients tutorial:Kafka Producer client, Consumer Client:At-least-once,At-most once,exactly-once Kafka consumer,Avro Producer and Consumer in client","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/data-flair.training\/blogs\/kafka-clients\/","og_locale":"en_US","og_type":"article","og_title":"How to Create Kafka Clients: Avro Producer &amp; Consumer Client - DataFlair","og_description":"Kafka clients tutorial:Kafka Producer client, Consumer Client:At-least-once,At-most once,exactly-once Kafka consumer,Avro Producer and Consumer in client","og_url":"https:\/\/data-flair.training\/blogs\/kafka-clients\/","og_site_name":"DataFlair","article_publisher":"https:\/\/www.facebook.com\/DataFlairWS\/","article_published_time":"2018-06-05T06:00:42+00:00","og_image":[{"width":1200,"height":628,"url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/How-to-create-Kafka-Clients-01.jpg","type":"image\/jpeg"}],"author":"DataFlair Team","twitter_card":"summary_large_image","twitter_creator":"@DataFlairWS","twitter_site":"@DataFlairWS","twitter_misc":{"Written by":"DataFlair Team","Est. 
reading time":"13 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/#article","isPartOf":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/"},"author":{"name":"DataFlair Team","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/person\/7f83c342f5d1632d6f7b4b0b0f447823"},"headline":"How to Create Kafka Clients: Avro Producer &amp; Consumer Client","datePublished":"2018-06-05T06:00:42+00:00","mainEntityOfPage":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/"},"wordCount":1076,"commentCount":2,"publisher":{"@id":"https:\/\/data-flair.training\/blogs\/#organization"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/#primaryimage"},"thumbnailUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/How-to-create-Kafka-Clients-01.jpg","keywords":["At-least once","At-most once","At-most-once Kafka Consumer","Avro producer and consumer client","Clients in Kafka","Exactly-once","how to create kafka clients","kafka client example","Kafka clients","kafka consumer client","Kafka producer client"],"articleSection":["Apache Kafka Tutorials"],"inLanguage":"en-US","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/data-flair.training\/blogs\/kafka-clients\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/","url":"https:\/\/data-flair.training\/blogs\/kafka-clients\/","name":"How to Create Kafka Clients: Avro Producer &amp; Consumer Client - 
DataFlair","isPartOf":{"@id":"https:\/\/data-flair.training\/blogs\/#website"},"primaryImageOfPage":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/#primaryimage"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/#primaryimage"},"thumbnailUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/How-to-create-Kafka-Clients-01.jpg","datePublished":"2018-06-05T06:00:42+00:00","description":"Kafka clients tutorial:Kafka Producer client, Consumer Client:At-least-once,At-most once,exactly-once Kafka consumer,Avro Producer and Consumer in client","breadcrumb":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/data-flair.training\/blogs\/kafka-clients\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/#primaryimage","url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/How-to-create-Kafka-Clients-01.jpg","contentUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/How-to-create-Kafka-Clients-01.jpg","width":1200,"height":628,"caption":"How to create Kafka Clients: Avro Producer &amp; Consumer Client"},{"@type":"BreadcrumbList","@id":"https:\/\/data-flair.training\/blogs\/kafka-clients\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Blog Home","item":"https:\/\/data-flair.training\/blogs\/"},{"@type":"ListItem","position":2,"name":"Apache Kafka Tutorials","item":"https:\/\/data-flair.training\/blogs\/category\/kafka\/"},{"@type":"ListItem","position":3,"name":"How to Create Kafka Clients: Avro Producer &amp; Consumer Client"}]},{"@type":"WebSite","@id":"https:\/\/data-flair.training\/blogs\/#website","url":"https:\/\/data-flair.training\/blogs\/","name":"DataFlair","description":"Learn Today. 
Lead Tomorrow.","publisher":{"@id":"https:\/\/data-flair.training\/blogs\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/data-flair.training\/blogs\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/data-flair.training\/blogs\/#organization","name":"DataFlair","url":"https:\/\/data-flair.training\/blogs\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/logo\/image\/","url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2016\/07\/Data-Flair.png","contentUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2016\/07\/Data-Flair.png","width":106,"height":48,"caption":"DataFlair"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/DataFlairWS\/","https:\/\/x.com\/DataFlairWS","https:\/\/www.linkedin.com\/company\/dataflair-web-services-pvt-ltd\/","https:\/\/www.youtube.com\/user\/DataFlairWS"]},{"@type":"Person","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/person\/7f83c342f5d1632d6f7b4b0b0f447823","name":"DataFlair Team","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","url":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","caption":"DataFlair Team"},"description":"DataFlair Team creates expert-level guides on programming, Java, Python, C++, DSA, AI, ML, data Science, Android, Flutter, MERN, Web Development, and technology. 
Our goal is to empower learners with easy-to-understand content. Explore our resources for career growth and practical learning.","url":"https:\/\/data-flair.training\/blogs\/author\/dfteam1\/"}]}},"amp_enabled":true,"_links":{"self":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts\/16345","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/users\/5"}],"replies":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/comments?post=16345"}],"version-history":[{"count":0,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts\/16345\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/media\/16799"}],"wp:attachment":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/media?parent=16345"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/categories?post=16345"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/tags?post=16345"}],"curies":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}