This post will show you how to write and read messages in Avro format to/from Kafka.
Instead of sending plain-text messages, though, we will serialize our messages with Avro, which lets us send much more complex data structures over the wire.
Avro
Apache Avro is a language-neutral data serialization format. Avro data is described by a language-independent schema. The schema is usually written in JSON, and data is usually serialized to a compact binary format, although serialization to JSON is also supported.
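To make the binary-versus-JSON point concrete, here is a minimal sketch, assuming the Avro library is on the classpath, that encodes the same record both ways. The `point` schema and the `EncodingComparison` object are hypothetical illustrations, not part of the post's code.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.EncoderFactory

object EncodingComparison {
  // Hypothetical two-field schema used only for this comparison
  val schema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "point", "fields": [
      |  {"name": "x", "type": "int"},
      |  {"name": "y", "type": "int"}
      |]}""".stripMargin)

  val record: GenericRecord = new GenericData.Record(schema)
  record.put("x", 3)
  record.put("y", 4)

  // Binary encoding: compact, field names are not written
  def toBinary(r: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](r.getSchema).write(r, encoder)
    encoder.flush()
    out.toByteArray
  }

  // JSON encoding: human-readable, field names are written out
  def toJson(r: GenericRecord): String = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().jsonEncoder(r.getSchema, out)
    new GenericDatumWriter[GenericRecord](r.getSchema).write(r, encoder)
    encoder.flush()
    out.toString("UTF-8")
  }
}
```

The binary form of this record is two bytes (each `int` is written as a zig-zag varint, with no field names), while the JSON form spells out every field name, which is why binary is the usual choice for messages on the wire.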
Let's add the Avro dependency to the sbt build:

```scala
libraryDependencies += "org.apache.avro" % "avro" % "1.7.7"
```
We will use the following schema (the producer and consumer below load it from `schema.avsc` on the classpath):

```json
{
  "namespace": "kafka_avro.test",
  "type": "record",
  "name": "user",
  "fields": [
    { "name": "id", "type": "int" },
    { "name": "name", "type": "string" },
    { "name": "email", "type": ["string", "null"] }
  ]
}
```
You can parse the schema as follows:

```scala
import org.apache.avro.Schema

val schema: Schema = new Schema.Parser().parse(SCHEMA_STRING)
```
Here, SCHEMA_STRING is the JSON listed above as a Java String.
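As a quick sanity check on parsing, this minimal sketch, assuming Avro 1.7.x, embeds the user schema as a Scala triple-quoted string and inspects the parsed result. `UserSchema` and `emailIsNullable` are hypothetical names, and the namespace is written as `kafka_avro.test` because Avro names may contain only letters, digits and underscores.

```scala
import org.apache.avro.Schema

object UserSchema {
  // The user schema as a Scala string (namespace uses an underscore, not a hyphen)
  val SCHEMA_STRING: String =
    """{
      |  "namespace": "kafka_avro.test",
      |  "type": "record",
      |  "name": "user",
      |  "fields": [
      |    {"name": "id", "type": "int"},
      |    {"name": "name", "type": "string"},
      |    {"name": "email", "type": ["string", "null"]}
      |  ]
      |}""".stripMargin

  val schema: Schema = new Schema.Parser().parse(SCHEMA_STRING)

  // The union ["string", "null"] is what makes email nullable
  def emailIsNullable: Boolean = {
    val emailSchema = schema.getField("email").schema()
    emailSchema.getType == Schema.Type.UNION &&
      emailSchema.getTypes.contains(Schema.create(Schema.Type.NULL))
  }
}
```

`UserSchema.schema.getFullName` combines namespace and name into `kafka_avro.test.user`, which is the name Avro uses to identify the record type.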
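Now we can create an Avro generic record from the parsed schema and put user data into it:

```scala
import org.apache.avro.generic.{GenericData, GenericRecord}

val genericUser: GenericRecord = new GenericData.Record(schema)
genericUser.put("id", 1)        // "id" is an int field, so pass an Int, not a String
genericUser.put("name", "singh")
genericUser.put("email", null)  // allowed because email is a ["string", "null"] union
```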
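Avro does not check value types when you call `put` (it only rejects unknown field names), so a value of the wrong type, such as the string `"1"` for the `int` field `id`, typically fails only later when the record is serialized. This minimal sketch, assuming Avro 1.7.x, uses `GenericData.get().validate` to catch such mismatches early. `UserRecordCheck`, `userRecord` and `isValid` are hypothetical helpers for illustration.

```scala
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericRecord}

object UserRecordCheck {
  val schema: Schema = new Schema.Parser().parse(
    """{"namespace": "kafka_avro.test", "type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"},
      |  {"name": "email", "type": ["string", "null"]}
      |]}""".stripMargin)

  // Builds a user record; id is typed Any on purpose so a wrong type can be shown
  def userRecord(id: Any): GenericRecord = {
    val r = new GenericData.Record(schema)
    r.put("id", id)
    r.put("name", "singh")
    r.put("email", null)
    r
  }

  // Checks every field value against its schema type
  def isValid(r: GenericRecord): Boolean = GenericData.get().validate(schema, r)
}
```

`UserRecordCheck.isValid(UserRecordCheck.userRecord(1))` is true, while passing `"1"` makes it false.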
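With the generic record created, we now need to serialize it. Here we use Avro's binary encoder to encode the record into a byte array:

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.generic.{GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}

// GenericDatumWriter is the writer intended for GenericRecord instances
val writer = new GenericDatumWriter[GenericRecord](schema)
val out = new ByteArrayOutputStream()
val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
writer.write(genericUser, encoder)
encoder.flush()  // the binary encoder buffers, so flush before reading the bytes
out.close()
val serializedBytes: Array[Byte] = out.toByteArray()
```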
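Because the encoded bytes carry no schema, decoding needs the schema they were written with. Here is a minimal sketch, assuming Avro 1.7.x, of a pair of hypothetical helpers (`AvroBytes.serialize` and `AvroBytes.deserialize`) that round-trip a generic record through Avro binary using `GenericDatumWriter` and `GenericDatumReader`.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object AvroBytes {
  // Serialize to Avro binary; the schema itself is NOT embedded in the bytes
  def serialize(record: GenericRecord): Array[Byte] = {
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](record.getSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Deserialize Avro binary; the caller supplies the schema used for writing
  def deserialize(bytes: Array[Byte], schema: Schema): GenericRecord = {
    val decoder = DecoderFactory.get().binaryDecoder(bytes, null)
    new GenericDatumReader[GenericRecord](schema).read(null, decoder)
  }
}
```

The deserialization half is the same decoding step the consumer below performs on each Kafka message. Note that decoded strings come back as `org.apache.avro.util.Utf8`, so call `toString` when you need a Scala `String`.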
Several third-party libraries also wrap Avro serialization and deserialization in friendlier APIs, which you may prefer.
Now it's time to send the serialized message to Kafka using a producer. Here is the complete Kafka producer code:
Producer
```scala
import java.io.ByteArrayOutputStream
import java.util.{Properties, UUID}

import kafka.producer.{KeyedMessage, Producer, ProducerConfig}
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{BinaryEncoder, EncoderFactory}

import scala.io.Source

class KafkaProducer() {
  // Topic to publish to; must match the topic the consumer reads from
  val topic = "demo-topic"

  // Configuration for the old (0.8.x) Scala producer API
  private val props = new Properties()
  props.put("metadata.broker.list", "localhost:9092")
  props.put("message.send.max.retries", "5")
  props.put("request.required.acks", "-1")
  props.put("serializer.class", "kafka.serializer.DefaultEncoder")
  props.put("client.id", UUID.randomUUID().toString())

  private val producer = new Producer[String, Array[Byte]](new ProducerConfig(props))

  // Read the Avro schema file from the classpath
  val schema: Schema =
    new Schema.Parser().parse(Source.fromURL(getClass.getResource("/schema.avsc")).mkString)

  // Create an Avro generic record and put the user data in it
  val genericUser: GenericRecord = new GenericData.Record(schema)
  genericUser.put("id", 1)
  genericUser.put("name", "sushil")
  genericUser.put("email", null)

  // Serialize the generic record into a byte array
  val writer = new GenericDatumWriter[GenericRecord](schema)
  val out = new ByteArrayOutputStream()
  val encoder: BinaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
  writer.write(genericUser, encoder)
  encoder.flush()
  out.close()
  val serializedBytes: Array[Byte] = out.toByteArray()

  // Wrap the bytes in a message without a key and send it
  val queueMessage = new KeyedMessage[String, Array[Byte]](topic, serializedBytes)
  producer.send(queueMessage)
}
```
Just as the producer sends binary messages, we will now create a consumer that reads messages from Kafka, deserializes them, and builds a generic record from each one.
Consumer
```scala
import java.util.Properties

import domain.User
import kafka.consumer.{Consumer, ConsumerConfig, ConsumerTimeoutException, Whitelist}
import kafka.serializer.DefaultDecoder
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericDatumReader, GenericRecord}
import org.apache.avro.io.{DatumReader, Decoder, DecoderFactory}

import scala.io.Source

class KafkaConsumer() {
  private val props = new Properties()
  val groupId = "demo-topic-consumer"
  val topic = "demo-topic"

  // Configuration for the old (0.8.x) ZooKeeper-based high-level consumer
  props.put("group.id", groupId)
  props.put("zookeeper.connect", "localhost:2181")
  props.put("auto.offset.reset", "smallest")
  props.put("consumer.timeout.ms", "120000")
  props.put("auto.commit.interval.ms", "10000")

  private val consumerConfig = new ConsumerConfig(props)
  private val consumerConnector = Consumer.create(consumerConfig)
  private val filterSpec = new Whitelist(topic)
  // Keys and values are both read as raw byte arrays
  private val streams = consumerConnector.createMessageStreamsByFilter(
    filterSpec, 1, new DefaultDecoder(), new DefaultDecoder())(0)

  lazy val iterator = streams.iterator()

  // Read the Avro schema file and parse it
  val schemaString = Source.fromURL(getClass.getResource("/schema.avsc")).mkString
  val schema: Schema = new Schema.Parser().parse(schemaString)

  // Returns the next user, or None on timeout or error
  def read(): Option[User] =
    try {
      if (hasNext) {
        println("Getting message from queue.............")
        val message: Array[Byte] = iterator.next().message()
        getUser(message)
      } else {
        None
      }
    } catch {
      case ex: Exception =>
        ex.printStackTrace()
        None
    }

  // Blocks until a message arrives or consumer.timeout.ms elapses
  private def hasNext: Boolean =
    try iterator.hasNext()
    catch {
      case _: ConsumerTimeoutException => false
      case ex: Exception =>
        ex.printStackTrace()
        println("Got error when reading message ")
        false
    }

  private def getUser(message: Array[Byte]): Option[User] = {
    // Deserialize the bytes into a generic record
    val reader: DatumReader[GenericRecord] = new GenericDatumReader[GenericRecord](schema)
    val decoder: Decoder = DecoderFactory.get().binaryDecoder(message, null)
    val userData: GenericRecord = reader.read(null, decoder)
    // Build the user object; a null email becomes None
    val user = User(
      userData.get("id").toString.toInt,
      userData.get("name").toString,
      Option(userData.get("email")).map(_.toString))
    Some(user)
  }
}
```
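The record-to-`User` mapping can be pulled out and tested without a running Kafka broker. This sketch assumes `domain.User` is a case class with fields `id: Int`, `name: String` and `email: Option[String]`; that shape is inferred from how the consumer builds it and is not shown in the post. `UserMapping.toUser` is a hypothetical helper.

```scala
import org.apache.avro.generic.GenericRecord

// Assumed shape of the post's domain.User (inferred, not from the original source)
case class User(id: Int, name: String, email: Option[String])

object UserMapping {
  // Convert a decoded generic record into a User
  def toUser(record: GenericRecord): User =
    User(
      record.get("id").asInstanceOf[Int],      // ints decode as java.lang.Integer
      record.get("name").toString,             // strings decode as Avro Utf8
      Option(record.get("email")).map(_.toString) // null union branch becomes None
    )
}
```

Wrapping the nullable field in `Option(...)` avoids catching a `NullPointerException` to detect a missing email.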
Conclusion
In this post, we have seen how to encode messages with Avro, send them to Kafka, read them back with a consumer, and finally decode them. Together, Kafka and Avro let us build a messaging system that carries complex data.
Note that the same Avro schema must be available on both sides (producer and consumer) to encode and decode messages, and any change to the schema must be applied on both sides. Confluent Platform addresses this problem with its Schema Registry, which lets producers and consumers share Avro schemas and handle schema changes.
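To see why the consumer needs the producer's schema, here is a minimal sketch, assuming Avro 1.7.x, of Avro's own schema resolution: bytes written with a v1 schema are read with a v2 schema that adds a field with a default. The two schemas, the `country` field and the `SchemaEvolution` helpers are hypothetical; how a Schema Registry delivers the writer's schema to the consumer is not shown here.

```scala
import java.io.ByteArrayOutputStream
import org.apache.avro.Schema
import org.apache.avro.generic.{GenericData, GenericDatumReader, GenericDatumWriter, GenericRecord}
import org.apache.avro.io.{DecoderFactory, EncoderFactory}

object SchemaEvolution {
  // v1: what an older producer writes
  val writerSchema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"}
      |]}""".stripMargin)

  // v2: what a newer consumer expects; the added field has a default
  val readerSchema: Schema = new Schema.Parser().parse(
    """{"type": "record", "name": "user", "fields": [
      |  {"name": "id", "type": "int"},
      |  {"name": "name", "type": "string"},
      |  {"name": "country", "type": "string", "default": "unknown"}
      |]}""".stripMargin)

  def writeV1(id: Int, name: String): Array[Byte] = {
    val record = new GenericData.Record(writerSchema)
    record.put("id", id)
    record.put("name", name)
    val out = new ByteArrayOutputStream()
    val encoder = EncoderFactory.get().binaryEncoder(out, null)
    new GenericDatumWriter[GenericRecord](writerSchema).write(record, encoder)
    encoder.flush()
    out.toByteArray
  }

  // Decoding needs the writer's schema; Avro resolves it against the reader's schema
  def readAsV2(bytes: Array[Byte]): GenericRecord = {
    val reader = new GenericDatumReader[GenericRecord](writerSchema, readerSchema)
    reader.read(null, DecoderFactory.get().binaryDecoder(bytes, null))
  }
}
```

Reading succeeds only because the reader is given the writer's schema as well; `GenericDatumReader(writerSchema, readerSchema)` resolves the two and fills `country` with its default, `"unknown"`.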
You can find complete code on GitHub.