Kafka Timestamp Extractor

Reading Time: 3 minutes

Hi folks, I hope you are all doing well. If you have landed here, you are probably looking for a timestamp extractor for Kafka Streams, so what is all the buzz about? In this blog we are going to look at what it is and explore it a bit, so buckle up.

The Timestamp Extractor

As per the docs, a timestamp extractor extracts a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.

So a ConsumerRecord, along with its key/value pair, carries some additional information: the partition number on which the record exists, the offset from which the record is being read in that partition, and a timestamp, which could be set by the corresponding producer. There are also different ways, or types, of retrieving the timestamp for a Kafka message.

Now, if you are working (or have worked) on a Kafka Streams application, you might already have an idea of how crucial these timestamps are: on their basis the progress of streams, windows, etc. is tracked. These timestamps are embedded in each of the consumer records. When a Kafka Streams application is initialized and no value is set for the default.timestamp.extractor config, the FailOnInvalidTimestamp class is used. That is how I got into exploring this while working on one of my assignments.
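Overriding that default is just a matter of setting the default.timestamp.extractor config when building the streams properties. A minimal sketch (the application id and bootstrap servers below are placeholder values, not from the original setup):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp;

public class StreamsSetup {

    public static Properties streamsProperties() {
        Properties props = new Properties();
        // Placeholder values for illustration only
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "timestamp-demo");
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        // If this key is omitted, FailOnInvalidTimestamp is used and the
        // application dies on the first record with an invalid timestamp.
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  LogAndSkipOnInvalidTimestamp.class);
        return props;
    }
}
```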

Since the assignment included multiple streams of real-time data, it was troublesome if the stream application stopped. The problem was that we were getting negative or null timestamps for some of the Kafka messages (not all, just some in between); since the producer was using the C++-based libraries librdkafka and cppkafka, they sometimes misbehaved. To deal with this problem, the Kafka Streams API provides some ready-made implementations of the interface TimestampExtractor, which you can find under the package org.apache.kafka.streams.processor.

  • LogAndSkipOnInvalidTimestamp – if you use this, the messages with an invalid timestamp are logged and skipped, but for your application to actually log them you need to configure logging a little. I used the following config:
log4j.logger.kafka=WARN,stdout
log4j.logger.kafka.producer.async.DefaultEventHandler=WARN,stdout
log4j.logger.kafka.consumer.PartitionTopicInfo=WARN,stdout
log4j.logger.kafka.request.logger=WARN,fileAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.network.Processor=WARN,fileAppender
log4j.additivity.kafka.network.Processor=false
log4j.logger.org.I0Itec.zkclient.ZkClient=WARN

The above log4j config is sufficient and prints pretty useful information to the console. Try to use log4j for logging in your streams application, as that is what Kafka uses by default.

  • UsePreviousTimeOnInvalidTimestamp – use this if you want to fall back to the timestamp of the last message that had a valid timestamp whenever the current message has an invalid one.
  • WallclockTimestampExtractor – this extractor ignores the embedded timestamp entirely and simply returns System.currentTimeMillis(), i.e. the current wall-clock time, for every record.

The Kafka Streams API also provides the interface TimestampExtractor, which you can implement to supply your own extraction logic. But if you just want to handle messages with invalid timestamps, I would suggest extending the abstract class ExtractRecordMetadataTimestamp, which the invalid-timestamp handlers above are built on.
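For instance, a fallback for the negative/null timestamps we were seeing could be sketched by extending ExtractRecordMetadataTimestamp and overriding only onInvalidTimestamp; valid timestamps pass through the superclass untouched. The class name and the fallback choice here are illustrative, not the exact code from the assignment:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.ExtractRecordMetadataTimestamp;

public class FallbackOnInvalidTimestamp extends ExtractRecordMetadataTimestamp {

    @Override
    public long onInvalidTimestamp(ConsumerRecord<Object, Object> record,
                                   long recordTimestamp,
                                   long partitionTime) {
        // Fall back to the highest timestamp seen so far on this partition;
        // if the very first record is already invalid, partitionTime is -1,
        // so fall back to wall-clock time instead.
        return partitionTime >= 0 ? partitionTime : System.currentTimeMillis();
    }
}
```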

Once you are done implementing your custom extractor, you simply set the key-value pair
– Key: default.timestamp.extractor
– Value: <fully qualified name of the class containing the extraction logic>

Below is an example of a custom timestamp extractor:

package edu.knoldus.kafka.streams.extraction;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CustomExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> consumerRecord, long previousTimestamp) {
        // Shift every record's embedded timestamp forward by one minute (60 000 ms)
        return consumerRecord.timestamp() + 60000;
    }
}

Now just add the default.timestamp.extractor key with the value edu.knoldus.kafka.streams.extraction.CustomExtractor and you are good to go. With all that logging turned on, you will be able to see the logs for the application and its components.
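Wired into the streams configuration, that looks roughly like this (a sketch; the application id and bootstrap servers are placeholders):

```java
import java.util.Properties;

import org.apache.kafka.streams.StreamsConfig;

public class CustomExtractorSetup {

    public static Properties streamsProperties() {
        Properties props = new Properties();
        props.put(StreamsConfig.APPLICATION_ID_CONFIG, "custom-extractor-demo"); // placeholder
        props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");     // placeholder
        // "default.timestamp.extractor" -> fully qualified name of our class
        props.put(StreamsConfig.DEFAULT_TIMESTAMP_EXTRACTOR_CLASS_CONFIG,
                  "edu.knoldus.kafka.streams.extraction.CustomExtractor");
        return props;
    }
}
```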

So guys, that's all for this blog. Happy coding, keep coding 🙂

Written by 

Shubham Verma is a software consultant. He likes to explore new technologies and trends in the IT world. Shubham is familiar with programming languages such as Java, Scala, C, C++, HTML and JavaScript, and he is currently working on reactive technologies like Scala, Akka, Spark and Kafka. His hobbies include playing computer games and watching Hollywood movies.