Hands-on: Apache Kafka with Scala

Apache Kafka is an open sourced distributed streaming platform used for building real-time data pipelines and streaming applications. It is horizontally scalable, fault-tolerant, wicked fast, and runs in production in thousands of companies.

Before the introduction of Apache Kafka, data pipleines used to be very complex and time-consuming. A separate streaming pipeline was needed for every consumer. You can guess the complexity of it with the help of the below diagram,

Apache Kafka solved this problem and provided a universal pipleine that is fault tolerant, scalable and simple to use. There is now a single pipeline needed to cater multiple consumers, which can be also seen with the help of the below diagram,

                                                                                                          

This blog will help you in getting started with Apache Kafka, understand its basic terminologies and how to create Kafka producers and consumers using its APIs in Scala.

Apache Kafka is an open source project initially created by LinkedIn, that is designed to be a distributed, partitioned, replicated commit log service. It provides the functionality of a messaging system.

Topic and Messages in Apache Kafka

A topic in Kafka is where all the messages are stored that are produced. Messages are a unit of data which can be byte arrays and any object can be stored in any format

There are two components of any message, a key, and a value. The key is used to represent the data about the message and the value represents the body of the message.

Apache Kafka is a feed of messages which are organized into what is called a topic. Think of it as a category of messages. Kafka topics can be divided into a number of Partitions as shown in below diagram.

Apache Kafka uses partitions to scale a topic across many servers for producer writes. A Kafka cluster is comprised of one or more servers which are called “brokers“. Each of these Kafka brokers stores one or more partitions on it.

Kafka Producer and Consumer

Kafka provided Producer API and Consumer API. Producers are used to publish messages to Kafka topics that are stored in different topic partitions. Each of these topic partitions is an ordered, immutable sequence of messages that are continually appended to.

The Kafka Producer maps each message it would like to produce to a topic. Kafka Producer is the client that publishes records to the Kafka cluster and note that it is thread-safe. The producer client controls which partition it publishes messages to.

Consumers are to subscribe to the Kafka topics and process the feed of published messages in real-time. Kafka retains all the messages that are published regardless if they have been consumed or not for a configurable period of time. Below diagram give eagle point of view

Here we have multiple producers were they publish message into the topic on the different broker and from where the consumers read from any topic which they have subscribed for.

Apache Kafka is able to spread a single topic partition across multiple brokers, which allows for the horizontal scaling. By spreading the topic’s partitions across multiple brokers, consumers can read from a single topic in parallel.

So, this was a basic introduction to common terminologies used while working with Apache Kafka. Now, we will move ahead and understand how to create simple producer-consumer in Kafka.

As a pre-requisite, we should have zookeeper and Kafka server up and running. You can refer to this quickstart for setting up a single node Kafka cluster on your local machine.

Assuming that you have your server started, we will now start building a simple producer-consumer application where the producer will publish the message in a Kafka topic and a consumer can subscribe to the topic and fetch mesages in real-time.

In your sbt project add the following library dependency,

libraryDependencies += "org.apache.kafka" %% "kafka" % "2.1.0"

With the help of the following code, we will be publishing messages into Kafka topic “quick-start”

import java.util.Properties
import org.apache.kafka.clients.producer._

class Producer {

  def main(args: Array[String]): Unit = {
    writeToKafka("quick-start")
  }

  def writeToKafka(topic: String): Unit = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
    val record = new ProducerRecord[String, String](topic, "key", "value")
    producer.send(record)
    producer.close()
  }
}

At the same time, we can have our Kafka Consumer up and running which is subscribing to the Kafka topic “quick-start” and displaying the messages.

import java.util
import org.apache.kafka.clients.consumer.KafkaConsumer
import java.util.Properties
import scala.collection.JavaConverters._

class Consumer {

  def main(args: Array[String]): Unit = {
    consumeFromKafka("quick-start")
  }

  def consumeFromKafka(topic: String) = {
    val props = new Properties()
    props.put("bootstrap.servers", "localhost:9094")
    props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer")
    props.put("auto.offset.reset", "latest")
    props.put("group.id", "consumer-group")
    val consumer: KafkaConsumer[String, String] = new KafkaConsumer[String, String](props)
    consumer.subscribe(util.Arrays.asList(topic))
    while (true) {
      val record = consumer.poll(1000).asScala
      for (data <- record.iterator)
        println(data.value())
    }
  }
}


References:

  1. https://kafka.apache.org/documentation/
  2. https://www.confluent.io
  3. https://github.com/shubhamdangare/Kafka-producer-consumer
Knoldus Pune Careers - Hiring Freshers

Get a head start on your career at Knoldus. Join us!