Hi folks, I hope you're all doing well. If you've landed here, you're probably looking for the timestamp extractor for Kafka Streams. So what's all the buzz about? In this blog we are going to look at what it is and explore it as well, so buckle up.
The Timestamp Extractor
As per the docs, a timestamp extractor extracts a timestamp from an instance of ConsumerRecord. Timestamps are used to control the progress of streams.
A ConsumerRecord, along with the key/value pair, carries the following additional information: the partition number the record sits on, the offset from which the record is being read in that partition, and a timestamp, which may be set by the corresponding producer. There are also different types of timestamps for a Kafka message (for example, create time versus log-append time), depending on how the topic is configured.
Now, if you are working or have worked on a Kafka Streams application, you probably already have an idea of how crucial these timestamps are: the progress of streams, windows, etc. is tracked on the basis of the timestamps embedded in each consumer record. When a Kafka Streams application initializes, if nothing is set for the default.timestamp.extractor
config, the FailOnInvalidTimestamp
class is used by default, and that's how I got into exploring this while working on one of my assignments.
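To make that configuration concrete, here is a minimal sketch of how an extractor is wired in through the streams properties. The application id and bootstrap servers are placeholders of my own; only the default.timestamp.extractor key and the LogAndSkipOnInvalidTimestamp class come from the Kafka Streams docs.

```java
import java.util.Properties;

public class ExtractorConfig {
    public static Properties streamsProps() {
        Properties props = new Properties();
        // Placeholder application id and broker address; adjust for your setup.
        props.put("application.id", "timestamp-extractor-demo");
        props.put("bootstrap.servers", "localhost:9092");
        // If this key is omitted, Kafka Streams falls back to FailOnInvalidTimestamp.
        props.put("default.timestamp.extractor",
                  "org.apache.kafka.streams.processor.LogAndSkipOnInvalidTimestamp");
        return props;
    }
}
```

These properties are then passed to the KafkaStreams constructor along with your topology.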
Since the assignment included multiple streams of real-time data, it was a little troublesome if the stream application stopped. The problem we were facing was that we were getting negative or null timestamps for some of the Kafka messages, not all of them, just some in between. The producer was using the C++-based libraries librdkafka
and cppkafka
, and they sometimes misbehaved. To deal with this problem, the Kafka Streams API provides some already implemented classes for the interface TimestampExtractor
, which you can find under the package org.apache.kafka.streams.processor
.
LogAndSkipOnInvalidTimestamp
– if you use this, messages with an invalid timestamp are logged and skipped, but for your application to actually log them you need to configure logging a little. I used the following config:
log4j.logger.kafka=WARN,stdout
log4j.logger.kafka.producer.async.DefaultEventHandler=WARN,stdout
log4j.logger.kafka.consumer.PartitionTopicInfo=WARN,stdout
log4j.logger.kafka.request.logger=WARN,fileAppender
log4j.additivity.kafka.request.logger=false
log4j.logger.kafka.network.Processor=WARN,fileAppender
log4j.additivity.kafka.network.Processor=false
log4j.logger.org.I0Itec.zkclient.ZkClient=WARN
The above config for log4j
is sufficient and prints pretty useful information to the console. Try to use log4j for logging in your streams application, as that is what Kafka uses by default.
UsePreviousTimeOnInvalidTimestamp
– you may want to use this if, when a message carries an invalid timestamp, you want to fall back to the timestamp of the last message that had a valid one.
WallclockTimestampExtractor
– with this, the extractor simply uses System.currentTimeMillis()
as the timestamp for every record, ignoring whatever is embedded in the message.
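The difference between these built-in extractors boils down to what each one returns when the embedded timestamp is invalid (negative). Here is a simplified, self-contained illustration of those semantics; these are my own helper methods sketching the behaviour, not the actual Kafka classes:

```java
public class ExtractorSemantics {

    // Mimics FailOnInvalidTimestamp: an invalid timestamp is a hard error.
    static long failOnInvalid(long ts) {
        if (ts < 0) throw new IllegalArgumentException("invalid timestamp " + ts);
        return ts;
    }

    // Mimics UsePreviousTimeOnInvalidTimestamp: fall back to the last valid timestamp.
    static long usePreviousOnInvalid(long ts, long previousValidTs) {
        return ts < 0 ? previousValidTs : ts;
    }

    // Mimics WallclockTimestampExtractor: always use wall-clock time,
    // regardless of what is embedded in the record.
    static long wallclock(long ts) {
        return System.currentTimeMillis();
    }
}
```

(LogAndSkipOnInvalidTimestamp behaves like the fail variant except that it logs and drops the record instead of throwing.)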
The Kafka Streams API also exposes the interface TimestampExtractor
itself, which you can use to provide your own custom implementation for timestamp extraction. But if you just want to handle messages with invalid timestamps, I would suggest using one of the implementations of the abstract class ExtractRecordMetadataTimestamp
.
Now, once you are done implementing your custom extractor, you can simply register it with the key/value pair
– Key: default.timestamp.extractor
– Value: <Fully qualified name of the class which contains the extraction logic>
Below is one example of a custom timestamp extractor.
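Here is a sketch of what such a custom extractor could look like. The class name CustomExtractor is taken from the config value used below; the fallback to wall-clock time is one possible choice for handling the negative timestamps we were seeing, not the only one, and the two-argument extract() signature is the one from current Kafka Streams versions:

```java
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.streams.processor.TimestampExtractor;

public class CustomExtractor implements TimestampExtractor {

    @Override
    public long extract(ConsumerRecord<Object, Object> record, long partitionTime) {
        long timestamp = record.timestamp();
        if (timestamp < 0) {
            // The embedded timestamp is invalid (e.g. -1 from a misbehaving producer):
            // fall back to wall-clock time instead of crashing the stream.
            return System.currentTimeMillis();
        }
        return timestamp;
    }
}
```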
Now just add the
default.timestamp.extractor
key with the value edu.knoldus.kafka.streams.extraction.CustomExtractor
and you are good to go. With all that logging turned on, you will be able to see the logs for the application and its components.
So guys, that's all for this blog. Happy coding, keep coding 🙂
References
- https://kafka.apache.org/documentation.html
- https://docs.confluent.io/3.0.0/streams/developer-guide.html#timestamp-extractor

