Apache Kafka v0.10 introduced the Kafka Streams API, a client library for building applications and microservices whose input and output data are stored in Kafka clusters.
Kafka Streams provides state stores, which stream processing applications can use to store and query data. Every task in Kafka Streams uses one or more state stores, which can be accessed via APIs to store and query the data required for processing. A state store can be a persistent key-value store, an in-memory hashmap, or another convenient data structure. Kafka Streams also offers fault tolerance and automatic recovery for local state stores.
Kafka Streams allows read-only queries of these state stores by methods, processes, or external applications. This is provided through a feature called Interactive Queries, which was designed to give developers access to the internal state, or state stores, of a stream processing application (KIP-67).
Eliminating the need for an external database
With Interactive Queries and Apache Kafka, you can remove the need for an external database by making the full state of your application queryable through the Interactive Queries API.
The Kafka Streams library maintains internal state in embedded databases (RocksDB by default, but you can plug in your own). These embedded databases act as materialized views of the logs stored in Apache Kafka. With Interactive Queries, this embedded state can be exposed directly to applications.
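To make the "materialized view of a log" idea concrete, here is a minimal, library-free sketch in plain Java. It does not use RocksDB or the Kafka Streams API. The `ChangelogView` class, the `Change` type, and the sample records are hypothetical and exist only for illustration. The sketch shows that replaying a changelog in order, keeping the latest value per key and treating a null value as a delete, reproduces the contents of the store.

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical illustration only: models a state store as a view over a changelog.
class ChangelogView {

    // One changelog entry: the latest value written for a key (null means "deleted").
    static final class Change {
        final String key;
        final Long value;
        Change(String key, Long value) { this.key = key; this.value = value; }
    }

    // Replays the log in order; later entries overwrite earlier ones for the same key.
    static Map<String, Long> materialize(List<Change> changelog) {
        Map<String, Long> view = new HashMap<>();
        for (Change c : changelog) {
            if (c.value == null) {
                view.remove(c.key);        // tombstone: the key was deleted
            } else {
                view.put(c.key, c.value);  // keep only the most recent value
            }
        }
        return view;
    }

    public static void main(String[] args) {
        List<Change> changelog = Arrays.asList(
            new Change("kafka", 1L),
            new Change("streams", 1L),
            new Change("kafka", 2L));
        // The view holds the latest count per word, rebuilt purely from the log.
        System.out.println(materialize(changelog));
    }
}
```

Because the view can always be rebuilt from the log, a local store lost with its instance could in principle be restored by replaying the same records elsewhere, which is the intuition behind the automatic recovery mentioned earlier.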
Making a Kafka Streams application queryable
Kafka Streams handles most of the low-level querying, metadata discovery, and data fault tolerance. Depending on the application, you can either query local state stores directly with no extra work, or you have to implement a layer of indirection to query remote state stores in a distributed setup.
Querying local state stores
Since a Kafka Streams application typically runs as multiple instances, the state that is locally available on any given instance is only a subset of the application's entire state. Querying the local state stores on an instance returns only the data that is locally available on that particular instance.
The example below takes a KStream named textLines and creates a key-value store named “CountsKeyValueStore”, which holds the latest count for every word found on the topic “word-count-input”.
```java
// textLines is a KStream<String, String> read from the "word-count-input" topic.

// Define the processing topology (here: WordCount)
KGroupedStream<String, String> groupedByWord = textLines
    .flatMapValues(value -> Arrays.asList(value.toLowerCase().split("\\W+")))
    // Serialized matches the Kafka version used in this article; newer releases use Grouped.with(...)
    .groupBy((key, word) -> word, Serialized.with(stringSerde, stringSerde));

// Create a key-value store named "CountsKeyValueStore" for all the word counts
groupedByWord.count(
    Materialized.<String, Long, KeyValueStore<Bytes, byte[]>>as("CountsKeyValueStore"));

// Start an instance of the topology
KafkaStreams streams = new KafkaStreams(builder.build(), config);
streams.start();
```
After the application has started, you can query “CountsKeyValueStore” through the ReadOnlyKeyValueStore API, as follows:
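```java
// Get the key-value store CountsKeyValueStore
ReadOnlyKeyValueStore<String, Long> keyValueStore =
    streams.store("CountsKeyValueStore", QueryableStoreTypes.keyValueStore());

// Look up the latest count for a single word
System.out.println("count for hello: " + keyValueStore.get("hello"));
```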
Querying remote state stores
To query remote states for the entire app, you must expose the application’s full state to other applications, including applications that are running on different machines.
Below are the steps needed to query remote state stores and make the full state of the stream processing application queryable:
Add an RPC layer to your application
- This lets the instances of your application communicate with each other over the network.
- The RPC layer should be embedded within the Kafka Streams application and expose an endpoint that other application instances and external applications can connect to.
Expose the RPC endpoints
- To enable remote state store discovery in a distributed Kafka Streams application, you must expose the RPC endpoint of application instances using the application.server configuration setting of Kafka Streams.
- The value of this configuration property, given as host:port, must be unique for each instance of your application.
- Kafka Streams uses it to keep track of the RPC endpoint information for every instance of an application, its state stores, and its assigned stream partitions, and makes this information available through instances of StreamsMetadata.
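As a rough sketch of this configuration step, each instance could set `application.server` to its own host and port. The keys `application.id`, `bootstrap.servers`, and `application.server` are standard Kafka Streams configuration names. The application id, broker address, host names, port, and the `QueryableConfig` helper are placeholders invented for this example.

```java
import java.util.Properties;

// Hypothetical helper: builds per-instance settings for a queryable application.
class QueryableConfig {

    static Properties forInstance(String host, int port) {
        Properties props = new Properties();
        // Shared by all instances of the same application (placeholder values).
        props.put("application.id", "word-count-app");
        props.put("bootstrap.servers", "localhost:9092");
        // Unique per instance: the host:port where this instance's RPC layer listens.
        props.put("application.server", host + ":" + port);
        return props;
    }

    public static void main(String[] args) {
        // Two instances of the same application advertise different endpoints.
        System.out.println(forInstance("host1", 7070).getProperty("application.server"));
        System.out.println(forInstance("host2", 7070).getProperty("application.server"));
    }
}
```

The resulting `Properties` object would then be passed to the `KafkaStreams` constructor together with the topology, so that each running instance advertises its own endpoint.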
Discover remote application instances and their state stores
- The RPC layer should discover remote application instances and their state stores, and query locally available state stores, to make the full state of your application queryable.
- An instance that lacks the local data to answer a query can forward the query to the instance that holds that data. Queries for locally available state can be answered directly.
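The discovery and forwarding behaviour described in these steps can be sketched without any networking. The following plain-Java model is an illustration under stated assumptions, not Kafka Streams code: `QueryRouter`, its `Instance` type, and the hash-based key placement are hypothetical stand-ins. In a real application, the owner lookup would come from the Kafka Streams metadata API (for example `KafkaStreams#metadataForKey` in older releases or `queryMetadataForKey` in newer ones), and the forwarding call would go through your own RPC layer.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical model of distributed querying; no Kafka classes are used.
class QueryRouter {

    // One application instance: its advertised endpoint and its local slice of state.
    static final class Instance {
        final String endpoint;                          // e.g. the application.server value
        final Map<String, Long> localStore = new HashMap<>();
        Instance(String endpoint) { this.endpoint = endpoint; }
    }

    final List<Instance> instances = new ArrayList<>();

    void addInstance(String endpoint) { instances.add(new Instance(endpoint)); }

    // Stand-in for metadata discovery: which instance hosts this key?
    // Assumption: simple hash placement, not Kafka's actual partitioner.
    Instance ownerOf(String key) {
        return instances.get(Math.floorMod(key.hashCode(), instances.size()));
    }

    // Writes land on the owning instance only, so each store holds a subset of the state.
    void put(String key, long count) { ownerOf(key).localStore.put(key, count); }

    // A query arrives at `receiver`: answer from local state if it owns the key, else forward.
    String query(Instance receiver, String key) {
        Instance owner = ownerOf(key);
        Long value = owner.localStore.get(key);         // a real system would make an RPC when owner != receiver
        String path = (owner == receiver) ? "local" : "forwarded to " + owner.endpoint;
        return key + "=" + value + " (" + path + ")";
    }

    public static void main(String[] args) {
        QueryRouter router = new QueryRouter();
        router.addInstance("host1:7070");
        router.addInstance("host2:7070");
        router.put("kafka", 3L);
        router.put("streams", 5L);
        Instance first = router.instances.get(0);
        // The same instance answers some keys locally and forwards others.
        System.out.println(router.query(first, "kafka"));
        System.out.println(router.query(first, "streams"));
    }
}
```

The design point this is meant to show is that a client can send a query to any instance: the receiving instance either holds the key locally or knows, through metadata, where to forward the request.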