Kafka Serialization and Deserialization With Example
In this Kafka SerDe article, we will learn how to create a custom serializer and deserializer with Kafka. We will also look at how serialization works in Kafka and why it is required.
Along the way, we will see a Kafka serializer example and a Kafka deserializer example. This Kafka Serialization and Deserialization tutorial also covers the Kafka string serializer and the Kafka object serializer.
Apache Kafka lets us publish and subscribe to streams of records. Because its built-in serializers cover only a few data types, Kafka also gives us the flexibility to write our own serializer and deserializer, so that we can transmit other data types.
So, let’s start with Kafka Serialization and Deserialization.
Apache Kafka SerDe
Serialization is the process of converting an object into a stream of bytes for the purpose of transmission. Apache Kafka stores and transmits records as these byte arrays.
Deserialization is the opposite of serialization. Here we convert byte arrays back into the data type we want. Note, however, that Kafka ships serializers and deserializers for only a few data types, such as:
- String
- Long
- Double
- Integer
- Bytes
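To make the idea of "object to bytes and back" concrete, here is a minimal JDK-only sketch. It does not use the Kafka classes themselves; it only mirrors the conversions that the built-in String and Long serializers perform by default (UTF-8 text and an 8-byte big-endian number). The class name PrimitiveBytesDemo and its method names are our own, chosen for illustration.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// JDK-only illustration; PrimitiveBytesDemo is a hypothetical name, not a Kafka class.
final class PrimitiveBytesDemo {

    // String -> UTF-8 bytes (null stays null)
    static byte[] stringToBytes(String value) {
        return value == null ? null : value.getBytes(StandardCharsets.UTF_8);
    }

    // UTF-8 bytes -> String
    static String bytesToString(byte[] data) {
        return data == null ? null : new String(data, StandardCharsets.UTF_8);
    }

    // long -> 8 bytes, big-endian (ByteBuffer's default byte order)
    static byte[] longToBytes(long value) {
        return ByteBuffer.allocate(Long.BYTES).putLong(value).array();
    }

    // 8 bytes -> long
    static long bytesToLong(byte[] data) {
        return ByteBuffer.wrap(data).getLong();
    }

    public static void main(String[] args) {
        byte[] text = stringToBytes("hello");
        byte[] number = longToBytes(42L);
        System.out.println(text.length + " bytes -> " + bytesToString(text));
        System.out.println(number.length + " bytes -> " + bytesToLong(number));
    }
}
```

Anything more complex than these simple types, such as our own classes, has no ready-made conversion, which is where a custom serializer comes in.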
Why Use Custom Serializer and Deserializer with Kafka?
We use serializers to prepare a message for transmission from the producer to the broker.
In other words, the serializer tells the producer how to convert the message into a byte array before it is sent to the broker. Similarly, the consumer uses a deserializer to convert the byte array back into an object. A custom pair is needed whenever the message type is not one of the built-in types listed above.
Implementation of Kafka SerDe
To create a serializer class, we implement the org.apache.kafka.common.serialization.Serializer interface. And, for a deserializer class, we implement the org.apache.kafka.common.serialization.Deserializer interface.
There are 3 methods in both the Kafka serialization and deserialization interfaces:
a. Configure
Kafka calls the configure method once at startup and passes in the client configuration.
b. Serialize/deserialize
This method does the actual work: serialize converts an object into bytes, and deserialize converts bytes back into an object.
c. Close
Kafka calls the close method when the Kafka session is being closed.
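The order of these three calls can be sketched without a broker. The example below is only an illustration: DemoSerializer is a hypothetical stand-in that we declare locally with the same three methods, so that the code compiles without the Kafka client library, and "demo.encoding" is a made-up configuration key.

```java
import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.Collections;
import java.util.Map;

// Hypothetical stand-in with the same three methods as Kafka's Serializer interface.
interface DemoSerializer<T> extends AutoCloseable {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    @Override
    void close();
}

final class EncodingAwareStringSerializer implements DemoSerializer<String> {
    private Charset charset = StandardCharsets.UTF_8;
    private boolean closed;

    // Called once at startup: pick up settings from the configuration map.
    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        Object encoding = configs.get("demo.encoding"); // made-up key for this sketch
        if (encoding instanceof String) {
            charset = Charset.forName((String) encoding);
        }
    }

    // Called for every record that is sent.
    @Override
    public byte[] serialize(String topic, String data) {
        if (closed) {
            throw new IllegalStateException("serializer is closed");
        }
        return data == null ? null : data.getBytes(charset);
    }

    // Called once when the session ends.
    @Override
    public void close() {
        closed = true;
    }
}

final class LifecycleDemo {
    // Runs configure -> serialize -> close and returns the encoded length.
    static int encodedLength(String encoding, String text) {
        EncodingAwareStringSerializer serializer = new EncodingAwareStringSerializer();
        serializer.configure(Collections.singletonMap("demo.encoding", encoding), false);
        try {
            return serializer.serialize("demo-topic", text).length;
        } finally {
            serializer.close();
        }
    }

    public static void main(String[] args) {
        System.out.println(encodedLength("UTF-8", "h\u00e9llo"));
        System.out.println(encodedLength("UTF-16BE", "h\u00e9llo"));
    }
}
```

The same text produces a different number of bytes depending on the configured encoding, which is why the configuration step runs before any record is serialized.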
Serializer Interface With Kafka
public interface Serializer<T> extends Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    byte[] serialize(String topic, T data);
    void close();
}
Deserializer Interface With Kafka
public interface Deserializer<T> extends Closeable {
    void configure(Map<String, ?> configs, boolean isKey);
    T deserialize(String topic, byte[] data);
    void close();
}
Example of Serializer and Deserializer
Here dependencies are:
- Kafka (0.10.1.1).
- FasterXML Jackson (2.8.6).
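User.java:
public class User {
    private String firstname;
    private int age;

    // No-argument constructor, used by Jackson when it rebuilds the object
    public User() { }

    public User(String firstname, int age) {
        this.firstname = firstname;
        this.age = age;
    }

    // Accessor names follow the JavaBean pattern, so Jackson maps both
    // directions to the same JSON property names: "firstname" and "age"
    public String getFirstname() { return this.firstname; }
    public void setFirstname(String firstname) { this.firstname = firstname; }
    public int getAge() { return this.age; }
    public void setAge(int age) { this.age = age; }

    @Override
    public String toString() {
        return "User(" + firstname + ", " + age + ")";
    }
}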
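UserSerializer.java:
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

public class UserSerializer implements Serializer<User> {
    // ObjectMapper is thread-safe once configured, so one instance is reused
    private final ObjectMapper objectMapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public byte[] serialize(String topic, User user) {
        if (user == null) {
            return null;
        }
        try {
            // Write the user as UTF-8 encoded JSON
            return objectMapper.writeValueAsBytes(user);
        } catch (Exception e) {
            // Fail loudly instead of silently sending a null value
            throw new SerializationException("Error serializing User to JSON", e);
        }
    }

    @Override
    public void close() { }
}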
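UserDeserializer.java:
import java.util.Map;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Deserializer;

public class UserDeserializer implements Deserializer<User> {
    // ObjectMapper is thread-safe once configured, so one instance is reused
    private final ObjectMapper mapper = new ObjectMapper();

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) { }

    @Override
    public User deserialize(String topic, byte[] data) {
        if (data == null) {
            return null;
        }
        try {
            // Parse the JSON bytes back into a User object
            return mapper.readValue(data, User.class);
        } catch (Exception e) {
            // Fail loudly instead of silently handing back a null user
            throw new SerializationException("Error deserializing JSON to User", e);
        }
    }

    @Override
    public void close() { }
}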
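Before wiring a serializer and deserializer into Kafka, it helps to check that they are exact opposites of each other. The sketch below shows such a round-trip check using only the JDK. Note that it deliberately swaps the JSON encoding above for a simple hand-written binary layout, so that it runs without Jackson or Kafka on the classpath. DemoUser, DemoUserCodec and the byte layout are assumptions made for this illustration, and the name is assumed to be non-null.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

// Hypothetical stand-in for the User class, kept local so the sketch is self-contained.
final class DemoUser {
    final String firstname;
    final int age;

    DemoUser(String firstname, int age) {
        this.firstname = firstname;
        this.age = age;
    }
}

final class DemoUserCodec {

    // Layout assumed by this sketch: [int nameLength][name as UTF-8][int age]
    static byte[] serialize(DemoUser user) {
        if (user == null) {
            return null; // null in, null out
        }
        byte[] name = user.firstname.getBytes(StandardCharsets.UTF_8);
        return ByteBuffer.allocate(Integer.BYTES + name.length + Integer.BYTES)
                .putInt(name.length)
                .put(name)
                .putInt(user.age)
                .array();
    }

    // Reads the fields back in the same order they were written.
    static DemoUser deserialize(byte[] data) {
        if (data == null) {
            return null;
        }
        ByteBuffer buffer = ByteBuffer.wrap(data);
        byte[] name = new byte[buffer.getInt()];
        buffer.get(name);
        int age = buffer.getInt();
        return new DemoUser(new String(name, StandardCharsets.UTF_8), age);
    }

    public static void main(String[] args) {
        DemoUser copy = deserialize(serialize(new DemoUser("Alice", 30)));
        System.out.println(copy.firstname + ", " + copy.age);
    }
}
```

Both methods return null for a null input. This matters because the Kafka client can pass a null value to a serializer or deserializer, for example for a record that carries no value.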
Further, to use the above serializer, we register it with this producer property:
props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
So, the producer will be:
try (Producer<String, User> producer = new KafkaProducer<>(props)) {
    // send() is asynchronous; closing the producer waits for pending records
    producer.send(new ProducerRecord<String, User>("MyTopic", user));
    System.out.println("Message " + user.toString() + " sent !!");
} catch (Exception e) {
    e.printStackTrace();
}
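The producer snippet takes a props object that is not shown in full. A minimal sketch of how it might be built is given below. The property names are standard Kafka producer settings, while the broker address localhost:9092 and the class name ProducerConfigSketch are assumptions for illustration. Since the record key is a String, the built-in string serializer is used for it.

```java
import java.util.Properties;

// Hypothetical helper that assembles the producer configuration.
final class ProducerConfigSketch {

    static Properties producerProps(String bootstrapServers) {
        Properties props = new Properties();
        // Broker(s) to connect to, for example "localhost:9092" (an assumption)
        props.put("bootstrap.servers", bootstrapServers);
        // Keys are plain strings, so the built-in serializer is enough
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        // Values are User objects, so we register the custom serializer
        props.put("value.serializer", "com.knoldus.serializers.UserSerializer");
        return props;
    }

    public static void main(String[] args) {
        producerProps("localhost:9092").forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```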
Similarly, we register the deserializer with this consumer property:
props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
Hence, the consumer will be:
try (KafkaConsumer<String, User> consumer = new KafkaConsumer<>(props)) {
    consumer.subscribe(Collections.singletonList(topic));
    while (true) {
        // In this client version poll() takes a timeout in milliseconds;
        // newer clients use poll(Duration) instead
        ConsumerRecords<String, User> messages = consumer.poll(100);
        for (ConsumerRecord<String, User> message : messages) {
            System.out.println("Message received " + message.value().toString());
        }
    }
} catch (Exception e) {
    e.printStackTrace();
}
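On the consumer side, props needs a few more entries than the deserializer alone, most notably a consumer group. A minimal sketch is shown below. The property names are standard Kafka consumer settings, while the broker address, the group name "user-consumers" and the class name ConsumerConfigSketch are assumptions for illustration.

```java
import java.util.Properties;

// Hypothetical helper that assembles the consumer configuration.
final class ConsumerConfigSketch {

    static Properties consumerProps(String bootstrapServers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", bootstrapServers);
        // subscribe() requires the consumer to belong to a group
        props.put("group.id", groupId);
        // Start from the oldest record when the group has no committed offset yet
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "com.knoldus.deserializer.UserDeserializer");
        return props;
    }

    public static void main(String[] args) {
        consumerProps("localhost:9092", "user-consumers")
                .forEach((k, v) -> System.out.println(k + " = " + v));
    }
}
```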
So, this was all Kafka Serialization and Deserialization. Hope you like and understand our explanation of the custom serializer and deserializer with Kafka.
Conclusion
Hence, in this Kafka Serialization and Deserialization tutorial, we have learned how to create a custom Kafka SerDe. Moreover, we saw why a serializer and deserializer are needed with Kafka. Along with this, we learned the methods to implement for Kafka Serialization and Deserialization.
Also, we looked at the Kafka string serializer and the Kafka object serializer with the help of an example. However, if you have any doubts, feel free to ask in the comment section.