KSQL is a streaming SQL engine for Apache Kafka that puts the power of stream processing into the hands of anyone who knows SQL.
In this blog, we will cover the basics of KSQL and the easiest way to get it up and running on your local machine.
What is KSQL?
KSQL is a distributed, scalable, reliable, and real-time SQL engine for Apache Kafka. It allows you to write SQL queries to analyze a stream of data in real time. A KSQL query keeps generating results over an unbounded stream of data until you stop it.
It is built on top of Kafka Streams, which means KSQL exposes the same core concepts as Kafka Streams, but through a SQL language: streams (KStreams), tables (KTables), joins, windowing, and so on.
Comparing a KSQL query to a Relational Database query
Queries in relational databases are one-time queries that run once to completion over a data set, e.g. a SELECT statement over a finite set of rows. In contrast, KSQL runs continuous queries: transformations over streams of data in Kafka topics that run continuously, processing new data as it arrives.
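To make the contrast concrete, here is a sketch; the `orders` table, `orders_stream` stream, and column names are illustrative, not from a real deployment:

```sql
-- Relational database: runs once over the rows that exist now, then returns.
SELECT userid, COUNT(*) FROM orders GROUP BY userid;

-- KSQL: a similar aggregation runs continuously, updating the counts
-- as new events arrive on the underlying Kafka topic, until terminated.
CREATE TABLE orders_per_user AS
  SELECT userid, COUNT(*)
  FROM orders_stream
  GROUP BY userid;
```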
When to use KSQL?
1. Real-time monitoring and real-time analytics
KSQL can be used to define custom business-level metrics that are computed in real time, which you can then monitor and alert on.
Example KSQL query,
CREATE TABLE error_counts AS
  SELECT error_code, count(*)
  FROM monitoring_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  WHERE type = 'ERROR'
  GROUP BY error_code;
2. Security and anomaly detection
KSQL gives a simple, sophisticated, and real-time way of defining aggregation and anomaly detection queries on real-time streams.
Example KSQL query,
CREATE TABLE possible_fraud AS
  SELECT card_number, count(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 5 SECONDS)
  GROUP BY card_number
  HAVING count(*) > 3;
3. Online Data Integration
KSQL queries can be used to integrate and join different streams of data in real-time.
CREATE STREAM vip_users AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id
  WHERE u.level = 'Platinum';
Starting the KSQL Server
The Confluent Platform ships with KSQL. In this blog, we will be using Confluent Platform 5.0, assuming that Kafka is already up and running on your system.
We will start the KSQL Server with the ksql-server-start command:
:~/confluent-5.0.0$ ./bin/ksql-server-start ./etc/ksql/ksql-server.properties
Let’s start the KSQL CLI with the ksql command:
:~/confluent-5.0.0$ ./bin/ksql
The CLI offers a prompt that is very similar to the one offered by SQL databases. You can enter SQL commands or type help to see additional commands.
Create Topics and produce Data
We will create and produce data to the Kafka topics pageviews and users using the KSQL datagen tool that is included in Confluent Platform.
- Create the pageviews topic and produce data using the data generator
$ <path-to-confluent>/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
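The users topic referenced later can be populated the same way. The invocation below follows the standard Confluent quickstart parameters for the users data generator; adjust the interval to taste:

```shell
$ <path-to-confluent>/bin/ksql-datagen quickstart=users format=json topic=users maxInterval=100
```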
Inspect Kafka Topics
In the KSQL CLI, inspect the pageviews topic by using the PRINT statement:
ksql> PRINT 'pageviews';
Format:STRING
31/3/19 12:03:27 AM IST , 82351 , 1553970807360,User_4,Page_34
31/3/19 12:03:27 AM IST , 82361 , 1553970807586,User_7,Page_83
31/3/19 12:03:27 AM IST , 82371 , 1553970807922,User_5,Page_75
31/3/19 12:03:28 AM IST , 82381 , 1553970808323,User_5,Page_15
31/3/19 12:03:28 AM IST , 82391 , 1553970808601,User_8,Page_46
31/3/19 12:03:29 AM IST , 82401 , 1553970809025,User_4,Page_81
31/3/19 12:03:29 AM IST , 82411 , 1553970809174,User_4,Page_49
31/3/19 12:03:29 AM IST , 82421 , 1553970809208,User_8,Page_71
31/3/19 12:03:29 AM IST , 82431 , 1553970809403,User_6,Page_28
31/3/19 12:03:29 AM IST , 82441 , 1553970809683,User_1,Page_33
Create a Kafka Stream and query it using KSQL
Create a stream pageviews_original from the Kafka topic pageviews, specifying a value_format of DELIMITED:
ksql> CREATE STREAM pageviews_original (viewtime BIGINT, userid VARCHAR, pageid VARCHAR) \
      WITH (kafka_topic='pageviews', value_format='DELIMITED');
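Before querying, you can verify that the stream was registered and check its schema using KSQL's standard introspection statements:

```sql
-- List all registered streams
ksql> SHOW STREAMS;

-- Show the columns and types of the new stream
ksql> DESCRIBE pageviews_original;
```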
We can now query the created stream using a SELECT statement:
SELECT pageid FROM pageviews_original LIMIT 5;
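Note that an interactive SELECT like the one above is a transient query: it streams results to the CLI and stops when the LIMIT is reached or when you cancel it. To keep processing the stream continuously into a new Kafka topic, derive a persistent stream instead; the stream name below is illustrative:

```sql
-- Persistent query: writes results to a backing Kafka topic continuously
CREATE STREAM pageviews_pages AS
  SELECT pageid FROM pageviews_original;

-- Persistent queries can be listed (and later terminated by ID)
SHOW QUERIES;
```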
Run the exit command to leave the KSQL CLI.
If you are running Confluent Platform using the CLI, you can stop it with this command.
$ <path-to-confluent>/bin/confluent stop
You can refer to the next blog in the series, KSQL: Streams and Tables, for an in-depth look at the dual streaming model KSQL uses to process events.
You can learn more about Apache Kafka with the help of our book, Putting Apache Kafka to Use.