How to Create Kafka Clients: Avro Producer & Consumer Client

1. Objective – Kafka Client

In this article on Kafka clients, we will learn how to create Apache Kafka clients using the Kafka API. A Kafka client can be written to meet at-most-once, at-least-once, or exactly-once message processing needs. So, in this Kafka Clients tutorial, we describe all three approaches in detail. Moreover, we will see how to use an Avro client.

So, let’s start the Kafka Client tutorial.

2. What are Kafka Clients?

  • Prerequisites to create Kafka clients
  1. First, to create Kafka clients, we have to set up Apache Kafka on our local machine.
  2. Moreover, before we start creating Kafka clients, a single-node Kafka instance must be running locally, that is, a running ZooKeeper and a running Kafka broker.

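Next, to create a topic named normal-topic with two partitions, run the following command. (Kafka 3.0 and later no longer accept --zookeeper for this tool; use --bootstrap-server localhost:9092 instead.)

bin/kafka-topics.sh --zookeeper localhost:2181 --create --topic normal-topic --partitions 2 --replication-factor 1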

Then, execute the following command to check the status of the created topic:

bin/kafka-topics.sh --describe --topic normal-topic --zookeeper localhost:2181

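Also, to increase the number of partitions later, alter the topic with the following command. The new count must be greater than the current one, because partitions can only be added, so this example raises it from 2 to 3:

bin/kafka-topics.sh --alter --topic normal-topic --zookeeper localhost:2181 --partitions 3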

3. Kafka Producer Client

The following code implements a Kafka producer client. It sends text messages; adjust the loop to control how many messages are sent:

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerExample {
   public static void main(String[] str) {
       System.out.println("Starting ProducerExample ...");
       sendMessages();
   }
   private static void sendMessages() {
       Producer<String, String> producer = createProducer();
       sendMessages(producer);
       // close() blocks until all buffered records have been sent, so no sleep is
       // needed before the program exits.
       producer.close();
   }
   private static Producer<String, String> createProducer() {
       Properties props = new Properties();
       props.put("bootstrap.servers", "localhost:9092");
       props.put("acks", "all");
       props.put("retries", 0);
       // Upper bound, in bytes, on a batch of records sent to one partition.
       props.put("batch.size", 10);
       // How long (ms) the sender waits for more records before sending a batch.
       props.put("linger.ms", 1);
       props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
       return new KafkaProducer<>(props);
   }
   private static void sendMessages(Producer<String, String> producer) {
       String topic = "normal-topic";
       int partition = 0;
       long record = 1;
       // Adjust the loop bound to control how many messages are sent.
       for (int i = 1; i <= 10; i++) {
           // Key and value are both the current counter value.
           producer.send(new ProducerRecord<String, String>(
                   topic, partition, Long.toString(record), Long.toString(record++)));
       }
   }
}
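
To give a feel for what the batch.size setting in the producer above limits, here is a minimal plain-Java sketch that groups messages into batches by byte size. It is only an illustration under simplifying assumptions: BatchingSketch and its batch method are hypothetical names, the sketch counts only the UTF-8 bytes of each message, and it ignores linger.ms, per-partition batching, and the record overhead that the real producer accounts for.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical illustration, not part of the Kafka API.
class BatchingSketch {
    // Groups messages so that each batch holds at most batchSizeBytes of payload.
    // A message larger than the limit ends up alone in its own batch.
    static List<List<String>> batch(List<String> messages, int batchSizeBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String message : messages) {
            int size = message.getBytes(StandardCharsets.UTF_8).length;
            // Close the current batch if this message would push it over the limit.
            if (!current.isEmpty() && currentBytes + size > batchSizeBytes) {
                batches.add(current);
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(message);
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);
        }
        return batches;
    }

    public static void main(String[] args) {
        // Prints [[aaaa, bbbb], [cc]]
        System.out.println(batch(Arrays.asList("aaaa", "bbbb", "cc"), 8));
    }
}
```

A larger limit lets more messages travel together in one request, while a very small limit, such as the 10 bytes used above, leads to many small batches.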

4. How a Consumer Registers With Kafka

First, let’s look at the ways in which a Kafka consumer client can register with a Kafka broker. There are two: the subscribe method call and the assign method call. Let’s look at both methods in detail.

a. Using the Subscribe Method Call

With a subscribe method call, Kafka automatically rebalances the available consumers whenever a topic or partition is added or deleted, or a consumer is added or removed.

b. Using the Assign Method Call

With an assign method call, by contrast, Kafka does not automatically rebalance the consumers.
Either of the above registration options can be used by at-most-once, at-least-once or exactly-once consumers.
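
To make the idea of a rebalance more concrete, the following plain-Java sketch redistributes partitions when the set of consumers changes. It is a simplified stand-in and not Kafka's actual assignment strategy: RebalanceSketch and assign are hypothetical names, and the round-robin rule is an assumption chosen for brevity.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration of what a rebalance achieves; not Kafka's assignor.
class RebalanceSketch {
    // Spreads partitions 0..partitionCount-1 over the consumers in round-robin order.
    static Map<String, List<Integer>> assign(List<String> consumers, int partitionCount) {
        Map<String, List<Integer>> assignment = new LinkedHashMap<>();
        for (String consumer : consumers) {
            assignment.put(consumer, new ArrayList<>());
        }
        if (consumers.isEmpty()) {
            return assignment;
        }
        for (int partition = 0; partition < partitionCount; partition++) {
            String owner = consumers.get(partition % consumers.size());
            assignment.get(owner).add(partition);
        }
        return assignment;
    }

    public static void main(String[] args) {
        // One consumer in the group owns both partitions.
        System.out.println(assign(Arrays.asList("c1"), 2));        // prints {c1=[0, 1]}
        // A second consumer joins: the partitions are redistributed.
        System.out.println(assign(Arrays.asList("c1", "c2"), 2));  // prints {c1=[0], c2=[1]}
    }
}
```

With subscribe, Kafka performs this kind of redistribution for the consumer group. With assign, the consumer keeps exactly the partitions it asked for, whatever the other consumers do.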
i. At-most-once Kafka Consumer (Zero or One Delivery)
This is the default behavior of a Kafka consumer.
To configure this type of consumer, follow these steps:

  • First, set ‘enable.auto.commit’ to true.
  • Also, set ‘auto.commit.interval.ms’ to a low value.
  • Do not call consumer.commitSync() from the consumer. With this configuration, Kafka auto-commits the offset at the specified interval.

However, a consumer configured this way can exhibit either at-most-once or at-least-once behavior. We still call it an at-most-once consumer, because at-most-once is the weaker of the two guarantees. Let’s discuss both consumer behaviors in detail:

  • At-most-once scenario

This scenario happens when the commit interval elapses and Kafka automatically commits the offset of the last messages handed to the consumer, but the consumer crashes before it has finished processing them. When the consumer restarts, it receives messages starting from the last committed offset, so the messages that were committed but not yet processed are lost.

  • At-least-once scenario

This scenario happens when the consumer processes a message and commits the result to its persistent store, and then crashes before Kafka has had a chance to commit the offset to the broker because the commit interval has not yet passed. When the consumer restarts, it is redelivered a few older messages, starting from the last committed offset.
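
The two scenarios can be reproduced without a broker. The sketch below is a plain-Java simulation, not Kafka code: DeliverySemanticsSketch, run, and the crashAtOffset parameter are hypothetical names, and the "crash" is simulated by jumping back to the last committed offset once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Hypothetical simulation of the two crash scenarios; it does not use Kafka.
class DeliverySemanticsSketch {
    // Consumes the log from offset 0 and crashes once, while handling crashAtOffset.
    // Returns the messages that were actually processed, in order.
    static List<String> run(List<String> log, boolean commitBeforeProcessing, int crashAtOffset) {
        List<String> processed = new ArrayList<>();
        int committed = 0;       // offset the consumer resumes from after a restart
        int offset = 0;
        boolean crashed = false;
        while (offset < log.size()) {
            boolean crashNow = !crashed && offset == crashAtOffset;
            if (commitBeforeProcessing) {
                committed = offset + 1;           // offset committed first
                if (crashNow) {                   // crash: the message is never processed
                    crashed = true;
                    offset = committed;
                    continue;
                }
                processed.add(log.get(offset));
            } else {
                processed.add(log.get(offset));   // message processed first
                if (crashNow) {                   // crash: the commit never happens
                    crashed = true;
                    offset = committed;
                    continue;
                }
                committed = offset + 1;
            }
            offset++;
        }
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("m0", "m1", "m2");
        System.out.println(run(log, true, 1));   // prints [m0, m2]: m1 is lost
        System.out.println(run(log, false, 1));  // prints [m0, m1, m1, m2]: m1 is processed twice
    }
}
```

Committing before processing can lose a message (at-most-once), while processing before committing can repeat one (at-least-once).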
Code for Kafka Consumer:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtMostOnceConsumer {
       public static void main(String[] str) throws InterruptedException {
           System.out.println("Starting AtMostOnceConsumer ...");
           execute();
       }
       private static void execute() throws InterruptedException {
               KafkaConsumer<String, String> consumer = createConsumer();
               // Subscribe to all partitions of the topic. 'assign' could be used here
               // instead of 'subscribe' to read from a specific partition.
               consumer.subscribe(Arrays.asList("normal-topic"));
               processRecords(consumer);
       }
       private static KafkaConsumer<String, String> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg1";
               props.put("group.id", consumeGroup);
               // Set this property, if auto commit should happen.
               props.put("enable.auto.commit", "true");
               // Auto commit interval, Kafka would commit offset at this interval.
               props.put("auto.commit.interval.ms", "101");
               // Caps the bytes fetched per partition, which limits how many records
               // each poll returns.
               props.put("max.partition.fetch.bytes", "135");
               // Set this if you want to always read from beginning.
               // props.put("auto.offset.reset", "earliest");
               props.put("heartbeat.interval.ms", "3000");
               props.put("session.timeout.ms", "6001");
               props.put("key.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               return new KafkaConsumer<String, String>(props);
       }
       private static void processRecords(KafkaConsumer<String, String> consumer)
                       throws InterruptedException {
               while (true) {
                       // poll(Duration) requires Kafka clients 2.0 or later; older clients use poll(100).
                       ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                       long lastOffset = 0;
                       for (ConsumerRecord<String, String> record : records) {
                               System.out.printf("\n\roffset = %d, key = %s, value = %s",
                                       record.offset(), record.key(), record.value());
                               lastOffset = record.offset();
                       }
                       System.out.println("lastOffset read: " + lastOffset);
                       process();
               }
       }
       private static void process() throws InterruptedException {
               // create some delay to simulate processing of the message.
               Thread.sleep(20);
       }
}

ii. At-least-once Kafka Consumer (One or More Message Deliveries, Duplicates Possible)
To configure this type of consumer, follow these steps:

  • Either set ‘enable.auto.commit’ to false, or
  • set ‘enable.auto.commit’ to true with ‘auto.commit.interval.ms’ set to a high value.

The consumer should then take control of the offset commits to Kafka by calling consumer.commitSync().
In addition, implement idempotent behavior within the consumer to avoid reprocessing duplicate messages. With this type of consumer, duplicate delivery can happen if the consumer crashes after processing a message but before consumer.commitSync() completes.
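
One common way to get idempotent behavior is to remember which records have already been applied and to skip repeats. The following is a minimal sketch under stated assumptions: IdempotentHandler is a hypothetical class, it identifies a record by topic, partition, and offset, and it keeps the seen identifiers in memory only, whereas a real consumer would have to persist them together with the processing results.

```java
import java.util.HashSet;
import java.util.Set;

// Hypothetical sketch of an idempotent record handler; not part of the Kafka API.
class IdempotentHandler {
    private final Set<String> seen = new HashSet<>();
    private int applied = 0;

    // Applies the record once. Returns false if the same topic/partition/offset
    // was already handled, in which case the duplicate is skipped.
    boolean handle(String topic, int partition, long offset) {
        String id = topic + "-" + partition + "-" + offset;
        if (!seen.add(id)) {
            return false;
        }
        applied++;   // stands in for the real business processing
        return true;
    }

    int appliedCount() {
        return applied;
    }

    public static void main(String[] args) {
        IdempotentHandler handler = new IdempotentHandler();
        System.out.println(handler.handle("normal-topic", 0, 5L));  // prints true
        System.out.println(handler.handle("normal-topic", 0, 5L));  // prints false (duplicate)
        System.out.println(handler.appliedCount());                 // prints 1
    }
}
```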
Code:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class AtLeastOnceConsumer {
   public static void main(String[] str) throws InterruptedException {
           System.out.println("Starting AtLeastOnceConsumer ...");
           execute();
   }
   private static void execute() throws InterruptedException {
           KafkaConsumer<String, String> consumer = createConsumer();
           // Subscribe to all partitions of the topic. 'assign' could be used here
           // instead of 'subscribe' to read from a specific partition.
           consumer.subscribe(Arrays.asList("normal-topic"));
           processRecords(consumer);
   }
   private static KafkaConsumer<String, String> createConsumer() {
           Properties props = new Properties();
           props.put("bootstrap.servers", "localhost:9092");
           String consumeGroup = "cg1";
           props.put("group.id", consumeGroup);
           // Turn auto commit off: the offset commit is controlled via
           // consumer.commitSync() after the messages are processed. (The original
           // interval of 999999999999 ms does not fit the integer-typed
           // auto.commit.interval.ms setting.)
           props.put("enable.auto.commit", "false");
           // Caps the bytes fetched per partition, which limits how many messages
           // each poll returns.
           props.put("max.partition.fetch.bytes", "135");
           props.put("heartbeat.interval.ms", "3000");
           props.put("session.timeout.ms", "6001");
           props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
           props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
           return new KafkaConsumer<String, String>(props);
   }
   private static void processRecords(KafkaConsumer<String, String> consumer)
                   throws InterruptedException {
           while (true) {
                   // poll(Duration) requires Kafka clients 2.0 or later; older clients use poll(100).
                   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                   long lastOffset = 0;
                   for (ConsumerRecord<String, String> record : records) {
                       System.out.printf("\n\roffset = %d, key = %s, value = %s",
                               record.offset(), record.key(), record.value());
                       lastOffset = record.offset();
                   }
                   System.out.println("lastOffset read: " + lastOffset);
                   process();
                   // Below call is important to control the offset commit. Do this call after you
                   // finish processing the business process.
                   consumer.commitSync();
           }
   }
   private static void process() throws InterruptedException {
       // create some delay to simulate processing of the record.
       Thread.sleep(20);
   }
}

iii. Exactly-once Kafka Dynamic Consumer via Subscribe (One and Only One Message Delivery)
Here, the consumer registers with Kafka via a ‘subscribe’ registration method call (method a above).
The offset must be managed manually in this case. To set up the exactly-once scenario, follow these steps:

  • First, set enable.auto.commit = false.
  • After processing a message, do not call consumer.commitSync().
  • Register the consumer to a topic with a ‘subscribe’ call.
  • Implement a ConsumerRebalanceListener and call consumer.seek(topicPartition, offset) within the listener, in order to start reading from a specific offset of that topic/partition.
  • As a safety net, make the processing idempotent.
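
The steps above boil down to "store the offset of every processed message yourself, and seek to the stored offset plus one whenever a partition is assigned". The following plain-Java sketch shows that flow without a broker. All names (SeekOnRestartSketch, startOffset, consume) are hypothetical, the offset store is an in-memory map, and a list stands in for one partition's log. Processing and storing the offset are not atomic here, which is why the idempotent safety net is still recommended.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, broker-free sketch of manual offset management.
class SeekOnRestartSketch {
    // Stands in for the external offset store: partition -> last processed offset.
    private final Map<Integer, Long> offsetStore = new HashMap<>();
    private final List<String> processed = new ArrayList<>();

    // Offset to seek to when a partition is (re)assigned:
    // the last processed offset + 1, or 0 if nothing has been stored yet.
    long startOffset(int partition) {
        Long last = offsetStore.get(partition);
        return last == null ? 0L : last + 1;
    }

    // Processes at most maxRecords records of one partition, saving the offset
    // after each record.
    void consume(int partition, List<String> partitionLog, int maxRecords) {
        long offset = startOffset(partition);   // plays the role of consumer.seek(...)
        for (int n = 0; n < maxRecords && offset < partitionLog.size(); n++, offset++) {
            processed.add(partitionLog.get((int) offset));
            offsetStore.put(partition, offset);
        }
    }

    List<String> processed() {
        return processed;
    }

    public static void main(String[] args) {
        List<String> log = Arrays.asList("a", "b", "c", "d");
        SeekOnRestartSketch consumer = new SeekOnRestartSketch();
        consumer.consume(0, log, 2);    // first run stops after two records
        consumer.consume(0, log, 10);   // "restart": resumes at offset 2
        System.out.println(consumer.processed());  // prints [a, b, c, d]
    }
}
```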

Code:

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;

public class ExactlyOnceDynamicConsumer {
       private static OffsetManager offsetManager = new OffsetManager("storage2");
       public static void main(String[] str) {
               System.out.println("Starting ExactlyOnceDynamicConsumer ...");
               readMessages();
       }
       private static void readMessages() {
               KafkaConsumer<String, String> consumer = createConsumer();
               // Manually controlling offset but register consumer to topics to get dynamically
               // assigned partitions. Inside MyConsumerRebalancerListener use
               // consumer.seek(topicPartition, offset) to control which messages are read.
               consumer.subscribe(Arrays.asList("normal-topic"),
                               new MyConsumerRebalancerListener(consumer));
               processRecords(consumer);
       }
       private static KafkaConsumer<String, String> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg3";
               props.put("group.id", consumeGroup);
               // To turn off the auto commit, below is a key setting.
               props.put("enable.auto.commit", "false");
               props.put("heartbeat.interval.ms", "2000");
               props.put("session.timeout.ms", "6001");
               // Control maximum data on each poll, make sure this value is bigger than the
               // maximum single message size.
               props.put("max.partition.fetch.bytes", "140");
               props.put("key.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               return new KafkaConsumer<String, String>(props);
       }
       private static void processRecords(KafkaConsumer<String, String> consumer) {
           while (true) {
                   // poll(Duration) requires Kafka clients 2.0 or later; older clients use poll(100).
                   ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                   for (ConsumerRecord<String, String> record : records) {
                           System.out.printf("offset = %d, key = %s, value = %s\n",
                                   record.offset(), record.key(), record.value());
                           // Save processed offset in external storage.
                           offsetManager.saveOffsetInExternalStore(record.topic(),
                                   record.partition(), record.offset());
                   }
           }
       }
}
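import java.util.Collection;
import org.apache.kafka.clients.consumer.Consumer;
import org.apache.kafka.clients.consumer.ConsumerRebalanceListener;
import org.apache.kafka.common.TopicPartition;

public class MyConsumerRebalancerListener implements ConsumerRebalanceListener {
       private OffsetManager offsetManager = new OffsetManager("storage2");
       private Consumer<String, String> consumer;
       public MyConsumerRebalancerListener(Consumer<String, String> consumer) {
               this.consumer = consumer;
       }
       @Override
       public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
               for (TopicPartition partition : partitions) {
                   // position() is the offset of the next record to fetch, while the store holds
                   // the last processed offset (the reader adds 1), so save position - 1.
                   offsetManager.saveOffsetInExternalStore(partition.topic(), partition.partition(),
                           consumer.position(partition) - 1);
               }
       }
       @Override
       public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
               for (TopicPartition partition : partitions) {
                   // Resume right after the last offset recorded in the external store.
                   consumer.seek(partition,
                           offsetManager.readOffsetFromExternalStore(partition.topic(),
                                   partition.partition()));
               }
       }
}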
import java.io.BufferedWriter;
import java.io.FileWriter;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.stream.Stream;

/**
 * The partition offsets are stored in external storage; in this case, the local file system
 * where the program runs.
 */
public class OffsetManager {
       private String storagePrefix;
       public OffsetManager(String storagePrefix) {
               this.storagePrefix = storagePrefix;
       }
       /**
        * Overwrites the stored offset for the topic partition in external storage.
        *
        * @param topic - Topic name.
        * @param partition - Partition of the topic.
        * @param offset - offset to be stored.
        */
       void saveOffsetInExternalStore(String topic, int partition, long offset) {
           // try-with-resources flushes and closes the writer even if write() fails.
           try (BufferedWriter bufferedWriter =
                       new BufferedWriter(new FileWriter(storageName(topic, partition), false))) {
               bufferedWriter.write(Long.toString(offset));
           } catch (Exception e) {
               e.printStackTrace();
               throw new RuntimeException(e);
           }
       }
       /**
        * @return the last stored offset + 1 for the provided topic and partition, or 0 if no
        *         offset could be read (for example, on the very first run).
        */
       long readOffsetFromExternalStore(String topic, int partition) {
               try (Stream<String> stream = Files.lines(Paths.get(storageName(topic, partition)))) {
                       return Long.parseLong(stream.findFirst().get()) + 1;
               } catch (Exception e) {
                       e.printStackTrace();
               }
               return 0;
       }
       private String storageName(String topic, int partition) {
           return storagePrefix + "-" + topic + "-" + partition;
       }
}
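
The OffsetManager above overwrites the offset file in place, so a crash in the middle of a write could leave a truncated file behind. One possible hardening, sketched here as an assumption rather than as part of the original example, is to write the offset to a temporary file and then rename it over the real file. AtomicOffsetStore and its methods are hypothetical names. Note that this only protects the offset file itself; making message processing and offset storage a single atomic step needs a store that holds both, such as a database transaction.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;

// Hypothetical variant of the offset store that replaces the file via a rename.
class AtomicOffsetStore {
    private final Path dir;

    AtomicOffsetStore(Path dir) {
        this.dir = dir;
    }

    // Convenience factory for experiments: keeps the offsets in a fresh temp directory.
    static AtomicOffsetStore inTempDir() {
        try {
            return new AtomicOffsetStore(Files.createTempDirectory("offsets"));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    private Path file(String topic, int partition) {
        return dir.resolve("offset-" + topic + "-" + partition);
    }

    // Writes the offset to a temp file in the same directory, then moves it over the
    // target. Whether ATOMIC_MOVE replaces an existing target is implementation
    // specific according to the Files.move documentation; on typical Linux file
    // systems the rename replaces it.
    void save(String topic, int partition, long offset) {
        try {
            Path tmp = Files.createTempFile(dir, "offset", ".tmp");
            Files.write(tmp, Long.toString(offset).getBytes(StandardCharsets.UTF_8));
            Files.move(tmp, file(topic, partition),
                    StandardCopyOption.ATOMIC_MOVE, StandardCopyOption.REPLACE_EXISTING);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Returns the last stored offset + 1, or 0 if nothing has been stored yet.
    long readNext(String topic, int partition) {
        Path f = file(topic, partition);
        if (!Files.exists(f)) {
            return 0L;
        }
        try {
            String text = new String(Files.readAllBytes(f), StandardCharsets.UTF_8).trim();
            return Long.parseLong(text) + 1;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        AtomicOffsetStore store = AtomicOffsetStore.inTempDir();
        store.save("normal-topic", 1, 41L);
        store.save("normal-topic", 1, 42L);
        System.out.println(store.readNext("normal-topic", 1));
    }
}
```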

iv. Exactly-Once Kafka Static Consumer via Assign (One and Only One Message Delivery)
Here, the consumer registers with Kafka via an ‘assign’ registration method call (method b above).
The offset must be managed manually in this case. To set up an exactly-once Kafka static consumer via assign, follow these steps:

  • First, set enable.auto.commit = false.
  • After processing a message, do not call consumer.commitSync().
  • Register the consumer to a specific partition with an ‘assign’ call.
  • On startup of the consumer, seek to a specific message offset by calling consumer.seek(topicPartition, offset).
  • Also, as a safety net, make the processing idempotent.

Code:

import java.time.Duration;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class ExactlyOnceStaticConsumer {
       private static OffsetManager offsetManager = new OffsetManager("storage1");
       public static void main(String[] str) {
               System.out.println("Starting ExactlyOnceStaticConsumer ...");
               readMessages();
       }
       private static void readMessages() {
               KafkaConsumer<String, String> consumer = createConsumer();
               String topic = "normal-topic";
               int partition = 1;
               TopicPartition topicPartition =
                               registerConsumerToSpecificPartition(consumer, topic, partition);
               // Read the offset for the topic and partition from external storage.
               long offset = offsetManager.readOffsetFromExternalStore(topic, partition);
               // Use seek and go to exact offset for that topic and partition.
               consumer.seek(topicPartition, offset);
               processRecords(consumer);
       }
       private static KafkaConsumer<String, String> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg2";
               props.put("group.id", consumeGroup);
               // To turn off the auto commit, below is a key setting.
               props.put("enable.auto.commit", "false");
               props.put("heartbeat.interval.ms", "2000");
               props.put("session.timeout.ms", "6001");
               // Control maximum data on each poll, make sure this value is bigger than the
               // maximum single message size.
               props.put("max.partition.fetch.bytes", "140");
               props.put("key.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               return new KafkaConsumer<String, String>(props);
       }
       /**
        * Manually listens to a specific topic partition. For an example of how to listen to
        * partitions dynamically while still controlling the offset manually, see
        * ExactlyOnceDynamicConsumer.java
        */
       private static TopicPartition registerConsumerToSpecificPartition(
                   KafkaConsumer<String, String> consumer, String topic, int partition) {
               TopicPartition topicPartition = new TopicPartition(topic, partition);
               List<TopicPartition> partitions = Arrays.asList(topicPartition);
               consumer.assign(partitions);
               return topicPartition;
       }
       /**
        * Process data and store offset in external store. Best practice is to do these operations
        * atomically.
        */
       private static void processRecords(KafkaConsumer<String, String> consumer) {
               while (true) {
                       // poll(Duration) requires Kafka clients 2.0 or later; older clients use poll(100).
                       ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
                       for (ConsumerRecord<String, String> record : records) {
                               System.out.printf("offset = %d, key = %s, value = %s\n",
                                       record.offset(), record.key(), record.value());
                               offsetManager.saveOffsetInExternalStore(record.topic(),
                                       record.partition(), record.offset());
                       }
               }
       }
}

5. Avro Producer and Consumer

Avro is an open-source data serialization system with a compact binary format. We use it to send optimized messages across the wire, which also reduces the network overhead. Moreover, Avro can enforce a schema for the messages; the schemas themselves are defined in JSON. From these schemas, Avro can generate binding objects in various programming languages. Avro is commonly used with Kafka and is highly recommended.
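
To see why Avro messages are compact, it helps to look at the bytes. In Avro's binary encoding, field names are not written at all: a record is just its field values in schema order, a string is its UTF-8 length followed by the bytes, and lengths are zig-zag encoded variable-length integers. The sketch below hand-encodes a record with a single string field using only the JDK. It is an illustration, not a replacement for the Avro library; AvroEncodingSketch is a hypothetical name, and the one-field schema (a firstName string) is an assumption based on the field used in the examples in this section.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;

// Hypothetical hand-rolled encoder for a record with one string field.
class AvroEncodingSketch {
    // Avro writes int and long values as zig-zag encoded variable-length integers.
    static void writeLong(ByteArrayOutputStream out, long value) {
        long z = (value << 1) ^ (value >> 63);   // zig-zag: small magnitudes stay small
        while ((z & ~0x7FL) != 0) {
            out.write((int) ((z & 0x7F) | 0x80)); // 7 data bits plus a continuation bit
            z >>>= 7;
        }
        out.write((int) z);
    }

    // An Avro string is its UTF-8 byte length (as a long) followed by the bytes.
    static void writeString(ByteArrayOutputStream out, String s) {
        byte[] utf8 = s.getBytes(StandardCharsets.UTF_8);
        writeLong(out, utf8.length);
        out.write(utf8, 0, utf8.length);
    }

    // A record is the concatenation of its fields in schema order;
    // here there is a single string field.
    static byte[] encodeFirstName(String firstName) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        writeString(out, firstName);
        return out.toByteArray();
    }

    public static void main(String[] args) {
        // Length 2 is zig-zag encoded as 4, followed by 'A' (65) and 'l' (108).
        System.out.println(Arrays.toString(encodeFirstName("Al")));  // prints [4, 65, 108]
    }
}
```

The whole record takes three bytes, whereas the JSON text {"firstName":"Al"} takes eighteen. The price is that the reader needs the schema to make sense of the bytes.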
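Below is a simple Avro consumer and producer. Both rely on an AvroSupport helper class (schema lookup and conversion between records and byte arrays) whose source is not shown here.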

import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;

public class AvroConsumerExample {
       public static void main(String[] str) {
               System.out.println("Starting AvroConsumerExample ...");
               readMessages();
       }
       private static void readMessages() {
               KafkaConsumer<String, byte[]> consumer = createConsumer();
               // Assign to specific topic and partition.
               consumer.assign(Arrays.asList(new TopicPartition("avro-topic", 0)));
               processRecords(consumer);
       }
       private static void processRecords(KafkaConsumer<String, byte[]> consumer) {
               while (true) {
                       // poll(Duration) requires Kafka clients 2.0 or later; older clients use poll(100).
                       ConsumerRecords<String, byte[]> records = consumer.poll(Duration.ofMillis(100));
                       long lastOffset = 0;
                       for (ConsumerRecord<String, byte[]> record : records) {
                               // AvroSupport is a helper class that is not shown in this article.
                               GenericRecord genericRecord = AvroSupport.byteArrayToData(
                                       AvroSupport.getSchema(), record.value());
                               String firstName = AvroSupport.getValue(
                                       genericRecord, "firstName", String.class);
                               System.out.printf("\n\roffset = %d, key = %s, value = %s",
                                       record.offset(), record.key(), firstName);
                               lastOffset = record.offset();
                       }
                       System.out.println("lastOffset read: " + lastOffset);
                       consumer.commitSync();
               }
       }
       private static KafkaConsumer<String, byte[]> createConsumer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               String consumeGroup = "cg1";
               props.put("group.id", consumeGroup);
               // Offsets are committed explicitly with commitSync() above, so auto commit is off.
               props.put("enable.auto.commit", "false");
               props.put("auto.offset.reset", "earliest");
               props.put("heartbeat.interval.ms", "3000");
               props.put("session.timeout.ms", "30000");
               props.put("key.deserializer",
                       "org.apache.kafka.common.serialization.StringDeserializer");
               props.put("value.deserializer",
                       "org.apache.kafka.common.serialization.ByteArrayDeserializer");
               return new KafkaConsumer<String, byte[]>(props);
       }
}
import java.io.IOException;
import java.util.Properties;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class AvroProducerExample {
       public static void main(String[] str) throws InterruptedException, IOException {
               System.out.println("Starting AvroProducerExample ...");
               sendMessages();
       }
       private static void sendMessages() throws InterruptedException, IOException {
               Producer<String, byte[]> producer = createProducer();
               sendRecords(producer);
       }
       private static Producer<String, byte[]> createProducer() {
               Properties props = new Properties();
               props.put("bootstrap.servers", "localhost:9092");
               props.put("acks", "all");
               props.put("retries", 0);
               props.put("batch.size", 16384);
               props.put("linger.ms", 1);
               props.put("buffer.memory", 33554432);
               props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
               props.put("value.serializer",
                       "org.apache.kafka.common.serialization.ByteArraySerializer");
               return new KafkaProducer<>(props);
       }
       private static void sendRecords(Producer<String, byte[]> producer) throws IOException {
               String topic = "avro-topic";
               int partition = 0;
               // Sends records in an endless loop; stop the program to end it.
               while (true) {
                       for (int i = 1; i < 100; i++) {
                           producer.send(new ProducerRecord<String, byte[]>(topic, partition,
                                   Integer.toString(0), record(i + "")));
                       }
               }
       }
       private static byte[] record(String name) throws IOException {
               // AvroSupport is a helper class that is not shown in this article.
               GenericRecord record = new GenericData.Record(AvroSupport.getSchema());
               record.put("firstName", name);
               return AvroSupport.dataToByteArray(AvroSupport.getSchema(), record);
       }
}

So, this was all about Kafka clients. We hope you liked our explanation of how to create them.

6. Conclusion

Hence, we have seen the ways in which we can create Kafka clients using the Kafka API. In this Kafka Clients tutorial, we discussed the Kafka producer client and the Kafka consumer client, and we also looked at an Avro producer and consumer. If you have any questions about Kafka clients, feel free to ask in the comment section.