Kafka Serialization and Deserialization With Example

In this Kafka SerDe article, we will learn how to create a custom serializer and deserializer for Kafka. We will also look at how serialization works in Kafka and why it is required.

Along the way, we will see a Kafka serializer example and a Kafka deserializer example. This tutorial also introduces the Kafka string serializer and the Kafka object serializer.

Apache Kafka lets us publish and subscribe to streams of records. Because Kafka itself only transmits bytes, we are free to write our own serializer and deserializer and use them to transmit any data type we need.

So, let’s start with Kafka serialization and deserialization.

Apache Kafka SerDe

Serialization is the process of converting an object into a stream of bytes for the purpose of transmission. Apache Kafka stores and transmits records as these byte arrays.

Deserialization is the opposite of serialization: here we convert a byte array back into the data type we want. Note that Kafka ships serializers and deserializers for only a few data types, such as

  • String
  • Long
  • Double
  • Integer
  • Bytes
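
To make the idea concrete, here is a minimal sketch of what serializing two of these types amounts to: a String becomes its UTF-8 bytes, and a Long becomes 8 big-endian bytes. The class and method names are hypothetical and use only the JDK; this is an illustration of the round trip, not Kafka's own code.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical helper, not part of Kafka: mimics the byte layout that
// Kafka's String (UTF-8) and Long (8 bytes, big-endian) serializers produce.
final class BuiltInSerdeSketch {

    static byte[] serializeString(String value) {
        return value.getBytes(StandardCharsets.UTF_8);
    }

    static String deserializeString(byte[] data) {
        return new String(data, StandardCharsets.UTF_8);
    }

    static byte[] serializeLong(long value) {
        // ByteBuffer writes in big-endian order by default.
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    static long deserializeLong(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] text = serializeString("hello");
        byte[] number = serializeLong(42L);
        System.out.println(text.length + " bytes -> " + deserializeString(text));
        System.out.println(number.length + " bytes -> " + deserializeLong(number));
    }
}
```

Anything that is not one of the built-in types needs the same pair of conversions written by hand, which is what a custom serializer and deserializer provide.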

Why Use Custom Serializer and Deserializer with Kafka?

We use serializers to prepare a message for transmission from the producer to the broker.

In other words, a serializer tells the producer how to convert a message into a byte array before it is sent to the broker. Similarly, the consumer uses a deserializer to convert the byte array back into an object.

Implementation of Kafka SerDe

To create a serializer class, implement the org.apache.kafka.common.serialization.Serializer interface. Likewise, to create a deserializer class, implement the org.apache.kafka.common.serialization.Deserializer interface.

Both interfaces declare three methods:

Implementation Methods for Kafka Serialization and Deserialization

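a. Configure

Kafka calls the configure method once at startup, passing the client configuration and a boolean that indicates whether the instance is used for keys or for values.

b. Serialize/deserialize

This method does the actual conversion: serialize turns an object into a byte array, and deserialize turns a byte array back into an object.

c. Close

Kafka calls the close method when the client is closed, so that the serializer or deserializer can release any resources it holds.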

Serializer Interface With Kafka

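public interface Serializer<T> extends Closeable {
 // configs: client configuration; isKey: true when used for record keys
 void configure(Map<String, ?> configs, boolean isKey);
 // Converts data into the bytes sent to the given topic
 byte[] serialize(String topic, T data);
 void close();
}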

Deserializer Interface With Kafka

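public interface Deserializer<T> extends Closeable {
 // configs: client configuration; isKey: true when used for record keys
 void configure(Map<String, ?> configs, boolean isKey);
 // Converts the bytes read from the given topic back into an object
 T deserialize(String topic, byte[] data);
 void close();
}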
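
The call order of the three methods (configure once, serialize for each record, close once) can be sketched without a broker. The example below defines a hypothetical stand-in interface, MiniSerializer, that only mirrors the shape of Kafka's Serializer so that the snippet runs on the plain JDK; the method names match, but nothing here is Kafka code.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;

// Hypothetical stand-in that mirrors the shape of Kafka's Serializer<T>.
interface MiniSerializer<T> extends java.io.Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    @Override void close();
}

// Records each lifecycle call so the order can be inspected afterwards.
final class TracingStringSerializer implements MiniSerializer<String> {
    final List<String> calls = new ArrayList<>();

    @Override public void configure(Map<String, ?> configs, boolean isKey) {
        calls.add("configure(isKey=" + isKey + ")");
    }

    @Override public byte[] serialize(String topic, String data) {
        calls.add("serialize(" + topic + ")");
        return data == null ? null : data.getBytes(StandardCharsets.UTF_8);
    }

    @Override public void close() {
        calls.add("close");
    }
}

final class LifecycleSketch {
    // Drives the serializer the way a client would: configure once,
    // serialize once per value, close once.
    static List<String> run(List<String> values) {
        TracingStringSerializer serializer = new TracingStringSerializer();
        serializer.configure(Collections.<String, Object>emptyMap(), false);
        for (String value : values) {
            serializer.serialize("MyTopic", value);
        }
        serializer.close();
        return serializer.calls;
    }

    public static void main(String[] args) {
        // prints [configure(isKey=false), serialize(MyTopic), serialize(MyTopic), close]
        System.out.println(run(Arrays.asList("a", "b")));
    }
}
```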

Example of Serializer and Deserializer

The example uses the following dependencies:

  • Kafka (0.10.1.1).
  • FasterXML Jackson (2.8.6).

User.java:
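public class User {
 private String firstName;
 private int age;
 // Jackson needs a no-argument constructor to instantiate the class.
 public User() {
 }
 public User(String firstName, int age) {
   this.firstName = firstName;
   this.age = age;
 }
 // Accessor names follow the field names so that Jackson maps the
 // JSON properties "firstName" and "age" in both directions.
 public String getFirstName() {
   return this.firstName;
 }
 public void setFirstName(String firstName) {
   this.firstName = firstName;
 }
 public int getAge() {
   return this.age;
 }
 public void setAge(int age) {
   this.age = age;
 }
 @Override public String toString() {
   return "User(" + firstName + ", " + age + ")";
 }
}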
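UserSerializer.java:
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {
 // One ObjectMapper is reused for every record instead of creating a new one each time.
 private final ObjectMapper objectMapper = new ObjectMapper();

 @Override public void configure(Map<String, ?> configs, boolean isKey) {
   // Nothing to configure.
 }
 @Override public byte[] serialize(String topic, User data) {
   if (data == null) {
     return null;
   }
   try {
     // Jackson writes the object as UTF-8 encoded JSON.
     return objectMapper.writeValueAsBytes(data);
   } catch (Exception e) {
     // Fail loudly instead of silently sending a null payload.
     throw new SerializationException("Error serializing User to JSON", e);
   }
 }
 @Override public void close() {
   // Nothing to release.
 }
}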
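UserDeserializer.java:
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class UserDeserializer implements Deserializer<User> {
 // One ObjectMapper is reused for every record instead of creating a new one each time.
 private final ObjectMapper mapper = new ObjectMapper();

 @Override public void configure(Map<String, ?> configs, boolean isKey) {
   // Nothing to configure.
 }
 @Override public User deserialize(String topic, byte[] data) {
   if (data == null) {
     return null;
   }
   try {
     return mapper.readValue(data, User.class);
   } catch (Exception e) {
     // Fail loudly instead of silently handing a null User to the consumer.
     throw new SerializationException("Error deserializing JSON to User", e);
   }
 }
 @Override public void close() {
   // Nothing to release.
 }
}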
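
JSON is not the only possible payload format. As a contrast to the Jackson-based classes above, here is a minimal sketch of a hand-rolled binary encoding for the same two fields: a length-prefixed UTF-8 name followed by a 4-byte age. This is a different technique from the one used in this tutorial, it uses only the JDK, and the names BinaryUserCodec and Person are hypothetical, introduced only for illustration. The two static methods show what the bodies of serialize and deserialize would do under that layout.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical codec, not from the tutorial. Assumed binary layout:
// [4-byte name length][name as UTF-8][4-byte age].
final class BinaryUserCodec {

    // Minimal stand-in for the tutorial's User class.
    static final class Person {
        final String firstName;
        final int age;

        Person(String firstName, int age) {
            this.firstName = firstName;
            this.age = age;
        }
    }

    static byte[] encode(Person person) {
        if (person == null) {
            return null;
        }
        byte[] name = person.firstName.getBytes(StandardCharsets.UTF_8);
        ByteBuffer buffer = ByteBuffer.allocate(Integer.BYTES + name.length + Integer.BYTES);
        buffer.putInt(name.length);
        buffer.put(name);
        buffer.putInt(person.age);
        return buffer.array();
    }

    static Person decode(byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte[] name = new byte[buffer.getInt()];
        buffer.get(name);
        int age = buffer.getInt();
        return new Person(new String(name, StandardCharsets.UTF_8), age);
    }

    public static void main(String[] args) {
        Person decoded = decode(encode(new Person("Ada", 36)));
        System.out.println(decoded.firstName + ", " + decoded.age);
    }
}
```

A fixed binary layout like this is compact, but unlike JSON it has to be changed by hand on both the producer and the consumer side whenever a field is added.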

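To use the above serializer, register it in the producer configuration under the value.serializer property, using its fully qualified class name (here the class is assumed to live in the com.knoldus.serializers package):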

props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
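
The props object itself is not shown in this tutorial. A minimal sketch of how it might be built follows; the broker address localhost:9092 is an assumption for a local setup, and the class name ProducerPropsSketch is hypothetical. The String serializer is used for keys because the record keys in this example are plain strings.

```java
import java.util.Properties;

// Hypothetical helper that assembles the producer configuration.
final class ProducerPropsSketch {

    static Properties producerProps() {
        Properties props = new Properties();
        // Assumption: a broker reachable on the default local port.
        props.put("bootstrap.servers", "localhost:9092");
        // Keys are plain strings, so Kafka's built-in serializer is enough.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values use the custom serializer, referenced by fully qualified name.
        props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```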

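The producer then looks like this (props is the producer configuration and user is the User instance to send):

try (Producer<String, User> producer = new KafkaProducer<>(props)) {
  // send() is asynchronous; closing the producer waits for pending records to be sent.
  producer.send(new ProducerRecord<String, User>("MyTopic", user));
  System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
  e.printStackTrace();
}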

Similarly, register the deserializer in the consumer configuration under the value.deserializer property:

props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
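
On the consumer side the configuration needs a little more than the deserializer. The sketch below shows one way it might look; the broker address localhost:9092, the group name user-consumers, and the class name ConsumerPropsSketch are all assumptions made for illustration. A group.id is included because a consumer that calls subscribe() must belong to a consumer group.

```java
import java.util.Properties;

// Hypothetical helper that assembles the consumer configuration.
final class ConsumerPropsSketch {

    static Properties consumerProps() {
        Properties props = new Properties();
        // Assumption: a broker reachable on the default local port.
        props.put("bootstrap.servers", "localhost:9092");
        // Assumption: an arbitrary consumer group name.
        props.put("group.id", "user-consumers");
        // Keys are plain strings, so Kafka's built-in deserializer is enough.
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        // Values use the custom deserializer, referenced by fully qualified name.
        props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```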

The consumer then looks like this:

try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
   // Subscribe to the topic the producer writes to.
   consumer.subscribe(Collections.singletonList("MyTopic"));
   while (true) {
       // Wait up to 100 ms for records. From Kafka 2.0 on, poll(Duration) replaces poll(long).
       ConsumerRecords<String, User> messages = consumer.poll(100);
       for (ConsumerRecord<String, User> message : messages) {
         System.out.println("Message received " + message.value().toString());
       }
   }
} catch (Exception e) {
   e.printStackTrace();
}

So, this was all about Kafka serialization and deserialization. We hope you liked and understood our explanation of the custom serializer and deserializer with Kafka.

Conclusion

In this Kafka serialization and deserialization tutorial, we learned how to create a custom Kafka SerDe with an example. We also saw why Kafka needs serializers and deserializers, and we went through the methods that the Serializer and Deserializer interfaces require us to implement.

Finally, we saw a Kafka object serializer at work in the User example. If you have any questions, feel free to ask in the comment section.
