Apache Kafka Producer For Beginners 2019

In our last Kafka tutorial, we discussed the Kafka cluster. Today, we will discuss the Kafka producer, with an example. We use a Kafka producer to publish messages to an Apache Kafka topic. In this tutorial, we will look at the KafkaProducer API and the Producer API and learn the producer's configuration settings. Finally, we will build a simple producer application.
So, let’s explore Apache Kafka Producer in detail.

Apache Kafka Producer


1. What is Kafka Producer?

A producer is an application that is the source of a data stream. We use an Apache Kafka producer to generate messages (also called tokens) and publish them to one or more topics in the Kafka cluster. The Kafka Producer API packs each message and delivers it to the Kafka server.
The figure below shows how an Apache Kafka producer works.

Figure: Kafka Producer - Apache Kafka Producer Working

The Kafka producer client offers the following APIs.

2. KafkaProducer API

The KafkaProducer API lets an application publish a stream of records to one or more Kafka topics. Its central part is the KafkaProducer class. This class connects to a Kafka broker using the configuration passed to its constructor, and it offers the following methods:

  • To send records asynchronously to a topic, the KafkaProducer class provides the send method. It returns a Future<RecordMetadata> without waiting for the broker. A typical call looks like this:
producer.send(new ProducerRecord<byte[],byte[]>(topic, partition, key1, value1), callback);
  • ProducerRecord − The record to send: a topic, an optional partition, an optional key and a value. The producer keeps a buffer of such records waiting to be sent.
  • Callback − A user-supplied callback to execute when the server has acknowledged the record.

Note: Passing null as the callback means no callback is executed.

  • The KafkaProducer class provides the flush method. It makes all buffered records immediately available to send and blocks until all previously sent records have completed. Its syntax is:
public void flush()
  • To get the partition metadata for a given topic, the KafkaProducer class provides the partitionsFor method, which we can also use for custom partitioning. Its signature is:
public List<PartitionInfo> partitionsFor(String topic)
  • The KafkaProducer class also provides the metrics method, which returns the map of internal metrics maintained by the producer:
public Map<MetricName, ? extends Metric> metrics()

  • public void close() − Closes the producer. This method blocks until all previously sent requests have completed.
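To make the asynchronous send and the Future/Callback pair more concrete, here is a small sketch that runs with the JDK alone. It does not use the Kafka client. StubProducer is a hypothetical in-memory stand-in whose send() returns a Future and can invoke a callback. The broker's acknowledgement is modelled as the offset at which the record was appended.

With the real KafkaProducer, the two styles look like this:
- Blocking: call producer.send(record).get().
- Non-blocking: pass a Callback whose onCompletion(RecordMetadata, Exception) method runs once the broker responds.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;

// Hypothetical in-memory stand-in for a producer. This is NOT the Kafka API:
// it only mimics the shape of send(record, callback) returning a Future.
class StubProducer implements AutoCloseable {
    private final List<String> log = new ArrayList<>();   // touched only by the I/O thread
    private final ExecutorService io = Executors.newSingleThreadExecutor();

    // Appends the value on a background thread; the "acknowledgement" is its offset.
    Future<Long> send(String value, BiConsumer<Long, Exception> callback) {
        CompletableFuture<Long> ack = CompletableFuture.supplyAsync(() -> {
            log.add(value);
            return (long) (log.size() - 1);
        }, io);
        if (callback != null) {   // null means "no callback", as with the real send()
            ack.whenComplete((offset, error) ->
                callback.accept(offset, error == null ? null : new Exception(error)));
        }
        return ack;
    }

    // Like KafkaProducer.close(): wait for in-flight sends before returning.
    @Override
    public void close() {
        io.shutdown();
        try {
            io.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }
}

class SendModesDemo {
    static List<Long> run() {
        List<Long> acked = Collections.synchronizedList(new ArrayList<>());
        try (StubProducer producer = new StubProducer()) {
            // Sync style: block on the Future until the record is acknowledged.
            acked.add(producer.send("sync-message", null).get());
            // Async style: send() returns at once; the callback records the offset later.
            for (int i = 0; i < 3; i++) {
                producer.send("async-" + i, (offset, error) -> {
                    if (error == null) {
                        acked.add(offset);
                    }
                });
            }
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("send failed", e);
        }
        return acked;
    }

    public static void main(String[] args) {
        System.out.println("acknowledged offsets: " + run());
    }
}
```

Blocking on get() after every record gives sync-style behaviour at the cost of throughput. The callback style lets the producer keep sending while acknowledgements arrive.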

3. Producer API

The Producer class is the central part of the older Kafka Producer API. It connects to the Kafka brokers using the configuration passed to its constructor, and it offers the following methods.

a. Kafka Producer Class

To send messages to either a single topic or multiple topics, the Producer class offers a send method. We can use the following signatures for it.

public void send(KeyedMessage<K,V> message)

Sends the data to a single topic, partitioned by key, using either a sync or an async producer.

public void send(List<KeyedMessage<K,V>> messages)

Sends the data to multiple topics.
The producer type is selected in the configuration, for example:

Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);

There are two types of producers: sync and async.
The same API configuration applies to the sync producer. The only difference is how messages are sent. A sync producer sends each message directly and waits for the request to complete. An async producer sends messages in the background, so we prefer it when we want higher throughput. However, in earlier releases such as 0.8, the async producer has no callback for send() to register error handlers; this is available only from release 0.9.
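One reason the async producer achieves higher throughput is batching. Instead of one request per message, buffered messages are grouped and sent together. The sketch below is a deliberately simplified, hypothetical model; BatchingModel is not a Kafka class. It groups records up to a byte limit, similar in spirit to the batch.size setting, and ignores timing settings such as linger.ms.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

// Simplified, hypothetical model of producer-side batching (not Kafka's internals).
// Records are grouped until adding another would exceed maxBatchBytes;
// each resulting batch stands for one request to the broker.
class BatchingModel {
    static List<List<String>> batch(List<String> records, int maxBatchBytes) {
        List<List<String>> batches = new ArrayList<>();
        List<String> current = new ArrayList<>();
        int currentBytes = 0;
        for (String record : records) {
            int size = record.getBytes(StandardCharsets.UTF_8).length;
            if (!current.isEmpty() && currentBytes + size > maxBatchBytes) {
                batches.add(current);          // "send" the full batch as one request
                current = new ArrayList<>();
                currentBytes = 0;
            }
            current.add(record);               // an oversized record still gets its own batch
            currentBytes += size;
        }
        if (!current.isEmpty()) {
            batches.add(current);              // flush whatever is left over
        }
        return batches;
    }

    public static void main(String[] args) {
        List<String> records = new ArrayList<>();
        for (int i = 0; i < 10; i++) {
            records.add("message-" + i);       // 9 bytes each
        }
        System.out.println("requests without batching: " + records.size());
        System.out.println("requests with batching: " + batch(records, 32).size());
    }
}
```

With ten 9-byte messages and a 32-byte limit, the model needs 4 requests instead of 10.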


b. public void close()

To close the producer pool connections to all Kafka brokers, the Producer class offers the public void close() method.

4. Configuration Settings For Kafka Producer API

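Here are the main configuration settings of the Kafka Producer API:
a. client.id
An identifier string for the producer application. It is passed to the brokers so that requests can be traced in logs and metrics.
b. producer.type
Either sync or async (used by the older Producer API).
c. acks
The number of acknowledgments the leader must receive before a producer request is considered complete (for example 0, 1 or all).
d. retries
If a producer request fails, the producer automatically retries it up to this many times.
e. bootstrap.servers
A list of broker host:port pairs used to establish the initial connection to the Kafka cluster.
f. linger.ms
How long the producer waits for more records before sending a batch. Setting linger.ms above 0 reduces the number of requests at the cost of a little added latency.
g. key.serializer
The class, implementing the Serializer interface, used to serialize record keys.
h. value.serializer
The class, implementing the Serializer interface, used to serialize record values.
i. batch.size
The maximum size, in bytes, of a batch of records sent to a single partition.
j. buffer.memory
The total amount of memory, in bytes, available to the producer for buffering records waiting to be sent.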
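As a sketch of how these settings fit together, the helper below collects them into a java.util.Properties object. This is the same container the SimpleProducer example later passes to KafkaProducer. A few assumptions apply:
- The values are illustrative, not recommendations.
- producer.type is left out because it belongs to the older Producer API.
- ProducerSettings is a hypothetical helper name.

Values are stored as strings so that Properties.getProperty() can read them back; the Kafka client also accepts string values for numeric settings.

```java
import java.util.Properties;

// Hypothetical helper: gathers the producer settings described above.
// The values are illustrative assumptions, not tuned recommendations.
class ProducerSettings {
    static Properties build(String bootstrapServers, String clientId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers); // brokers used for the first connection
        props.put("client.id", clientId);                 // logical name reported to the brokers
        props.put("acks", "all");                         // wait for all in-sync replicas
        props.put("retries", "3");                        // retry a failed request up to 3 times
        props.put("batch.size", "16384");                 // max bytes per partition batch
        props.put("linger.ms", "5");                      // wait up to 5 ms to fill a batch
        props.put("buffer.memory", "33554432");           // 32 MiB for records waiting to be sent
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }

    public static void main(String[] args) {
        Properties props = build("localhost:9092", "simple-producer");
        // Print the settings in a stable order.
        props.stringPropertyNames().stream().sorted()
             .forEach(name -> System.out.println(name + "=" + props.getProperty(name)));
    }
}
```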

5. ProducerRecord API

A ProducerRecord is a key/value pair that is sent to the Kafka cluster. The following ProducerRecord constructor creates a record with a partition, a key and a value:
public ProducerRecord(String topic, Integer partition, K key, V value)

  1. Topic − The user-defined topic name the record will be appended to.
  2. Partition − The partition number the record will be sent to.
  3. Key − The key that will be included in the record.
  4. Value − The record contents.

public ProducerRecord(String topic, K key, V value)
This constructor creates a record with a key and a value but without a partition.

  1. Topic − The topic to send the record to.
  2. Key − The key for the record.
  3. Value − The record contents.

public ProducerRecord(String topic, V value)
This constructor creates a record with neither a partition nor a key.

  1. Topic − The topic to send the record to.
  2. Value − The record contents.

The ProducerRecord class offers the following methods:
1. public String topic()
Returns the topic the record will be appended to.
2. public K key()
Returns the key included in the record, or null if the record has no key.
3. public V value()
Returns the record contents.
4. public Integer partition()
Returns the partition number for the record, or null if no partition was specified.
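When a record is created without a partition but with a key, the producer's default partitioner picks the partition by hashing the key's serialized bytes. As a result, every record with the same key lands in the same partition. As far as we know, Kafka's Java client computes toPositive(murmur2(keyBytes)) % numPartitions using a 32-bit murmur2 hash. The sketch below re-implements that formula with the JDK only, to illustrate the idea. KeyPartitioner is a hypothetical class, not a drop-in replacement for the client's partitioner. Records with neither a partition nor a key are spread across partitions differently and are not modelled here.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical sketch of key-based partition selection. murmur2 follows the
// 32-bit MurmurHash2 variant (seed 0x9747b28c) used by Kafka's Java client;
// treat it as illustrative rather than authoritative.
class KeyPartitioner {
    static int murmur2(byte[] data) {
        int length = data.length;
        final int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                    + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1-3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3: h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2: h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1: h ^= data[length & ~3] & 0xff;
                    h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Clearing the sign bit keeps the modulo result in [0, numPartitions).
    static int partitionFor(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8); // as StringSerializer would encode it
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (String key : new String[] {"0", "1", "2", "user-42", "user-42"}) {
            System.out.println(key + " -> partition " + partitionFor(key, 3));
        }
    }
}
```

Because the partition depends only on the key bytes and the partition count, adding partitions to a topic changes where existing keys map.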

6. Simple Kafka Producer Application

First, start ZooKeeper and the Kafka broker. Then create your own topic on the broker using the create topic command. Finally, create a Java class named SimpleProducer.java with the following code:

//import util.properties packages
import java.util.Properties;
//import simple producer packages
import org.apache.kafka.clients.producer.Producer;
//import KafkaProducer packages
import org.apache.kafka.clients.producer.KafkaProducer;
//import ProducerRecord packages
import org.apache.kafka.clients.producer.ProducerRecord;

//Create java class named "SimpleProducer"
public class SimpleProducer {
   public static void main(String[] args) throws Exception {
      // Check arguments length value
      if (args.length == 0) {
         System.out.println("Enter topic name");
         return;
      }
      //Assign topicName to string variable
      String topicName = args[0];
      // create instance for properties to access producer configs
      Properties props = new Properties();
      //Broker address used to bootstrap the connection to the cluster
      props.put("bootstrap.servers", "localhost:9092");
      //Wait for all in-sync replicas to acknowledge each request
      props.put("acks", "all");
      //Number of automatic retries if a request fails (0 disables retries)
      props.put("retries", 0);
      //Maximum batch size in bytes
      props.put("batch.size", 16384);
      //Wait up to 1 ms to batch records, which reduces the number of requests
      props.put("linger.ms", 1);
      //The buffer.memory controls the total amount of memory available to the producer for buffering.
      props.put("buffer.memory", 33554432);
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      //Send ten records whose key and value are "0" to "9"
      for (int i = 0; i < 10; i++) {
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
      }
      System.out.println("Message sent successfully");
      //close() blocks until all previously sent records have completed
      producer.close();
   }
}

a. Compilation

Compile the application with the following command:

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" SimpleProducer.java

b. Execution

Then run the application with the following command:

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

c. Output

Message sent successfully
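To check the output above, open a new terminal and run the console consumer CLI command to receive the messages:

>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning

Newer Kafka releases replace --zookeeper localhost:2181 with --bootstrap-server localhost:9092.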

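0
1
2
3
4
5
6
7
8
9
Ordering is guaranteed only within a partition, so the numbers may appear in a different order if the topic has more than one partition.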
So, this was all about the Apache Kafka producer. We hope you liked our explanation.

7. Summary: Kafka Producer

Hence, in this Kafka tutorial, we have seen the concept of the Kafka producer along with an example. We covered the following:
- the KafkaProducer API;
- the Producer API, including the Producer class and its public void close() method;
- the configuration settings for the Kafka Producer API;
- the ProducerRecord API.

Finally, we built the SimpleProducer application and walked through its compilation, execution, and output. In the next tutorial, we will learn about the Kafka consumer, which consumes messages from the Kafka cluster. If you have any doubts, feel free to ask in the comment section.
