

{"id":15651,"date":"2018-05-29T06:05:24","date_gmt":"2018-05-29T06:05:24","guid":{"rendered":"https:\/\/data-flair.training\/blogs\/?p=15651"},"modified":"2018-05-29T06:05:24","modified_gmt":"2018-05-29T06:05:24","slug":"kafka-schema-registry","status":"publish","type":"post","link":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/","title":{"rendered":"Kafka Schema Registry | Learn Avro Schema"},"content":{"rendered":"<p><span style=\"font-weight: 400\">In this Kafka Schema Registry tutorial, we will learn what the Schema Registry is and why we should use it with <strong>Apache Kafka<\/strong>. <\/span><\/p>\n<p><span style=\"font-weight: 400\">We will also cover Avro schema evolution and see how to set up and use the Schema Registry with the Kafka Avro serializers. <\/span><span style=\"font-weight: 400\">Finally, we will learn to manage Avro schemas through the REST interface of the Schema Registry.<\/span><\/p>\n<p>So, let&#8217;s discuss the Apache Kafka Schema Registry.<\/p>\n<h2><span style=\"font-weight: 400\">What is Kafka Schema Registry?<\/span><\/h2>\n<p><span style=\"font-weight: 400\">The Schema Registry stores the Avro schemas used by both <strong>Kafka Producers<\/strong> and <strong>Kafka Consumers<\/strong>.<\/span><\/p>\n<ul>\n<li><span style=\"font-weight: 400\">It offers a RESTful interface for managing Avro schemas.<\/span><\/li>\n<li><span style=\"font-weight: 400\">It stores a versioned history of schemas.<\/span><\/li>\n<li><span style=\"font-weight: 400\">It checks new schemas for compatibility with previously registered versions.<\/span><\/li>\n<li><span style=\"font-weight: 400\">Its configurable compatibility settings support the controlled evolution of Avro schemas.<\/span><\/li>\n<\/ul>\n<p><span style=\"font-weight: 400\">The Kafka Avro serialization project provides the serializers. 
With Avro and the Schema Registry, Kafka producers and consumers that use Kafka Avro serialization handle both schema management and the serialization of records.<\/span><\/p>\n<div id=\"attachment_15933\" style=\"width: 1210px\" class=\"wp-caption aligncenter\"><a href=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry.png\"><img loading=\"lazy\" decoding=\"async\" aria-describedby=\"caption-attachment-15933\" class=\"wp-image-15933 size-full\" src=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry.png\" alt=\"Kafka Schema Registry\" width=\"1200\" height=\"628\" srcset=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry.png 1200w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-150x79.png 150w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-300x157.png 300w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-768x402.png 768w, https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-1024x536.png 1024w\" sizes=\"auto, (max-width: 1200px) 100vw, 1200px\" \/><\/a><p id=\"caption-attachment-15933\" class=\"wp-caption-text\">Introduction to Kafka Schema Registry<\/p><\/div>\n<p><span style=\"font-weight: 400\">When the Confluent Schema Registry is used, producers do not have to send the full schema with each record, only its unique schema ID. <\/span><\/p>\n<p><span style=\"font-weight: 400\">The consumer uses that schema ID to look up the full schema in the Confluent Schema Registry if the schema is not already cached. 
<\/span><span style=\"font-weight: 400\">As a result, the schema does not have to be sent with each batch of records, which saves time.<\/span><\/p>\n<p><span style=\"font-weight: 400\">The Kafka producer creates an Avro record that contains a schema ID and the data. The Kafka Avro Serializer registers the schema if needed, and then serializes the data together with the schema ID.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">Why Use Schema Registry in Kafka?<\/span><\/h2>\n<p><span style=\"font-weight: 400\">The consumer&#8217;s schema can differ from the producer&#8217;s. The consumer schema defines what the consumer expects each record\/message to conform to. <\/span><\/p>\n<p><span style=\"font-weight: 400\">If the two schemas do not match but are compatible, the payload is transformed via Avro schema evolution during deserialization, using the producer&#8217;s schema retrieved from the Schema Registry. In addition, Kafka records have a key and a value, and each can have its own schema.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">Kafka Schema Registry Operations<\/span><\/h2>\n<p><span style=\"font-weight: 400\">The Schema Registry can store schemas for the keys and values of Kafka records. It can list schemas by subject, list all versions of a subject (schema), retrieve a schema by version or ID, and get the latest version of a schema. <\/span><\/p>\n<p><span style=\"font-weight: 400\">The compatibility levels covered here are backward, forward, full, and none. Schemas can be managed through the Schema Registry&#8217;s REST API.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">Kafka Schema Compatibility Settings<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Let\u2019s look at each compatibility level. 
Backward compatibility means that data written with an older schema can be read with a newer schema. Forward compatibility means that data written with a newer schema can be read with older schemas. <\/span><\/p>\n<p><span style=\"font-weight: 400\">Full compatibility means that a new version of a schema is both backward- and forward-compatible. The &#8220;none&#8221; level disables schema validation and is not recommended: with the level set to &#8220;none&#8221;, the Schema Registry just stores the schema without validating it for compatibility.<\/span><\/p>\n<h3>a. Schema Registry Configuration<\/h3>\n<p><span style=\"font-weight: 400\">Compatibility can be configured either globally or per subject.<\/span><br \/>\n<span style=\"font-weight: 400\">The compatibility value can be:<\/span><br \/>\n<strong>A. None<\/strong><br \/>\n<span style=\"font-weight: 400\">Do not check for schema compatibility.<\/span><br \/>\n<strong>B. Forward<\/strong><br \/>\n<span style=\"font-weight: 400\">Check that the new schema is forward-compatible with the latest registered version, i.e. that data written with the new schema can be read with the latest version.<\/span><br \/>\n<strong>C. Backward (default)<\/strong><br \/>\n<span style=\"font-weight: 400\">Check that the new schema is backward-compatible with the latest registered version, i.e. that it can read data written with the latest version.<\/span><br \/>\n<strong>D. 
Full<\/strong><br \/>\n<span style=\"font-weight: 400\">Check that the new schema is both backward- and forward-compatible with the latest registered version.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">Schema Evolution<\/span><\/h2>\n<p><span style=\"font-weight: 400\">If an Avro schema is changed after data has been written to a store using an older version of that schema, Avro can perform schema evolution when we read that data.<\/span><\/p>\n<p><span style=\"font-weight: 400\">From Kafka&#8217;s perspective, schema evolution happens only during deserialization at the consumer (on read). <\/span><\/p>\n<p><span style=\"font-weight: 400\">If the consumer\u2019s schema differs from the producer\u2019s schema, the key or value is automatically converted during deserialization, where possible, to conform to the consumer&#8217;s read schema.<\/span><\/p>\n<p><span style=\"font-weight: 400\">In other words, schema evolution is an automatic transformation between the schema version the consumer uses and the schema the producer used when writing to the Kafka log. <\/span><\/p>\n<p><span style=\"font-weight: 400\">This transformation is applied to the Kafka record\u2019s key or value only when the consumer schema is not identical to the producer schema that was used to serialize the record. If the schemas match, no transformation is needed.<\/span><\/p>\n<h3>a. Allowed Modifications During Schema Evolution<\/h3>\n<p><span style=\"font-weight: 400\">We can add a field with a default value to a schema, or remove a field that had a default value. We can also change a field\u2019s order attribute, change a field\u2019s default value to another value, or add a default value to a field that did not have one. 
<\/span><\/p>\n<p><span style=\"font-weight: 400\">We can also add or remove a field alias, although that may break consumers that depend on the alias. Finally, we can change a type to a union that contains the original type. With any of these changes, Avro schema evolution can still resolve data when it is read with an older schema.<\/span><\/p>\n<h3>b. Rules of the Road for Modifying Schemas<\/h3>\n<p><span style=\"font-weight: 400\">To keep our schemas evolvable, we should follow these guidelines. First, provide a default value for fields in the schema, because that allows us to delete the field later. Never change a field\u2019s data type. <\/span><\/p>\n<p><span style=\"font-weight: 400\">When adding a new field to the schema, always give it a default value. Do not rename an existing field (use aliases instead).<\/span><\/p>\n<p><span style=\"font-weight: 400\">For example:<\/span><\/p>\n<ul>\n<li><strong>Employee example Avro schema:<\/strong><\/li>\n<\/ul>\n<pre class=\"EnlighterJSRAW\">{\"namespace\": \"com.dataflair.phonebook\",\n \"type\": \"record\",\n \"name\": \"Employee\",\n \"doc\" : \"Represents an Employee at a company\",\n \"fields\": [\n   {\"name\": \"firstName\", \"type\": \"string\", \"doc\": \"The person's given name\"},\n   {\"name\": \"nickName\", \"type\": [\"null\", \"string\"], \"default\" : null},\n   {\"name\": \"lastName\", \"type\": \"string\"},\n   {\"name\": \"age\",  \"type\": \"int\", \"default\": -1},\n   {\"name\": \"emails\", \"default\":[], \"type\":{\"type\": \"array\", \"items\": \"string\"}},\n   {\"name\": \"phoneNumber\",  \"type\":\n   [ \"null\",\n     { \"type\": \"record\",   \"name\": \"PhoneNumber\",\n       \"fields\": [\n         {\"name\": \"areaCode\", \"type\": \"string\"},\n         {\"name\": \"countryCode\", \"type\": \"string\", \"default\" : \"\"},\n         {\"name\": \"prefix\", \"type\": 
\"string\"},\n         {\"name\": \"number\", \"type\": \"string\"}\n       ]\n     }\n   ]\n   },\n   {\"name\":\"status\", \"default\" :\"SALARY\", \"type\": { \"type\": \"enum\", \"name\": \"Status\",\n     \"symbols\" : [\"RETIRED\", \"SALARY\", \"HOURLY\", \"PART_TIME\"]}\n   }\n ]\n}<\/pre>\n<h2><span style=\"font-weight: 400\">Avro Schema Evolution Scenario<\/span><\/h2>\n<p><span style=\"font-weight: 400\">Assume that in version 1 of the schema, our Employee record did not have an age field. Now we want to add an age field with a default value of -1. Suppose we have a consumer using version 1 (without age) and a producer using version 2 of the schema (with age).<\/span><\/p>\n<p><span style=\"font-weight: 400\">The producer uses version 2 of the Employee schema to create a com.dataflair.phonebook.Employee record, sets the age field to 42, and sends it to the Kafka topic new-Employees. The consumer then reads records from new-Employees using version 1 of the Employee schema. <\/span><\/p>\n<p><span style=\"font-weight: 400\">Because the consumer is using version 1 of the schema, the age field is removed during deserialization.<\/span><\/p>\n<p><span style=\"font-weight: 400\">The same consumer then modifies some records and writes them to a NoSQL store, so the age field is missing from the records it writes. Later, another client that uses version 2 of the schema (with age) reads a record from the NoSQL store.<\/span><\/p>\n<p><span style=\"font-weight: 400\">Because the consumer wrote that record with version 1, the age field is missing, so when the client reads the record, age is set to its default value of -1.<\/span><\/p>\n<p><span style=\"font-weight: 400\">Under the default backward compatibility level, the Schema Registry would reject the new schema, and the producer could never add it to the Kafka log, if we had added age as a required field, i.e. 
the age field did not have a default.<\/span><\/p>\n<h2><span style=\"font-weight: 400\">Using the Schema Registry REST API<\/span><\/h2>\n<p><span style=\"font-weight: 400\">The Schema Registry in Kafka lets us manage schemas with the following operations:<\/span><\/p>\n<ol>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Store schemas for keys and values of Kafka records<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">List schemas by subject<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">List all versions of a subject (schema)<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Retrieve a schema by version<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Retrieve a schema by ID<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Retrieve the latest version of a schema<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Perform compatibility checks<\/span><\/li>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Set the compatibility level globally or per subject<\/span><\/li>\n<\/ol>\n<p><span style=\"font-weight: 400\">All of these operations are available through the Schema Registry&#8217;s REST API.<\/span><br \/>\n<span style=\"font-weight: 400\">To post a new schema, we can do the following:<\/span><\/p>\n<h3>a. 
Posting a new schema<\/h3>\n<pre class=\"EnlighterJSRAW\">curl -X POST -H \"Content-Type: application\/vnd.schemaregistry.v1+json\" \\\n   --data '{\"schema\": \"{\\\"type\\\": \u2026}\"}' \\\n   http:\/\/localhost:8081\/subjects\/Employee\/versions<\/pre>\n<h3>b. Listing all registered subjects<\/h3>\n<pre class=\"EnlighterJSRAW\">curl -X GET http:\/\/localhost:8081\/subjects<\/pre>\n<p><span style=\"font-weight: 400\">We can perform all of the above operations through the Schema Registry&#8217;s REST interface with any good HTTP client. For example, the OkHttp client from Square (com.squareup.okhttp3:okhttp:3.7.0+) makes this a little easier:<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\">\n<h3><span style=\"font-weight: 400\">Using REST endpoints to try out all of the Schema Registry options:<\/span><\/h3>\n<\/li>\n<\/ul>\n<pre class=\"EnlighterJSRAW\">package com.dataflair.kafka.schema;\nimport okhttp3.*;\nimport java.io.IOException;\npublic class SchemaMain {\n   private final static MediaType SCHEMA_CONTENT =\n           MediaType.parse(\"application\/vnd.schemaregistry.v1+json\");\n   private final static String Employee_SCHEMA = \"{\\n\" +\n           \" \\\"schema\\\": \\\"\" +\n           \" {\" +\n           \" \\\\\\\"namespace\\\\\\\": \\\\\\\"com.dataflair.phonebook\\\\\\\",\" +\n           \" \\\\\\\"type\\\\\\\": \\\\\\\"record\\\\\\\",\" +\n           \" \\\\\\\"name\\\\\\\": \\\\\\\"Employee\\\\\\\",\" +\n           \" \\\\\\\"fields\\\\\\\": [\" +\n           \" {\\\\\\\"name\\\\\\\": \\\\\\\"fName\\\\\\\", \\\\\\\"type\\\\\\\": \\\\\\\"string\\\\\\\"},\" +\n           \" {\\\\\\\"name\\\\\\\": \\\\\\\"lName\\\\\\\", \\\\\\\"type\\\\\\\": \\\\\\\"string\\\\\\\"},\" +\n           \" {\\\\\\\"name\\\\\\\": \\\\\\\"age\\\\\\\",  \\\\\\\"type\\\\\\\": \\\\\\\"int\\\\\\\"},\" +\n           \" {\\\\\\\"name\\\\\\\": \\\\\\\"phoneNumber\\\\\\\",  \\\\\\\"type\\\\\\\": \\\\\\\"string\\\\\\\"}\" +\n           \" ]\" +\n     
      \" }\\\"\" +\n           \"}\";\n   public static void main(String... args) throws IOException {\n       System.out.println(Employee_SCHEMA);\n       final OkHttpClient client = new OkHttpClient();\n       \/\/POST A NEW SCHEMA\n       Request request = new Request.Builder()\n               .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))\n               .url(\"http:\/\/localhost:8081\/subjects\/Employee\/versions\")\n               .build();\n       String output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/LIST ALL SUBJECTS\n       request = new Request.Builder()\n               .url(\"http:\/\/localhost:8081\/subjects\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/SHOW ALL VERSIONS OF Employee\n       request = new Request.Builder()\n               .url(\"http:\/\/localhost:8081\/subjects\/Employee\/versions\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/SHOW VERSION 2 OF Employee\n       request = new Request.Builder()\n               .url(\"http:\/\/localhost:8081\/subjects\/Employee\/versions\/2\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/SHOW THE SCHEMA WITH ID 3\n       request = new Request.Builder()\n               .url(\"http:\/\/localhost:8081\/schemas\/ids\/3\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/SHOW THE LATEST VERSION OF Employee\n       request = new Request.Builder()\n               .url(\"http:\/\/localhost:8081\/subjects\/Employee\/versions\/latest\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       
System.out.println(output);\n       \/\/CHECK IF SCHEMA IS REGISTERED\n       request = new Request.Builder()\n               .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))\n               .url(\"http:\/\/localhost:8081\/subjects\/Employee\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/TEST COMPATIBILITY\n       request = new Request.Builder()\n               .post(RequestBody.create(SCHEMA_CONTENT, Employee_SCHEMA))\n               .url(\"http:\/\/localhost:8081\/compatibility\/subjects\/Employee\/versions\/latest\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/ TOP LEVEL CONFIG\n       request = new Request.Builder()\n               .url(\"http:\/\/localhost:8081\/config\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/ SET TOP LEVEL CONFIG\n       \/\/ VALUES are none, backward, forward and full\n       request = new Request.Builder()\n               .put(RequestBody.create(SCHEMA_CONTENT, \"{\\\"compatibility\\\": \\\"none\\\"}\"))\n               .url(\"http:\/\/localhost:8081\/config\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n       \/\/ SET CONFIG FOR Employee\n       \/\/ VALUES are none, backward, forward and full\n       request = new Request.Builder()\n               .put(RequestBody.create(SCHEMA_CONTENT, \"{\\\"compatibility\\\": \\\"backward\\\"}\"))\n               .url(\"http:\/\/localhost:8081\/config\/Employee\")\n               .build();\n       output = client.newCall(request).execute().body().string();\n       System.out.println(output);\n   }\n}<\/pre>\n<p>Try running the example and forcing incompatible schemas into the Schema Registry, and also 
note the behavior for the various compatibility settings.<\/p>\n<h3>c. Running Kafka Schema Registry:<\/h3>\n<pre class=\"EnlighterJSRAW\">$ cat ~\/tools\/confluent-3.2.1\/etc\/schema-registry\/schema-registry.properties\nlisteners=http:\/\/0.0.0.0:8081\nkafkastore.connection.url=localhost:2181\nkafkastore.topic=_schemas\ndebug=false\n$ ~\/tools\/confluent-3.2.1\/bin\/schema-registry-start ~\/tools\/confluent-3.2.1\/etc\/schema-registry\/schema-registry.properties<\/pre>\n<h2><span style=\"font-weight: 400\">Writing Consumers and Producers<\/span><\/h2>\n<p><span style=\"font-weight: 400\">First, we need to start the Schema Registry server, pointing it at our ZooKeeper cluster. Next, we import the Kafka Avro Serializer and Avro JARs into our Gradle project. Then we configure the producer to use the Schema Registry and the KafkaAvroSerializer. <\/span><\/p>\n<p><span style=\"font-weight: 400\">Similarly, to write the consumer, we configure it to use the Schema Registry and the KafkaAvroDeserializer.<\/span><\/p>\n<p><span style=\"font-weight: 400\">The following build file lists the Avro JAR files and other dependencies we need.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Gradle build file for Kafka Avro Serializer examples:<\/span><\/li>\n<\/ul>\n<pre class=\"EnlighterJSRAW\">plugins {\n   id \"com.commercehub.gradle.plugin.avro\" version \"0.9.0\"\n}\ngroup 'dataflair'\nversion '1.0-SNAPSHOT'\napply plugin: 'java'\nsourceCompatibility = 1.8\ndependencies {\n   compile \"org.apache.avro:avro:1.8.1\"\n   compile 'com.squareup.okhttp3:okhttp:3.7.0'\n   testCompile 'junit:junit:4.11'\n   compile 'org.apache.kafka:kafka-clients:0.10.2.0'\n   compile 'io.confluent:kafka-avro-serializer:3.2.1'\n}\nrepositories {\n   jcenter()\n   mavenCentral()\n   maven {\n       url \"https:\/\/packages.confluent.io\/maven\/\"\n   
}\n}\navro {\n   createSetters = false\n   fieldVisibility = \"PRIVATE\"\n}<\/pre>\n<p><span style=\"font-weight: 400\">Remember to include the Kafka Avro Serializer library (io.confluent:kafka-avro-serializer:3.2.1) and the Avro library (org.apache.avro:avro:1.8.1). The Avro Gradle plugin generates the Java classes used below, such as Employee and PhoneNumber, from the Avro schema.<\/span><\/p>\n<h3><span style=\"font-weight: 400\">a. Writing a Producer<\/span><\/h3>\n<p><span style=\"font-weight: 400\">Now, let\u2019s write the producer.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Producer that uses Kafka Avro Serialization and the Schema Registry:<\/span><\/li>\n<\/ul>\n<pre class=\"EnlighterJSRAW\">package com.dataflair.kafka.schema;\nimport com.dataflair.phonebook.Employee;\nimport com.dataflair.phonebook.PhoneNumber;\nimport io.confluent.kafka.serializers.KafkaAvroSerializer;\nimport io.confluent.kafka.serializers.KafkaAvroSerializerConfig;\nimport org.apache.kafka.clients.producer.KafkaProducer;\nimport org.apache.kafka.clients.producer.Producer;\nimport org.apache.kafka.clients.producer.ProducerConfig;\nimport org.apache.kafka.clients.producer.ProducerRecord;\nimport org.apache.kafka.common.serialization.LongSerializer;\nimport java.util.Properties;\nimport java.util.stream.IntStream;\npublic class AvroProducer {\n   private static Producer&lt;Long, Employee&gt; createProducer() {\n       Properties props = new Properties();\n       props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, \"localhost:9092\");\n       props.put(ProducerConfig.CLIENT_ID_CONFIG, \"AvroProducer\");\n       props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,\n               LongSerializer.class.getName());\n       \/\/ Configure the KafkaAvroSerializer.\n       props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,\n               KafkaAvroSerializer.class.getName());\n       
\/\/ Schema Registry location.\n       props.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,\n               \"http:\/\/localhost:8081\");\n       return new KafkaProducer&lt;&gt;(props);\n   }\n   private final static String TOPIC = \"new-Employees\";\n   public static void main(String... args) {\n       Producer&lt;Long, Employee&gt; producer = createProducer();\n       Employee bob = Employee.newBuilder().setAge(35)\n               .setFirstName(\"Bob\")\n               .setLastName(\"Jones\")\n               .setPhoneNumber(\n                       PhoneNumber.newBuilder()\n                               .setAreaCode(\"301\")\n                               .setCountryCode(\"1\")\n                               .setPrefix(\"555\")\n                               .setNumber(\"1234\")\n                               .build())\n               .build();\n       IntStream.range(1, 100).forEach(index -&gt; {\n           producer.send(new ProducerRecord&lt;&gt;(TOPIC, 1L * index, bob));\n       });\n       producer.flush();\n       producer.close();\n   }\n}<\/pre>\n<p><span style=\"font-weight: 400\">Note that the Schema Registry and the KafkaAvroSerializer are configured as part of the producer setup:<\/span><\/p>\n<pre class=\"EnlighterJSRAW\">\/\/ Configure the KafkaAvroSerializer.\nprops.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,\n               KafkaAvroSerializer.class.getName());\n\/\/ Schema Registry location.\nprops.put(KafkaAvroSerializerConfig.SCHEMA_REGISTRY_URL_CONFIG,\n               \"http:\/\/localhost:8081\");<\/pre>\n<p><span style=\"font-weight: 400\">After that, we use the producer just like any other Kafka producer.<\/span><\/p>\n<h3><span style=\"font-weight: 400\">b. 
Writing a Consumer<\/span><\/h3>\n<p><span style=\"font-weight: 400\">Next, let\u2019s write the consumer.<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Kafka Consumer that uses Kafka Avro Serialization and Schema Registry:<\/span><\/li>\n<\/ul>\n<pre class=\"EnlighterJSRAW\">package com.dataflair.kafka.schema;\nimport com.dataflair.phonebook.Employee;\nimport io.confluent.kafka.serializers.KafkaAvroDeserializer;\nimport io.confluent.kafka.serializers.KafkaAvroDeserializerConfig;\nimport org.apache.kafka.clients.consumer.Consumer;\nimport org.apache.kafka.clients.consumer.ConsumerConfig;\nimport org.apache.kafka.clients.consumer.ConsumerRecords;\nimport org.apache.kafka.clients.consumer.KafkaConsumer;\nimport org.apache.kafka.common.serialization.LongDeserializer;\nimport java.util.Collections;\nimport java.util.Properties;\nimport java.util.stream.IntStream;\npublic class AvroConsumer {\n   private final static String BOOTSTRAP_SERVERS = \"localhost:9092\";\n   \/\/ Must match the topic the producer writes to.\n   private final static String TOPIC = \"new-Employees\";\n   private static Consumer&lt;Long, Employee&gt; createConsumer() {\n       Properties props = new Properties();\n       props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, BOOTSTRAP_SERVERS);\n       props.put(ConsumerConfig.GROUP_ID_CONFIG, \"KafkaExampleAvroConsumer\");\n       \/\/ Start from the earliest offset when the group has no committed offset.\n       props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, \"earliest\");\n       props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG,\n               LongDeserializer.class.getName());\n       \/\/Use Kafka Avro Deserializer.\n       props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,\n               KafkaAvroDeserializer.class.getName());\n       \/\/Use Specific Record or else you get Avro GenericRecord.\n       props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, \"true\");\n       \/\/Schema registry location.\n       
props.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,\n               \"http:\/\/localhost:8081\"); \/\/&lt;----- Run Schema 
Registry on 8081\n       return new KafkaConsumer&lt;&gt;(props);\n   }\n   public static void main(String... args) {\n       final Consumer&lt;Long, Employee&gt; consumer = createConsumer();\n       consumer.subscribe(Collections.singletonList(TOPIC));\n       IntStream.range(1, 100).forEach(index -&gt; {\n           final ConsumerRecords&lt;Long, Employee&gt; records =\n                   consumer.poll(100);\n           if (records.count() == 0) {\n               System.out.println(\"None found\");\n           } else {\n               records.forEach(record -&gt; {\n                   Employee employeeRecord = record.value();\n                   System.out.printf(\"%s %d %d %s%n\", record.topic(),\n                           record.partition(), record.offset(), employeeRecord);\n               });\n           }\n       });\n       consumer.close();\n   }\n}<\/pre>\n<p>As with the producer, we must tell the consumer where to find the Schema Registry, and we must configure the Kafka Avro Deserializer.<\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Configuring Schema Registry for the consumer:<\/span><\/li>\n<\/ul>\n<pre class=\"EnlighterJSRAW\">\/\/Use Kafka Avro Deserializer.\nprops.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG,\n               KafkaAvroDeserializer.class.getName());\n\/\/Use Specific Record or else you get Avro GenericRecord.\nprops.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, \"true\");\n\/\/Schema registry location.\nprops.put(KafkaAvroDeserializerConfig.SCHEMA_REGISTRY_URL_CONFIG,\n               \"http:\/\/localhost:8081\"); \/\/&lt;----- Run Schema Registry on 8081<\/pre>\n<p><span style=\"font-weight: 400\">Setting SPECIFIC_AVRO_READER_CONFIG to true makes the deserializer return our generated Employee class, which is an Avro SpecificRecord. Without it, the consumer would receive an Avro GenericRecord instead of our generated Employee object. 
<\/span><br \/>\n<span style=\"font-weight: 400\">To run the above example, we also need to start ZooKeeper and Kafka:<\/span><\/p>\n<ul>\n<li style=\"font-weight: 400\"><span style=\"font-weight: 400\">Running ZooKeeper and Kafka:<\/span><\/li>\n<\/ul>\n<pre class=\"EnlighterJSRAW\">kafka\/bin\/zookeeper-server-start.sh kafka\/config\/zookeeper.properties &amp;\nkafka\/bin\/kafka-server-start.sh kafka\/config\/server.properties<\/pre>\n<p>So, this was all about Kafka Schema Registry. We hope you liked our explanation.<\/p>\n<h2><span style=\"font-weight: 400\">Conclusion<\/span><\/h2>\n<p><span style=\"font-weight: 400\">We have seen that the Kafka Schema Registry manages Avro schemas for Kafka consumers and Kafka producers, and that Avro supports schema evolution, which is important for streaming and big data architectures. <\/span><\/p>\n<p><span style=\"font-weight: 400\">We learned what the Kafka Schema Registry is and discussed why Kafka needs it. <\/span><\/p>\n<p><span style=\"font-weight: 400\">In addition, we covered Schema Registry operations and compatibility settings, Avro schemas, and the Schema Registry REST API. <\/span><\/p>\n<p><span style=\"font-weight: 400\">Finally, we wrote Kafka consumers and producers that use the Schema Registry and Avro serialization. If you have any questions, feel free to ask in the comment section.<\/span><\/p>\n","protected":false},"excerpt":{"rendered":"<p>In this Kafka Schema Registry tutorial, we will learn what the Schema Registry is and why we should use it with Apache Kafka. 
Also, we will see the concept of Avro schema evolution and&#46;&#46;&#46;<\/p>\n","protected":false},"author":5,"featured_media":15937,"comment_status":"open","ping_status":"closed","sticky":false,"template":"","format":"standard","meta":{"footnotes":""},"categories":[9],"tags":[1304,16550,7933,7934,9033,12626,12628,12632,16173],"class_list":["post-15651","post","type-post","status-publish","format-standard","has-post-thumbnail","hentry","category-kafka","tag-avro-schema","tag-kafka-schema","tag-kafka-schema-example","tag-kafka-schema-registry","tag-need-of-schema-registry","tag-schema-compatability-setting","tag-schema-registry-operations","tag-schema-registry","tag-why-schema-registry"],"yoast_head":"<!-- This site is optimized with the Yoast SEO plugin v27.8 - https:\/\/yoast.com\/product\/yoast-seo-wordpress\/ -->\n<title>Kafka Schema Registry | Learn Avro Schema - DataFlair<\/title>\n<meta name=\"description\" content=\"Kafka Schema registry, why Schema registry, compatability settings for Schema, Avro Schema, Schema Registry rest API, operations for schema registry\" \/>\n<meta name=\"robots\" content=\"index, follow, max-snippet:-1, max-image-preview:large, max-video-preview:-1\" \/>\n<link rel=\"canonical\" href=\"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/\" \/>\n<meta property=\"og:locale\" content=\"en_US\" \/>\n<meta property=\"og:type\" content=\"article\" \/>\n<meta property=\"og:title\" content=\"Kafka Schema Registry | Learn Avro Schema - DataFlair\" \/>\n<meta property=\"og:description\" content=\"Kafka Schema registry, why Schema registry, compatability settings for Schema, Avro Schema, Schema Registry rest API, operations for schema registry\" \/>\n<meta property=\"og:url\" content=\"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/\" \/>\n<meta property=\"og:site_name\" content=\"DataFlair\" \/>\n<meta property=\"article:publisher\" content=\"https:\/\/www.facebook.com\/DataFlairWS\/\" \/>\n<meta 
property=\"article:published_time\" content=\"2018-05-29T06:05:24+00:00\" \/>\n<meta property=\"og:image\" content=\"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-01.jpg\" \/>\n\t<meta property=\"og:image:width\" content=\"1200\" \/>\n\t<meta property=\"og:image:height\" content=\"628\" \/>\n\t<meta property=\"og:image:type\" content=\"image\/jpeg\" \/>\n<meta name=\"author\" content=\"DataFlair Team\" \/>\n<meta name=\"twitter:card\" content=\"summary_large_image\" \/>\n<meta name=\"twitter:creator\" content=\"@DataFlairWS\" \/>\n<meta name=\"twitter:site\" content=\"@DataFlairWS\" \/>\n<meta name=\"twitter:label1\" content=\"Written by\" \/>\n\t<meta name=\"twitter:data1\" content=\"DataFlair Team\" \/>\n\t<meta name=\"twitter:label2\" content=\"Est. reading time\" \/>\n\t<meta name=\"twitter:data2\" content=\"12 minutes\" \/>\n<!-- \/ Yoast SEO plugin. -->","yoast_head_json":{"title":"Kafka Schema Registry | Learn Avro Schema - DataFlair","description":"Kafka Schema registry, why Schema registry, compatability settings for Schema, Avro Schema, Schema Registry rest API, operations for schema registry","robots":{"index":"index","follow":"follow","max-snippet":"max-snippet:-1","max-image-preview":"max-image-preview:large","max-video-preview":"max-video-preview:-1"},"canonical":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/","og_locale":"en_US","og_type":"article","og_title":"Kafka Schema Registry | Learn Avro Schema - DataFlair","og_description":"Kafka Schema registry, why Schema registry, compatability settings for Schema, Avro Schema, Schema Registry rest API, operations for schema 
registry","og_url":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/","og_site_name":"DataFlair","article_publisher":"https:\/\/www.facebook.com\/DataFlairWS\/","article_published_time":"2018-05-29T06:05:24+00:00","og_image":[{"width":1200,"height":628,"url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-01.jpg","type":"image\/jpeg"}],"author":"DataFlair Team","twitter_card":"summary_large_image","twitter_creator":"@DataFlairWS","twitter_site":"@DataFlairWS","twitter_misc":{"Written by":"DataFlair Team","Est. reading time":"12 minutes"},"schema":{"@context":"https:\/\/schema.org","@graph":[{"@type":"Article","@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#article","isPartOf":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/"},"author":{"name":"DataFlair Team","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/person\/7f83c342f5d1632d6f7b4b0b0f447823"},"headline":"Kafka Schema Registry | Learn Avro Schema","datePublished":"2018-05-29T06:05:24+00:00","mainEntityOfPage":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/"},"wordCount":1930,"commentCount":1,"publisher":{"@id":"https:\/\/data-flair.training\/blogs\/#organization"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#primaryimage"},"thumbnailUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-01.jpg","keywords":["Avro Schema","Kafka Schema","Kafka Schema Example","Kafka Schema Registry","Need of Schema Registry","Schema Compatability setting","Schema Registry Operations","Schema-Registry","Why Schema Registry"],"articleSection":["Apache Kafka 
Tutorials"],"inLanguage":"en-US","potentialAction":[{"@type":"CommentAction","name":"Comment","target":["https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#respond"]}]},{"@type":"WebPage","@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/","url":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/","name":"Kafka Schema Registry | Learn Avro Schema - DataFlair","isPartOf":{"@id":"https:\/\/data-flair.training\/blogs\/#website"},"primaryImageOfPage":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#primaryimage"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#primaryimage"},"thumbnailUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-01.jpg","datePublished":"2018-05-29T06:05:24+00:00","description":"Kafka Schema registry, why Schema registry, compatability settings for Schema, Avro Schema, Schema Registry rest API, operations for schema registry","breadcrumb":{"@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#breadcrumb"},"inLanguage":"en-US","potentialAction":[{"@type":"ReadAction","target":["https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/"]}]},{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#primaryimage","url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-01.jpg","contentUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2018\/05\/Kafka-Schema-Registry-01.jpg","width":1200,"height":628,"caption":"Kafka Schema Registry"},{"@type":"BreadcrumbList","@id":"https:\/\/data-flair.training\/blogs\/kafka-schema-registry\/#breadcrumb","itemListElement":[{"@type":"ListItem","position":1,"name":"Blog Home","item":"https:\/\/data-flair.training\/blogs\/"},{"@type":"ListItem","position":2,"name":"Apache Kafka 
Tutorials","item":"https:\/\/data-flair.training\/blogs\/category\/kafka\/"},{"@type":"ListItem","position":3,"name":"Kafka Schema Registry | Learn Avro Schema"}]},{"@type":"WebSite","@id":"https:\/\/data-flair.training\/blogs\/#website","url":"https:\/\/data-flair.training\/blogs\/","name":"DataFlair","description":"Learn Today. Lead Tomorrow.","publisher":{"@id":"https:\/\/data-flair.training\/blogs\/#organization"},"potentialAction":[{"@type":"SearchAction","target":{"@type":"EntryPoint","urlTemplate":"https:\/\/data-flair.training\/blogs\/?s={search_term_string}"},"query-input":{"@type":"PropertyValueSpecification","valueRequired":true,"valueName":"search_term_string"}}],"inLanguage":"en-US"},{"@type":"Organization","@id":"https:\/\/data-flair.training\/blogs\/#organization","name":"DataFlair","url":"https:\/\/data-flair.training\/blogs\/","logo":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/logo\/image\/","url":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2016\/07\/Data-Flair.png","contentUrl":"https:\/\/data-flair.training\/blogs\/wp-content\/uploads\/sites\/2\/2016\/07\/Data-Flair.png","width":106,"height":48,"caption":"DataFlair"},"image":{"@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/logo\/image\/"},"sameAs":["https:\/\/www.facebook.com\/DataFlairWS\/","https:\/\/x.com\/DataFlairWS","https:\/\/www.linkedin.com\/company\/dataflair-web-services-pvt-ltd\/","https:\/\/www.youtube.com\/user\/DataFlairWS"]},{"@type":"Person","@id":"https:\/\/data-flair.training\/blogs\/#\/schema\/person\/7f83c342f5d1632d6f7b4b0b0f447823","name":"DataFlair 
Team","image":{"@type":"ImageObject","inLanguage":"en-US","@id":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","url":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","contentUrl":"https:\/\/secure.gravatar.com\/avatar\/4cf3a74600d131330b8c481d519afd1574093ed89f6d3396a95393ad223eb7cd?s=96&d=mm&r=g","caption":"DataFlair Team"},"description":"DataFlair Team creates expert-level guides on programming, Java, Python, C++, DSA, AI, ML, data Science, Android, Flutter, MERN, Web Development, and technology. Our goal is to empower learners with easy-to-understand content. Explore our resources for career growth and practical learning.","url":"https:\/\/data-flair.training\/blogs\/author\/dfteam1\/"}]}},"amp_enabled":true,"_links":{"self":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts\/15651","targetHints":{"allow":["GET"]}}],"collection":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts"}],"about":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/types\/post"}],"author":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/users\/5"}],"replies":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/comments?post=15651"}],"version-history":[{"count":0,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/posts\/15651\/revisions"}],"wp:featuredmedia":[{"embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/media\/15937"}],"wp:attachment":[{"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/media?parent=15651"}],"wp:term":[{"taxonomy":"category","embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/categories?post=15651"},{"taxonomy":"post_tag","embeddable":true,"href":"https:\/\/data-flair.training\/blogs\/wp-json\/wp\/v2\/tags?post=15651"}],"curies
":[{"name":"wp","href":"https:\/\/api.w.org\/{rel}","templated":true}]}}