KNIME Analytics Platform gives its users a way to consume messages from Apache Kafka and publish the transformed results back to Kafka. This lets users integrate their KNIME workflows with a distributed streaming publish-subscribe system.
Starting with KNIME 3.6, users get a Kafka extension with three new nodes:
1. Kafka Connector
2. Kafka Consumer
3. Kafka Producer
Users familiar with Apache Kafka will already recognize what these three nodes do in a KNIME workflow.
Kafka Extension to KNIME
The Kafka extension adds three nodes to KNIME:
- Kafka Connector Node: The connector node allows the workflow to connect to a Kafka cluster. This node takes a list of host/port pairs to establish a connection.
- Kafka Producer Node: Once the Kafka Connector node is configured, the Kafka Producer node publishes records from its input to a topic in the connected cluster. To configure this node, set properties such as the client ID, the topic to publish records to, and the column of the input record to publish. The user can also define how messages are sent: synchronously, asynchronously, or fire-and-forget.
- Kafka Consumer Node: Finally, the Kafka Consumer node allows the workflow to consume messages from a cluster. This node consumes records from one or more topics and stores them in a table. Along with the client ID and the group ID to which this consumer belongs, the user specifies the topic(s) from which data should be consumed. The user also configures a stop condition that tells the node when to stop consuming messages: either a limit on the number of messages to consume in one execution, or a timestamp up to which the node consumes records.
Apart from these configurations, settings for Kafka's own producer or consumer options (such as the request timeout or the number of retries) can be set in the Advanced Settings tab.
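To make the settings above more concrete, here is a minimal Python sketch of how such a configuration might look outside KNIME. The helper names `parse_bootstrap_servers` and `build_consumer_config` are hypothetical and are not part of the extension; the property keys (`bootstrap.servers`, `client.id`, `group.id`, `request.timeout.ms`) are standard Kafka client configuration names. How the nodes store these values internally is not described here, so treat this as an illustration only.

```python
from typing import Dict, List, Optional, Tuple


def parse_bootstrap_servers(servers: str) -> List[Tuple[str, int]]:
    """Split a string such as 'host1:9092,host2:9092' into (host, port) pairs."""
    pairs = []
    for entry in servers.split(","):
        entry = entry.strip()
        if not entry:
            continue  # tolerate trailing commas
        host, sep, port = entry.rpartition(":")
        if not sep or not host or not port.isdigit():
            raise ValueError(f"expected host:port, got {entry!r}")
        pairs.append((host, int(port)))
    return pairs


def build_consumer_config(
    servers: str,
    client_id: str,
    group_id: str,
    advanced: Optional[Dict[str, object]] = None,
) -> Dict[str, object]:
    """Combine the basic settings with optional advanced Kafka options."""
    pairs = parse_bootstrap_servers(servers)  # validates every entry
    config: Dict[str, object] = {
        "bootstrap.servers": ",".join(f"{host}:{port}" for host, port in pairs),
        "client.id": client_id,
        "group.id": group_id,
    }
    # Advanced settings are applied last, so they can override the basics.
    config.update(advanced or {})
    return config
```

For example, `build_consumer_config("broker1:9092", "knime-client", "knime-group", {"request.timeout.ms": 30000})` returns a dictionary holding the three basic properties plus the timeout (the broker, client, and group names are placeholders).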
Integrating Kafka in a KNIME workflow
Consider a scenario where each day's customer transactions arrive in Apache Kafka. We want to enrich these records with extra information from stored data and pass them back to Kafka. A KNIME workflow can do this using the nodes from the Kafka extension together with other ETL nodes.
Consuming customer transactions
In the Kafka Connector node, we specify the address of the cluster to which the customer transactions are published. Once connected, we use the Kafka Consumer node to set the topic from which transaction records are read. We also set the timestamp after which messages are no longer consumed. Once the consumer node has executed, all the records are available to us in a table.
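The stop condition can be pictured with a small Python sketch. It works on an in-memory list of `(timestamp, value)` pairs instead of a real Kafka topic, and the function name `consume_until` is hypothetical. It also assumes that records arrive in timestamp order and that the cutoff timestamp is inclusive; the actual node may treat the boundary differently.

```python
from typing import Iterable, List, Optional, Tuple

# (timestamp in epoch milliseconds, message value)
Record = Tuple[int, str]


def consume_until(
    records: Iterable[Record],
    max_messages: Optional[int] = None,
    stop_timestamp_ms: Optional[int] = None,
) -> List[Record]:
    """Collect records until either stop condition is reached."""
    consumed: List[Record] = []
    for timestamp_ms, value in records:
        # Stop once the configured number of messages has been read.
        if max_messages is not None and len(consumed) >= max_messages:
            break
        # Stop at the first record newer than the cutoff (assumed inclusive).
        if stop_timestamp_ms is not None and timestamp_ms > stop_timestamp_ms:
            break
        consumed.append((timestamp_ms, value))
    return consumed
```

The returned list plays the role of the table that the consumer node produces after execution.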
ETL on the consumed records
We can use a Java Snippet node to parse the received JSON records into relevant columns. From the parsed JSON, we get CustomerID, ProductID, OrderNumber, and Date. We then use CustomerID as a key to look up customer information in the customer table in our database. Similarly, we use the ProductID column as a key to look up product information in a product table. Finally, we can publish the enriched data back to a separate Kafka topic.
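The parse-and-lookup step can be sketched in plain Python as follows. This is not the Java Snippet code from the workflow; it only illustrates the logic. The field names CustomerID and ProductID come from the text above, while the function name `enrich_transactions` and the lookup columns `CustomerName` and `ProductName` in the usage note are made up for illustration. The two dictionaries stand in for the database tables.

```python
import json
from typing import Dict, Iterable, List


def enrich_transactions(
    raw_records: Iterable[str],
    customers: Dict[str, dict],
    products: Dict[str, dict],
) -> List[dict]:
    """Parse JSON transactions and append customer and product details."""
    enriched = []
    for raw in raw_records:
        record = json.loads(raw)
        # Look up extra columns by key; an unknown key adds nothing.
        customer = customers.get(record["CustomerID"], {})
        product = products.get(record["ProductID"], {})
        enriched.append({**record, **customer, **product})
    return enriched
```

Given a record with `"CustomerID": "C1"` and a customer table entry `{"C1": {"CustomerName": "Ada"}}`, the enriched row keeps the original fields and gains a `CustomerName` column.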
Publishing transformed customer-transaction data
Our enriched data is now ready to be published to a Kafka cluster using the Kafka Producer node. In the node's configuration, we set the topic to which the records should be published. Before publishing, an intermediate step converts each table row to JSON using KNIME's Table to JSON node. This ensures that only JSON-formatted records are passed between the steps of the workflow.
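As a rough picture of what the row-to-JSON conversion produces, the following Python sketch turns table rows into one JSON string per row, which is the form a producer would publish as message values. The function name `rows_to_json_messages` is hypothetical, and the exact JSON layout emitted by the Table to JSON node depends on its settings, so the output here is an assumption.

```python
import json
from typing import List, Sequence


def rows_to_json_messages(
    columns: Sequence[str],
    rows: Sequence[Sequence[object]],
) -> List[str]:
    """Serialize each table row as a JSON object keyed by column name."""
    # sort_keys keeps the output stable regardless of column order.
    return [json.dumps(dict(zip(columns, row)), sort_keys=True) for row in rows]
```

Each string in the returned list would then be handed to the producer as the value of a single Kafka record.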
The complete workflow is available here
With the Kafka extension for KNIME, users can now create a dynamic flow of information from different services into a KNIME workflow. One caution: the Kafka extension is currently in early development, so using it in production is not recommended.