Error Registering Avro Schema | Multiple Schemas In One Topic


org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"schema1","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Age","type":"int"},{"name":"Location","type":"string"}]}
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:170)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:188)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:245)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:237)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:232)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:59)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:91)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:72)
at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:158)
at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:57)
at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

You might have come across a similar exception while working with Avro schemas.

The Avro serializer throws this exception when the Schema Registry rejects the schema being registered (HTTP 409) because it is not compatible with the schema already registered for this topic.

You can check the schema version(s) currently registered for the topic using:
curl -X GET http://localhost:8081/subjects//versions/
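
The same lookup can be sketched in Python. This is a minimal sketch that assumes the registry runs at http://localhost:8081 and that the producer uses the default subject naming, where the value schema of a topic is registered under "<topic>-value". The helper names value_subject, versions_url and list_versions are made up for illustration.

```python
import json
import urllib.request
from urllib.parse import quote


def value_subject(topic: str) -> str:
    """Subject for a topic's value schema, assuming the default naming strategy."""
    return f"{topic}-value"


def versions_url(registry_url: str, subject: str) -> str:
    """Build the URL that lists the version numbers registered under a subject."""
    return f"{registry_url.rstrip('/')}/subjects/{quote(subject, safe='')}/versions"


def list_versions(registry_url: str, subject: str) -> list:
    """Fetch the version numbers; this call needs a reachable Schema Registry."""
    with urllib.request.urlopen(versions_url(registry_url, subject)) as resp:
        return json.loads(resp.read().decode("utf-8"))


# Only builds the URL; call list_versions(...) against a running registry.
subject = value_subject("topic-with-multiple-schemas")
print(versions_url("http://localhost:8081", subject))
```

Appending a version number (or "latest") to that URL returns the schema stored for that version.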

Schema Compatibility in Kafka

The Schema Registry server enforces compatibility checks when a new schema is registered under a subject.

Schema compatibility defines whether data written with one version of a schema can be read with another: whether the new schema can read the existing data, or vice versa.

Currently, the Schema Registry server supports the following compatibility levels:

  • Backward compatibility (default): A new schema is backward compatible if it can be used to read the data written in all previous schemas. Backward compatibility is useful for loading data into systems like Hadoop since one can always query data of all versions using the latest schema.
  • Forward compatibility: A new schema is forward compatible if all previous schemas can read data written in this schema. Forward compatibility is useful for consumer applications that can only deal with data in a particular version that may not always be the latest version.
  • Full compatibility: A new schema is fully compatible if it’s both backward and forward compatible.
  • No compatibility: A new schema can be any schema as long as it’s a valid Avro schema.
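
To make these definitions concrete, here is a deliberately simplified sketch of the checks for flat record schemas. It is not the registry's actual implementation: it only looks at field presence, identical field types and defaults, and ignores type promotion, aliases, unions and nested records. All function names are hypothetical.

```python
def field_map(schema: dict) -> dict:
    """Index a record schema's fields by name."""
    return {f["name"]: f for f in schema["fields"]}


def can_read(reader: dict, writer: dict) -> bool:
    """True if every reader field is written by the writer or has a default.

    Simplified on purpose: real Avro resolution has more rules than this.
    """
    written = field_map(writer)
    for name, field in field_map(reader).items():
        if name in written:
            if written[name]["type"] != field["type"]:
                return False
        elif "default" not in field:
            return False
    return True


def is_backward(new: dict, old: dict) -> bool:
    """New schema can read data written with the old schema."""
    return can_read(reader=new, writer=old)


def is_forward(new: dict, old: dict) -> bool:
    """Old schema can read data written with the new schema."""
    return can_read(reader=old, writer=new)


def is_full(new: dict, old: dict) -> bool:
    return is_backward(new, old) and is_forward(new, old)


old = {"type": "record", "name": "schema1", "fields": [
    {"name": "Name", "type": "string"}, {"name": "Age", "type": "int"}]}
new = {"type": "record", "name": "schema2", "fields": [
    {"name": "Name", "type": "string"}, {"name": "Age", "type": "int"},
    {"name": "Address", "type": "string"}]}

print(is_backward(new, old), is_forward(new, old), is_full(new, old))
```

In this sketch, adding a field without a default keeps the schema forward compatible but breaks backward compatibility, which is the situation behind the exception above.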

The Need

After the initial schema is defined, applications may need to evolve it over time. When this happens, it’s critical for the downstream consumers to be able to handle data encoded with both the old and the new schema seamlessly.

Solution

Approach 1: Set Compatibility NONE

A simple workaround to avoid the exception is to set the compatibility to NONE, which allows records with any schema to be produced to the same topic, provided each schema is still a valid Avro schema.

curl -X PUT -H "Content-Type: application/json" \
   --data '{"compatibility": "NONE"}' \
   http://localhost:8081/config/-value
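
For scripted setups, the same request could be built in Python roughly as follows. This is a sketch, not a definitive client: the helper name compatibility_request is hypothetical, the subject name assumes the default "<topic>-value" naming, and the content type used here is the registry's versioned media type rather than the plain application/json of the curl call.

```python
import json
import urllib.request


def compatibility_request(registry_url: str, subject: str, level: str) -> urllib.request.Request:
    """Build a PUT /config/<subject> request that sets the compatibility level."""
    body = json.dumps({"compatibility": level}).encode("utf-8")
    return urllib.request.Request(
        url=f"{registry_url.rstrip('/')}/config/{subject}",
        data=body,
        method="PUT",
        headers={"Content-Type": "application/vnd.schemaregistry.v1+json"},
    )


req = compatibility_request(
    "http://localhost:8081", "topic-with-multiple-schemas-value", "NONE")
print(req.get_method(), req.full_url)
# To actually send it against a running registry:
# urllib.request.urlopen(req)
```

Passing "BACKWARD", "FORWARD" or "FULL" as the level restores the corresponding check for that subject.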

But turning off the compatibility check is neither a good nor a safe idea, since consumers lose any guarantee that they can read the data on the topic.
So we should register the new schema using the next approach.

Approach 2: Register Multiple Schemas On a Single Topic

To register multiple schemas on one topic, we must ensure that compatibility between the new and the previous schema is preserved.

BACKWARD

Consider a topic with the following initial schema:

{"type":"record","name":"schema1","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Age","type":"int"}]}

Start producing data to this topic:
 ./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic topic-with-multiple-schemas --property value.schema='{"type":"record","name":"schema1","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Age","type":"int"}]}'
{"Name":"Neha","Age":20}
{"Name":"Green","Age":45}

Let’s add one more field to the schema this time, and try producing again

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic topic-with-multiple-schemas --property value.schema='{"type":"record","name":"schema2","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Age","type":"int"},{"name":"Address","type":"string"}]}'
 {"Name":"Neha","Age":20,"Address":"Delhi"}
org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: {"type":"record","name":"schema2","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Age","type":"int"},{"name":"Address","type":"string"}]}
 Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema; error code: 409
  at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:182)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:203)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:292)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:284)
  at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:279)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:61)
  at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:93)
  at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
  at io.confluent.kafka.formatter.AvroMessageReader.readMessage(AvroMessageReader.java:159)
  at kafka.tools.ConsoleProducer$.main(ConsoleProducer.scala:58)
  at kafka.tools.ConsoleProducer.main(ConsoleProducer.scala)

We got the exception because the new schema cannot be used to read the existing data, so it is not backward compatible.

The new field is missing from the previously written records, so it’s not clear what value should be assigned to it when those records are read.

The solution is to add a default value to the new field:

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic topic-with-multiple-schemas --property value.schema='{"type":"record","name":"schema2","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Age","type":"int"},{"name":"Address","type":"string","default":"India"}]}'
{"Name":"Neha","Age":20,"Address":"Delhi"}
{"Name":"Sam","Age":30,"Address":"UK"}

The default value specified in the new schema will be used for the missing field when deserializing the data encoded with the old schema.
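
The idea can be illustrated with a small sketch that works on plain dictionaries instead of Avro-encoded bytes. The function read_with_schema is hypothetical and only mimics how a reader schema fills in missing fields; a real Avro decoder does this during schema resolution.

```python
def read_with_schema(record: dict, reader_schema: dict) -> dict:
    """Project a record onto the reader schema, using defaults for missing fields."""
    result = {}
    for field in reader_schema["fields"]:
        name = field["name"]
        if name in record:
            result[name] = record[name]
        elif "default" in field:
            # Field was not written by the old schema: fall back to the default.
            result[name] = field["default"]
        else:
            raise ValueError(f"no value and no default for field {name!r}")
    return result


schema2 = {"type": "record", "name": "schema2", "namespace": "test", "fields": [
    {"name": "Name", "type": "string"},
    {"name": "Age", "type": "int"},
    {"name": "Address", "type": "string", "default": "India"}]}

# A record that was written with the old schema (no Address field).
old_record = {"Name": "Neha", "Age": 20}
print(read_with_schema(old_record, schema2))
```

Without the default on Address, the same call raises an error, which mirrors why the registry rejected the first version of schema2.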

Consider registering a completely different schema to the same topic.

./bin/kafka-avro-console-producer --broker-list localhost:9092 --topic topic-with-multiple-schemas --property value.schema='{"type":"record","name":"schema3","namespace":"test","fields":[{"name":"Name","type":"string","default":"Default Name"},{"name":"Salary","type":"double","default":25000.00},{"name":"Designation","type":"string","default":"Software Consultant"}]}'
{"Name":"Sam","Salary":30000,"Designation":"Consultant"}

Note: You can always check the schema(s) for this topic using
curl -X GET http://localhost:8081/subjects//versions/

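So now, despite having a completely different schema, I can still produce records of the new schema type. This works because every field in the new schema has a default value, so the older records can still be read with it.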

FORWARD

Consider a use case where a consumer has application logic tied to a particular version of the schema. Even after the schema evolves, the application logic may not be updated immediately.

To support this use case, we can evolve the schemas in a forward compatible way: data encoded with the new schema can be read with the old schema.

Any new schema that keeps all the fields of the previous schema is forward compatible: when data written with the new schema is projected onto the old one, the new fields are simply dropped.

Example
The schema '{"type":"record","name":"schema2","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Age","type":"int"},{"name":"Address","type":"string","default":"India"}]}' was forward compatible as well, since it had all the old fields (Name, Age) along with the new field.

But if the new schema drops even a single field that has no default in the old schema, it is no longer forward compatible, since the old schema has no way to fill in the value of the missing field when reading the new data.

Example
Removing the field Age breaks forward compatibility:

'{"type":"record","name":"schema4","namespace":"test","fields":[{"name":"Name","type":"string"},{"name":"Address","type":"string","default":"India"}]}'
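
A quick way to see which fields break forward compatibility is to list the fields the old schema expects, has no default for, and the new schema no longer writes. The sketch below does that for flat record schemas; the function name is hypothetical and, as a simplification, only field presence and defaults are considered.

```python
def fields_breaking_forward(new_schema: dict, old_schema: dict) -> list:
    """Fields the old reader needs (no default) that the new schema no longer writes."""
    new_names = {f["name"] for f in new_schema["fields"]}
    return [
        f["name"]
        for f in old_schema["fields"]
        if f["name"] not in new_names and "default" not in f
    ]


schema2 = {"type": "record", "name": "schema2", "namespace": "test", "fields": [
    {"name": "Name", "type": "string"},
    {"name": "Age", "type": "int"},
    {"name": "Address", "type": "string", "default": "India"}]}
schema4 = {"type": "record", "name": "schema4", "namespace": "test", "fields": [
    {"name": "Name", "type": "string"},
    {"name": "Address", "type": "string", "default": "India"}]}

# Age is required by schema2 but never written by schema4.
print(fields_breaking_forward(new_schema=schema4, old_schema=schema2))
```

An empty list means that, under these simplified rules, the old schema can still read everything the new schema writes.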

FULL

To ensure that old data can be read with the new schema and new data can also be read with the old schema, set the compatibility to FULL.



