KSQL: Getting started with Streaming SQL for Apache Kafka


KSQL is a SQL streaming engine for Apache Kafka which puts the power of stream processing into the hands of anyone who knows SQL.

In this blog, we will cover the basics of KSQL and the easiest way to get it up and running on your local machine.

What is KSQL?

KSQL is a distributed, scalable, reliable, real-time SQL engine for Apache Kafka. It allows you to write SQL queries to analyze a stream of data in real time. A KSQL query keeps generating results over an unbounded stream of data until you stop it.

It is built on top of Kafka Streams, which means KSQL offers concepts similar to those of Kafka Streams, but all through a SQL dialect: streams (KStreams), tables (KTables), joins, windowing, etc.
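As an illustration of the two core abstractions, the statements below sketch how a stream and a table might be declared over existing topics. The topic names, columns, and formats here are assumptions for illustration, not part of this post's setup:

```sql
-- A stream: an unbounded, append-only sequence of events
-- (topic name and columns are hypothetical).
CREATE STREAM clicks (userid VARCHAR, page VARCHAR)
  WITH (kafka_topic='clicks', value_format='JSON');

-- A table: the latest value per key, updated as new events arrive
-- (the KEY property names the column holding the message key).
CREATE TABLE site_users (user_id VARCHAR, level VARCHAR)
  WITH (kafka_topic='site_users', value_format='JSON', key='user_id');
```

A stream models each event individually, while a table models changing state: the same topic can back either abstraction depending on how you want to interpret it.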

Comparing a KSQL query to a Relational Database query

Queries in relational databases are one-time queries that run once to completion over a data set, e.g. a SELECT statement over a finite set of rows. In contrast, KSQL runs continuous queries: transformations that run continuously as new data passes through them, over streams of data in Kafka topics.
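For example, the following is a continuous query (the stream and column names are hypothetical): it never completes on its own, and emits an updated count each time a new event arrives on the underlying topic:

```sql
-- Runs until cancelled, emitting an updated count per page
-- every time a new event flows through the stream.
SELECT pageid, COUNT(*)
FROM pageviews
WINDOW TUMBLING (SIZE 30 SECONDS)
GROUP BY pageid;
```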

When to use KSQL?


1. Real-time monitoring and real-time analytics

KSQL can be used to define custom business-level metrics that are computed in real time and that you can monitor and alert on.

Example KSQL query,

CREATE TABLE error_counts AS
  SELECT error_code, count(*)
  FROM monitoring_stream
  WINDOW TUMBLING (SIZE 1 MINUTE)
  WHERE type = 'ERROR'
  GROUP BY error_code;

2. Security and anomaly detection

KSQL gives you a simple, sophisticated, real-time way of defining aggregation and anomaly detection queries over streaming data.

Example KSQL query,

CREATE TABLE possible_fraud AS
  SELECT card_number, count(*)
  FROM authorization_attempts
  WINDOW TUMBLING (SIZE 5 SECONDS)
  GROUP BY card_number
  HAVING count(*) > 3;

3. Online Data Integration

KSQL queries can be used to integrate and join different streams of data in real-time.

CREATE STREAM vip_users AS
  SELECT userid, page, action
  FROM clickstream c
  LEFT JOIN users u ON c.userid = u.user_id
  WHERE u.level = 'Platinum';

Starting the KSQL Server

The Confluent Platform ships with KSQL. In this blog we will use Confluent Platform 5.0, assuming that Kafka is already up and running on your system.

We will start the KSQL Server with the ksql-server-start command:

:~/confluent-5.0.0$ ./bin/ksql-server-start ./etc/ksql/ksql-server.properties 

Let’s start the KSQL CLI with the ksql command:

:~/confluent-5.0.0$ ./bin/ksql

The CLI offers a prompt that is very similar to the one offered by SQL databases. You can enter SQL commands or type help to see additional commands.

Create Topics and produce Data

We will create the Kafka topics pageviews and users and produce data to them using the KSQL datagen tool that is included with Confluent Platform.

  1. Create the pageviews topic and produce data using the data generator
$ <path-to-confluent>/bin/ksql-datagen quickstart=pageviews format=delimited topic=pageviews maxInterval=500
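The users topic can be populated the same way. The command below follows the Confluent 5.0 quickstart; the quickstart=users, format=json, and maxInterval values are taken from that quickstart and may differ in other versions:

```shell
# Generate synthetic user records into the "users" topic
# (run from a separate terminal; leave it running).
<path-to-confluent>/bin/ksql-datagen quickstart=users format=json topic=users maxInterval=100
```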

Inspect Kafka Topics

In the KSQL CLI, run the following statement:

SHOW TOPICS;

Inspect the pageviews topic by using the PRINT statement:

PRINT 'pageviews';

ksql> PRINT 'pageviews';
Format:STRING
31/3/19 12:03:27 AM IST , 82351 , 1553970807360,User_4,Page_34
31/3/19 12:03:27 AM IST , 82361 , 1553970807586,User_7,Page_83
31/3/19 12:03:27 AM IST , 82371 , 1553970807922,User_5,Page_75
31/3/19 12:03:28 AM IST , 82381 , 1553970808323,User_5,Page_15
31/3/19 12:03:28 AM IST , 82391 , 1553970808601,User_8,Page_46
31/3/19 12:03:29 AM IST , 82401 , 1553970809025,User_4,Page_81
31/3/19 12:03:29 AM IST , 82411 , 1553970809174,User_4,Page_49
31/3/19 12:03:29 AM IST , 82421 , 1553970809208,User_8,Page_71
31/3/19 12:03:29 AM IST , 82431 , 1553970809403,User_6,Page_28
31/3/19 12:03:29 AM IST , 82441 , 1553970809683,User_1,Page_33

Create a Kafka Stream and query it using KSQL

Create a stream pageviews_original from the Kafka topic pageviews, specifying a value_format of DELIMITED.

ksql> CREATE STREAM pageviews_original (viewtime bigint, userid varchar, pageid varchar) \
      WITH (kafka_topic='pageviews', value_format='DELIMITED');

 Message
----------------
 Stream created
----------------
ksql>
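To confirm the new stream's schema, you can use the DESCRIBE statement, a standard KSQL CLI command (the exact output layout varies by version):

```sql
-- Lists the field names and types of the stream.
DESCRIBE pageviews_original;
```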

We will query the created stream using a SELECT statement:

SELECT pageid FROM pageviews_original LIMIT 5;
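A transient SELECT like the one above only prints to the CLI. It can also be turned into a persistent query that continuously writes its results to a new stream backed by a Kafka topic; the stream name and filter predicate below are illustrative, not from the original post:

```sql
-- Persistent query: continuously filters pageviews_original
-- into a new stream (name and predicate are hypothetical).
CREATE STREAM pageviews_user4 AS
  SELECT viewtime, userid, pageid
  FROM pageviews_original
  WHERE userid = 'User_4';
```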

Exit KSQL

Run the exit command to leave the KSQL CLI.

ksql> exit

If you are running Confluent Platform using the CLI, you can stop it with this command.

$ <path-to-confluent>/bin/confluent stop

You can refer to the next blog in the series, KSQL: Streams and Tables, for an in-depth look at the dual streaming model KSQL uses to process events.

You can learn more about Apache Kafka with the help of our book, Putting Apache Kafka to Use.


Written by Himani, a Software Consultant with more than 2.5 years of experience. She is familiar with C#, C++, C, PHP, Scala, and Java, and has an interest in functional programming. Her hobbies include reading books and cooking.
