In this blog post, we will discuss mainly Kafka Consumer and its Offsets. We will understand this using a case study implemented in Scala. This blog post assumes that you are aware of basic Kafka terminology.
CASE STUDY: The Producer is continuously producing records to the source topic. The Consumer is consuming those records from the same topic as it has subscribed for that topic. Obviously, in the real-world scenario, the speed of consumer and producer do not match. In fact, the consumer is mostly slow in consuming records. The reason can be, it has some processing to do on that records. Whatever may the reason, our aim for this post is to find how much our consumer lags behind in reading data/records from the source topic.
Well, it can be done by calculating the difference between the last offset the consumer has read and the latest offset which has been produced by the producer in the Kafka source topic.
First of all, let us make a Kafka consumer and set some of its properties.
val properties = new Properties() properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") properties.put(ConsumerConfig.GROUP_ID_CONFIG, "KafkaExampleNewConsumer") properties.put("key.deserializer","org.apache.kafka.common.serialization.StringDeserializer") properties.put("value.deserializer","org.apache.kafka.common.serialization.StringDeserializer") properties.put("auto.offset.reset", "latest") val consumer = new KafkaConsumer[String, String](properties)
These are necessary Consumer Config properties which you need to set.
Bootstrap_Servers config as specified in the Kafka official site is “A list of host/port pairs to use for establishing the initial connection to the Kafka cluster. ” A Kafka server by default starts at port 9092.
Group_Id is the id of the group to which our consumer belongs.
Key.deserializer and Value.deserializer are to specify how to deserialize record’s key and value. As my producer serializes the records’ key and value using String Serializer. I need to deserialize it using String deserializer.
Note: You can see the code for my Kafka Producer from my Github repository “https://github.com/ksimar/Sample-Scala-Programs-to-use-Kafka-using-its-JavaAPI“. I am not showing the code for my Kafka Producer in this blog, as the blog is about Kafka Consumers.
Auto.offset.reset property is to specify whether you want to consume the records from the beginning (Earliest) or from the last committed offset (Latest).
Next, I have just created my Consumer with the properties set above.
Let us now make our consumer subscribe to a topic. To subscribe to topic, you can use –
Here, “topic-1” is the name of my topic.
The Consumer can subscribe to multiple topics, you need to pass the list of topics you want to consume from. For the sake of simplicity, I have just passed a single topic to consume from.
Now that the consumer has subscribed to the topic, it can consume from that topic.
The consumer maintains an offset to keep the track of the next record it needs to read.
Now, let us see how we can find the consumer offsets.
The Consumer offsets can be found using the method offset of class ConsumerRecord. This offset points to the record in a Kafka partition. The consumer consumes the records from the topic in the form of an object of class ConsumerRecord. The class ConsumerRecord also consists of a topic name and a partition number from which the record is being received, and a timestamp as marked by the corresponding ProducerRecord (the record sent by the producer).
Now, the consumer can consume the data from the subscribed topic using command poll(long).
Method poll accepts a long parameter to specify timeout – the time, in milliseconds, spent waiting in the poll if data is not available in the buffer.
Note – It is an error to not have subscribed to any topics or partitions before polling for data. That is, the consumer needs to be subscribed to some topic or partition before making a call to poll.
On each poll, consumer will try to use the last consumed offset as the starting offset and fetch sequentially.
When consumer polls the data from the topic, we get all the records of that topic read by the consumer in the form of an object of class ConsumerRecords,
val recordsFromConsumer = consumer.poll(10000)
which acts as a container to hold the list of ConsumerRecord per partition for a particular topic. We can retrieve all the records of a particular Topic read by the consumer as a list of ConsumerRecord using method records of class ConsumerRecords.
val recordsFromConsumerList = recordsFromConsumer.records("topic-1").toList
Or you can do
val recordsFromConsumerList = recordsFromConsumer.asScala.toList
For this, you need to import
To find the offset of the latest record read by the consumer, we can retrieve the last ConsumerRecord from the list of records in ConsumerRecords and then call offset method on that record.
val lastOffset = recordsFromConsumerList.last.offset()
Now, this offset is the last offset which is read by the consumer from the topic.
Now, to find the last offset of the topic, i.e the offset of the last record present in the topic, we can use endOffsets method of KafkaConsumer. It gives the last offset for the given partitions. Its return type is Map<TopicPartition, Long>.
The last offset of a partition is the offset of the upcoming message, i.e. the offset of the last available message + 1.
val partitionsAssigned = consumer.assignment() val endOffsetsPartitionMap = consumer.endOffsets(partitionsAssigned)
Method endOffsets accepts a collection of TopicPartition, for which you want to find the endOffsets.
As I want to find the endOffsets of the partitions which are assigned to my topic, I have passed the value of consumer.assignment() in the parameter of endOffsets.
The consumer.assignment gives the set of TopicPartitions the consumer has been assigned.
Note: You should call method “assignment” only after calling “poll” on the consumer; otherwise it will give null as the result.
Note: Method “endOffsets” doesn’t change the position of the consumer, unlike “seek” methods which do change the consumer position/offset.
Anytime if you want to check the current position of the consumer, you can find that using
val currentPosition = consumer.position(consumer.assignment().toList.head)
This method accepts a TopicPartition as a parameter for which you want to find the current position.
Now that we have with us the last read offset by the consumer and the endOffset of a partition of the source topic, we can find their difference to find the consumer lag.
val consumerLag = endOffsets.get(topicPartition.head) - lastReadOffset
Now, finally we have the consumer lag which we wanted in this case study. Thanks to class ConsumerRecords which not only lets you find the offsets but very other useful things.
That’s all for this blog post. Hope you find this blog useful. You can download the complete code from my Github repository “https://github.com/ksimar/Sample-Scala-Programs-to-use-Kafka-using-its-JavaAPI“.
And to know more about the Kafka and its API, you can see its official site which has explained everything very clearly.
Also, if you have any queries, you can comment down this post. I will be very happy to help you.
Thank you all for reading this blog 🙂
Happy Coding !!
Happy Blogging !!