Apache Kafka Producer For Beginners


In our last Kafka tutorial, we discussed the Kafka Cluster. Today, we will discuss the Kafka Producer with an example. Moreover, we will look at the KafkaProducer API and the Producer API.

Next, we will learn the configuration settings for the Kafka Producer, and at the end we will build a simple producer application. A Kafka Producer is what we use to publish messages to an Apache Kafka topic.

So, let’s explore Apache Kafka Producer in detail.

What is Kafka Producer?

A Kafka Producer in Apache Kafka is a component in charge of creating and transmitting data to Kafka topics. Kafka is a distributed streaming technology made to deal with large amounts of data quickly. Building scalable and fault-tolerant data pipelines is made much easier by the ability to publish and subscribe to data as streams.

The Kafka Producer is a client application that sends messages (data records) to Kafka topics.

Kafka's design lets Producers transmit data to Brokers rapidly and lets Brokers make that data available to Consumers in real time. This is why Kafka is a popular choice for building scalable, real-time data streaming systems.

The figure below shows how an Apache Kafka Producer works.

Figure: Kafka Producer – Apache Kafka Producer Working

The Kafka Producer client offers the following APIs.

KafkaProducer API

The KafkaProducer API lets an application publish a stream of records to one or more Kafka topics. Its central part is the KafkaProducer class, which connects to a Kafka broker in its constructor and provides the following methods:

  • KafkaProducer provides a send method for sending messages asynchronously to a topic. A typical call looks like this:
producer.send(new ProducerRecord<byte[],byte[]>(topic,
   partition, key1, value1), callback);
  • ProducerRecord − the record to send: a topic name plus an optional partition, key, and value. The producer adds it to a buffer of records waiting to be sent.
  • Callback − a user-supplied callback that runs when the server has acknowledged the record (or when the send has failed).

Note: Passing null as the callback means no callback is run.

  • To make sure all previously sent messages have actually completed, KafkaProducer provides a flush method. Its signature is −
public void flush()
  • To get the partition metadata for a given topic, KafkaProducer provides the partitionsFor method. We can also use it for custom partitioning. Its signature is:
public List<PartitionInfo> partitionsFor(String topic)
  • To get the map of internal metrics maintained by the producer, KafkaProducer provides the metrics method:
public Map<MetricName, ? extends Metric> metrics()

  • public void close() − KafkaProducer also provides a close method, which blocks until all previously sent requests have completed.
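The methods above fit together roughly as follows. This is a minimal sketch, assuming the kafka-clients library is on the classpath; the class name CallbackFlushDemo and the topic name are hypothetical. So that it runs without a broker, it uses MockProducer, a test double shipped with kafka-clients that implements the same Producer interface, created with autoComplete set to false so that sends stay pending until flush() completes them. In a real application you would create a KafkaProducer from configuration properties instead.

```java
import java.util.concurrent.atomic.AtomicInteger;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical demo class; MockProducer stands in for KafkaProducer so no broker is needed.
class CallbackFlushDemo {

    // Sends three records with a callback, then flushes.
    // Returns {acks seen before flush(), acks seen after flush()}.
    static int[] run(String topic) {
        AtomicInteger acked = new AtomicInteger();
        // autoComplete = false: each send stays pending until something completes it.
        Producer<String, String> producer =
            new MockProducer<>(false, new StringSerializer(), new StringSerializer());
        try {
            for (int i = 0; i < 3; i++) {
                ProducerRecord<String, String> record =
                    new ProducerRecord<>(topic, Integer.toString(i), "value-" + i);
                // send() returns at once; the callback runs when the record completes.
                producer.send(record, (metadata, exception) -> {
                    if (exception != null) {
                        System.err.println("Send failed: " + exception);
                    } else {
                        acked.incrementAndGet();
                    }
                });
            }
            int before = acked.get();
            // flush() blocks until all previously sent records have completed.
            producer.flush();
            return new int[] {before, acked.get()};
        } finally {
            // close() releases the producer's resources.
            producer.close();
        }
    }

    public static void main(String[] args) {
        int[] counts = run("demo-topic");
        System.out.println("acked before flush: " + counts[0] + ", after flush: " + counts[1]);
    }
}
```

With a real KafkaProducer, callbacks run on the producer's I/O thread as brokers acknowledge records, so the count observed before flush() is not deterministic; after flush() returns, every earlier send has completed.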

Producer API

The Producer class is the central part of the older Producer API, which Kafka later deprecated in favor of KafkaProducer. It connects to the Kafka broker in its constructor and provides the following methods.

a. Kafka Producer Class

To send messages to either a single topic or multiple topics, the Producer class offers a send method with the following signatures.

public void send(KeyedMessage<K,V> message)

– sends data to a single topic, partitioned by key, using either the sync or the async producer.

public void send(List<KeyedMessage<K,V>> messages)

– sends data to multiple topics.

The producer type is chosen through the configuration, for example:

Properties prop = new Properties();
prop.put("producer.type", "async");
ProducerConfig config = new ProducerConfig(prop);

There are two types of producers: sync and async.

The same API configuration applies to the sync producer as well. The difference between the two is that the sync producer sends messages directly, whereas the async producer sends them in the background; we prefer the async producer when we want higher throughput.

However, in earlier releases such as 0.8, the async producer has no callback for send() with which to register error handlers. This is available only in the 0.9 release, current at the time of writing.
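The KafkaProducer API has no producer.type setting: its send() always returns immediately with a Future<RecordMetadata>, and the caller decides per call whether to wait. The sketch below contrasts a blocking "sync" send built from send().get() with an "async" send that handles the result in a callback. It assumes kafka-clients is on the classpath and uses MockProducer (with autoComplete set to true, so every send is acknowledged at once) in place of a live broker; the class and method names are hypothetical.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;

import org.apache.kafka.clients.producer.MockProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.kafka.common.serialization.StringSerializer;

// Hypothetical demo class contrasting blocking and non-blocking sends.
class SyncVsAsyncDemo {

    // "Sync" style: block on the Future until the broker acknowledges the record.
    static RecordMetadata sendSync(Producer<String, String> producer, String topic, String value)
            throws InterruptedException, ExecutionException {
        Future<RecordMetadata> future = producer.send(new ProducerRecord<>(topic, value));
        return future.get(); // throws ExecutionException if the send failed
    }

    // "Async" style: return immediately and handle success or failure in a callback.
    static Future<RecordMetadata> sendAsync(Producer<String, String> producer, String topic, String value) {
        return producer.send(new ProducerRecord<>(topic, value), (metadata, exception) -> {
            if (exception != null) {
                System.err.println("Send failed: " + exception);
            } else {
                System.out.println("Stored in " + metadata.topic() + " at offset " + metadata.offset());
            }
        });
    }

    // Runs both styles against a MockProducer that acknowledges every send immediately.
    // Returns {topic reported by the sync send, whether the async Future is already done}.
    static String[] runDemo() {
        MockProducer<String, String> producer =
            new MockProducer<>(true, new StringSerializer(), new StringSerializer());
        try {
            RecordMetadata meta = sendSync(producer, "demo-topic", "hello");
            Future<RecordMetadata> pending = sendAsync(producer, "demo-topic", "world");
            return new String[] {meta.topic(), String.valueOf(pending.isDone())};
        } catch (InterruptedException | ExecutionException e) {
            throw new IllegalStateException("synchronous send failed", e);
        } finally {
            producer.close();
        }
    }

    public static void main(String[] args) {
        String[] result = runDemo();
        System.out.println("sync topic: " + result[0] + ", async done: " + result[1]);
    }
}
```

Blocking on get() after every send limits the producer to roughly one round trip per record, which is why the asynchronous style is preferred when throughput matters.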

b. public void close()

To close the producer pool connections to all Kafka brokers, the Producer class offers a public void close() method.

Configuration Settings For Kafka Producer API

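Here are the main configuration settings of the Kafka Producer API:

a. client.id
An identifier for the producer application, passed to the brokers with each request.

b. producer.type
Either sync or async. This setting belongs to the older Producer API described above; the KafkaProducer API has no such setting, because its send() is always asynchronous.

c. acks
Controls how many acknowledgments the producer requires before it considers a request complete: for example 0 (none), 1 (the partition leader only), or all (all in-sync replicas).

d. retries
If a producer request fails, the producer automatically retries it up to the specified number of times.

e. bootstrap.servers
A list of broker host:port pairs that the producer uses to establish its initial connection to the Kafka cluster.

f. linger.ms
How long, in milliseconds, the producer waits for more records before sending a batch. Setting linger.ms above 0 lets more records be grouped into each request and so reduces the number of requests, at the cost of a little extra latency.

g. key.serializer
The class implementing the Serializer interface that converts record keys to bytes.

h. value.serializer
The class implementing the Serializer interface that converts record values to bytes.

i. batch.size
The maximum size, in bytes, of a batch of records sent to a single partition.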

j. buffer.memory
“buffer.memory” controls the total amount of memory available to the producer for buffering.
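To see why linger.ms and batch.size reduce the number of requests, here is a deliberately simplified model in plain Java. It is not how the Kafka client is implemented: it ignores partitions (the real producer batches per partition), compression, buffer.memory, and the background sender thread, and simply closes a batch when the next record would push it past batch.size or when linger.ms has passed since the batch's first record. The class BatchingModel and its method are hypothetical.

```java
// Hypothetical toy model; NOT the Kafka client's batching implementation.
class BatchingModel {

    /**
     * Counts how many requests the model sends.
     *
     * @param arrivalMs   arrival time of each record in ms (non-decreasing)
     * @param recordBytes size of each record in bytes
     * @param batchSize   maximum bytes per batch (plays the role of batch.size)
     * @param lingerMs    how long a batch may wait for more records (plays the role of linger.ms)
     */
    static int countBatches(long[] arrivalMs, int[] recordBytes, int batchSize, long lingerMs) {
        int batches = 0;
        boolean open = false;   // is a batch currently being filled?
        long batchStart = 0;    // arrival time of the open batch's first record
        int batchBytes = 0;     // bytes already in the open batch
        for (int i = 0; i < arrivalMs.length; i++) {
            if (open) {
                boolean lingerExpired = arrivalMs[i] - batchStart >= lingerMs;
                boolean wouldOverflow = batchBytes + recordBytes[i] > batchSize;
                if (lingerExpired || wouldOverflow) {
                    batches++;  // send the open batch as one request
                    open = false;
                }
            }
            if (!open) {        // start a new batch with this record
                open = true;
                batchStart = arrivalMs[i];
                batchBytes = 0;
            }
            batchBytes += recordBytes[i];
        }
        if (open) {
            batches++;          // send the last, partially filled batch
        }
        return batches;
    }

    public static void main(String[] args) {
        long[] arrivals = {0, 1, 2, 3, 4, 5, 6, 7, 8, 9};
        int[] sizes = {100, 100, 100, 100, 100, 100, 100, 100, 100, 100};
        System.out.println("linger.ms=0 -> " + countBatches(arrivals, sizes, 16384, 0) + " requests");
        System.out.println("linger.ms=5 -> " + countBatches(arrivals, sizes, 16384, 5) + " requests");
    }
}
```

For ten 100-byte records arriving 1 ms apart, the model sends ten requests with linger.ms=0 and two with linger.ms=5; shrinking batchSize to 250 bytes (with a long linger) caps each batch at two records and yields five requests.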

ProducerRecord API

A ProducerRecord is a key/value pair that is sent to the Kafka cluster. The following ProducerRecord constructor creates a record with a partition, key, and value:
public ProducerRecord(String topic, Integer partition, K key, V value)

  1. Topic − the name of the topic the record will be appended to.
  2. Partition − the partition the record should be sent to.
  3. Key − the key that will be included in the record.
  4. Value − the record contents.

public ProducerRecord(String topic, K key, V value)
This constructor creates a record with a key and a value but no partition.

  1. Topic − the topic to assign the record to.
  2. Key − the key for the record.
  3. Value − the record contents.

public ProducerRecord(String topic, V value)
This constructor creates a record without a partition or a key.

  1. Topic − the topic to assign the record to.
  2. Value − the record contents.

The ProducerRecord class provides the following methods −

1. public String topic()
Returns the topic the record will be appended to.

2. public K key()
Returns the key included in the record, or null if the record has no key.

3. public V value()
Returns the record contents.

4. public Integer partition()
Returns the partition the record will be sent to, or null if no partition was specified.
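The three constructors and the accessor methods can be tried without a broker, because building a ProducerRecord does not contact the cluster. This is a minimal sketch, assuming kafka-clients is on the classpath; the class ProducerRecordDemo and the topic, key, and value strings are made up for illustration.

```java
import org.apache.kafka.clients.producer.ProducerRecord;

// Hypothetical demo class showing the three ProducerRecord constructors.
class ProducerRecordDemo {
    static final String TOPIC = "demo-topic";

    // Topic, explicit partition, key and value.
    static ProducerRecord<String, String> withPartition() {
        return new ProducerRecord<>(TOPIC, 0, "user-42", "clicked");
    }

    // Topic, key and value; the producer's partitioner picks the partition later.
    static ProducerRecord<String, String> withKey() {
        return new ProducerRecord<>(TOPIC, "user-42", "clicked");
    }

    // Topic and value only: no key and no partition.
    static ProducerRecord<String, String> valueOnly() {
        return new ProducerRecord<>(TOPIC, "clicked");
    }

    public static void main(String[] args) {
        ProducerRecord<String, String> r1 = withPartition();
        ProducerRecord<String, String> r2 = withKey();
        ProducerRecord<String, String> r3 = valueOnly();
        System.out.println(r1.topic() + " " + r1.partition()); // demo-topic 0
        System.out.println(r2.key() + " " + r2.partition());   // user-42 null
        System.out.println(r3.key() + " " + r3.value());       // null clicked
    }
}
```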

Simple Kafka Producer Application

First, make sure ZooKeeper and the Kafka broker are running, then create your own topic on the broker using the create topic command. Then create a Java source file named SimpleProducer.java with the following code:

// Java utility class that holds the producer configuration
import java.util.Properties;
// Producer interface
import org.apache.kafka.clients.producer.Producer;
// KafkaProducer implementation
import org.apache.kafka.clients.producer.KafkaProducer;
// ProducerRecord: the key/value record to send
import org.apache.kafka.clients.producer.ProducerRecord;

// Java class named "SimpleProducer"
public class SimpleProducer {
   public static void main(String[] args) throws Exception {
      // The topic name must be passed as the first argument
      if (args.length == 0) {
         System.out.println("Enter topic name");
         return;
      }
      // Assign the topic name to a string variable
      String topicName = args[0];
      // Create a Properties instance to hold the producer configs
      Properties props = new Properties();
      // Address of the Kafka broker to connect to
      props.put("bootstrap.servers", "localhost:9092");
      // Wait for all in-sync replicas to acknowledge each request
      props.put("acks", "all");
      // Number of automatic retries if a request fails (0 disables retries)
      props.put("retries", 0);
      // Maximum batch size in bytes
      props.put("batch.size", 16384);
      // Wait up to 1 ms for more records before sending, to reduce the number of requests
      props.put("linger.ms", 1);
      // Total memory (in bytes) available to the producer for buffering
      props.put("buffer.memory", 33554432);
      // Serializers that turn String keys and values into bytes
      props.put("key.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");
      props.put("value.serializer",
         "org.apache.kafka.common.serialization.StringSerializer");

      Producer<String, String> producer = new KafkaProducer<String, String>(props);
      // Send ten records; key and value are both the loop index as a string
      for (int i = 0; i < 10; i++) {
         producer.send(new ProducerRecord<String, String>(topicName,
            Integer.toString(i), Integer.toString(i)));
      }
      System.out.println("Message sent successfully");
      // close() blocks until the previously sent records have completed
      producer.close();
   }
}

a. Compilation

Compile the application with the following command:

javac -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*" SimpleProducer.java

b. Execution

Then run the application with the following command:

java -cp "/path/to/kafka/kafka_2.11-0.9.0.0/lib/*":. SimpleProducer <topic-name>

c. Output

Message sent successfully
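To check that the messages arrived, open a new terminal and run the console consumer:

>> bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic <topic-name> --from-beginning

(In newer Kafka versions, the console consumer connects with --bootstrap-server localhost:9092 instead of --zookeeper localhost:2181.)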

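0
1
2
3
4
5
6
7
8
9

The consumer prints the values 0 to 9 sent by the loop. If the topic has more than one partition, they may appear in a different order, because Kafka guarantees ordering only within a partition.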
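With a key set, the producer's default partitioner picks the partition by hashing the serialized key with murmur2 and taking the result modulo the number of partitions, so keys can land on different partitions and be read back interleaved. The sketch below re-implements that key-to-partition step in plain Java for illustration only. The class KeyPartitionSketch is hypothetical, and although it is written to follow the murmur2 variant in Kafka's client utilities, compare it with Utils.murmur2 in your kafka-clients version before relying on exact partition numbers.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical illustration of keyed partitioning; not a replacement for Kafka's partitioner.
class KeyPartitionSketch {

    // 32-bit MurmurHash2, following the variant used by Kafka's client utilities.
    static int murmur2(byte[] data) {
        int length = data.length;
        int seed = 0x9747b28c;
        final int m = 0x5bd1e995;
        final int r = 24;
        int h = seed ^ length;
        int length4 = length / 4;
        for (int i = 0; i < length4; i++) {
            int i4 = i * 4;
            int k = (data[i4] & 0xff) + ((data[i4 + 1] & 0xff) << 8)
                  + ((data[i4 + 2] & 0xff) << 16) + ((data[i4 + 3] & 0xff) << 24);
            k *= m;
            k ^= k >>> r;
            k *= m;
            h *= m;
            h ^= k;
        }
        // Mix in the remaining 1 to 3 bytes (intentional fall-through).
        switch (length % 4) {
            case 3:
                h ^= (data[(length & ~3) + 2] & 0xff) << 16;
            case 2:
                h ^= (data[(length & ~3) + 1] & 0xff) << 8;
            case 1:
                h ^= data[length & ~3] & 0xff;
                h *= m;
        }
        h ^= h >>> 13;
        h *= m;
        h ^= h >>> 15;
        return h;
    }

    // Partition for a String key serialized as UTF-8 (StringSerializer's default encoding).
    static int partitionForKey(String key, int numPartitions) {
        byte[] keyBytes = key.getBytes(StandardCharsets.UTF_8);
        // Clear the sign bit so the modulo result is never negative.
        return (murmur2(keyBytes) & 0x7fffffff) % numPartitions;
    }

    public static void main(String[] args) {
        for (int i = 0; i < 10; i++) {
            String key = Integer.toString(i);
            System.out.println("key " + key + " -> partition " + partitionForKey(key, 3));
        }
    }
}
```

With a single-partition topic every key maps to partition 0, so the consumer sees the records in the order they were sent.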
So, this was all about the Apache Kafka Producer. We hope you liked our explanation.

Summary: Kafka Producer

Hence, in this Kafka tutorial, we have seen the concept of the Kafka Producer along with an example. We have also learned about the Producer API, the Producer class, and public void close().

We also discussed the configuration settings for the Kafka Producer API and the ProducerRecord API. Finally, we built the SimpleProducer application and walked through its compilation, execution, and output. In the next tutorial, we will learn about the Kafka Consumer, which consumes messages from the Kafka cluster. If you have any doubts, feel free to ask in the comment section.


3 Responses

  1. Mitch says:

    i get this error “Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers” when i try running the producer . I have even tried giving the localhost ip instead of “localhost” as url for bootstrap server, but it still gives this error . Any idea why?

    • Sumit Chauhan says:

      Same happened to me. Resolved it by using the port number too:

      props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

  2. Ankita Gulati says:

    do u have git repo of kafka serializer and deserializer ?
