Alpakka : Creating Data Pipelines

Reading Time: 4 minutes
Alpakka

In my previous blogs I discussed about akka-streams , materialization and graphs in akka-stream . It was so easy to create sources, sinks and flows and connect them using methods and create runnable graphs. Now consider a situation where I’m getting data from Kafka and want to push it in Cassandra or Elasticsearch ? How to create such source or flow or sink that can do this work? We can create such Sinks and Sources from scratch but Alpakka makes it much easier for us. So, let’s begin!

What is Alpakka?

Alpakka is an open source project, that is built on top of Akka-Streams to provide a DSL for reactive and stream-oriented programming. It can be used using Java and Scala both. It has built-in support for backpressure to handle the flow of data.

Alpakka provides us a number of connectors for different technologies through which we can create sources, sinks and flows for them. It basically provides API that integrates a technology with Akka-Streams. This makes it easier to work with Akka-Streams and different technologies together.

There are a lot of connectors provided by Alpakka. Some of them are for following :

  • Apache Cassandra
  • Apache Solr
  • Elasticsearch
  • Apache Kafka
  • Aws DynamoDb
  • Aws S3
  • Azure IoT Hub
  • Aws Lambda
  • Couchbase
  • File

Here I’m not going to cover all of them rather I’ll focus on Alpakka Kafka ( Akka-Streams with Kafka) and Alpakka Elasticsearch. Also, I’ll show you an example in which I’ll be creating a data pipeline that reads data from kafka and sends it to cassandra and elasticsearch as well. I am naming it Alpakka Pipeline.

Alpakka Kafka

The Alpakka Kafka connector was originally known as Reactive Kafka or even Akka Streams Kafka. It let’s you connect Apache Kafka with Akka-Streams.

Let’s see how to create different components and connect them.

First of all, add dependencies in your build.sbt.

libraryDependencies ++= Seq("com.typesafe.akka" %% "akka-stream" % "2.6.5" ,
   "com.typesafe.akka" %% "akka-stream-kafka" % "2.0.2")

Producer

The producer publishes the message to Kafka topics.

We can create a producer by creating a sink or flow that will basically write to Kafka. There are different factory methods for this, provided by Producer API. Some of them are following:

  • plainsink
  • committableSink
  • flexiflow

Let’s look at an example to understand how to use it :

val producerSettings =
    ProducerSettings(system, new StringSerializer, new StringSerializer)
      .withBootstrapServers("localhost:9092")

  val kafkaPlainSink = Producer.plainSink(producerSettings)

It require producer settings.

Consumer

The consumer subscribes to topics and consumes the message from kafka topics subscribed.

Here, we can create a consumer by creating a source that will basically read from Kafka. There are different factory methods for this, provided by Consumer API. Some of them are following:

  • plainSource
  • commitrableSource

Here is an example demonstrating how to use them :

val consumerByteSettings = ConsumerSettings(system, new ByteArrayDeserializer, new StringDeserializer)
    .withBootstrapServers("localhost:9092")
    .withGroupId("akka-stream-kafka-test")
    .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

val kafkaPlainSource: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] =
    Consumer.plainSource(consumerByteSettings, Subscriptions.topics("topic-person"))
  val consumerStringSettings =
    ConsumerSettings(system, new StringDeserializer, new StringDeserializer)
      .withBootstrapServers("localhost:9092")
      .withGroupId("akka-stream-kafka-test")
      .withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")

  val kafkaCommittableSource = Consumer
    .committableSource(consumerStringSettings, Subscriptions.topics("topic-input"))

They require consumer settings to create Sources for Kafka.

Kafka to Kafka

We can transfer data from one kafka topic to another as follows :

 kafkaCommittableSource
    .map(record => new ProducerRecord[String, String]("topic-output", record.record.value()))o
    .runWith(kafkaPlainSink)

Alpakka Elasticsearch

This is an implementation of Akka-Streams with elasticsearch. It basically provides us connectors to connect elasticsearch with Akka-Streams.

For this you first have to add dependency in build.sbt :

libraryDependencies ++= Seq(
  "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "2.0.0"
)

The next important step is setting up a restclient as it is required while creating Elasticsearch sink, source and flows. It is done in following way :

implicit val client: RestClient = RestClient.builder(new HttpHost("localhost", 9200)).build()

Reading from Elasticsearch

Here, we can use ElasticsearchSource for this purpose. Let us look at an example to know how to create elasticsearch source :

 val elasticSource = ElasticsearchSource
    .typed[Person](
      indexName = "sink-person",
      typeName = "person",
      query = """{"match_all": {}}"""
    )

Writing to Elasticsearch

For this purpose either we can use ElasticsearchSink or Elasticsearchflow that creates a sink an flow respectively.

For example :

 val elasticSink = ElasticsearchSink.create[Person](
    indexName = "sinkOutput",
    typeName = "person"
  )

Kafka to Elasticsearch

Let’s look at an another example. In this I’m reading data from Kafka and pushing it to Elasticsearch.

 val intermediateFlow = Flow[ConsumerRecord[Array[Byte], String]].map { kafkaMessage =>

    val person = Json.parse(kafkaMessage.value()).as[Person]
    val id = person.id

    WriteMessage.createIndexMessage(id, person)
  }

  val esSink = ElasticsearchSink.create[Person](
    indexName = "sink-person",
    typeName = "person"
  )


  kafkaPlainSource
    .via(intermediateFlow)
    .runWith(esSink)

In this we have a kafka source that is created using Consumer.plainSource() and a sink for elasticsearch. Also there is an intermediateFlow that basically converts the incoming data to elasticsearch writable format.

Alpakka Pipeline

In this I have created a Kafka source, an elasticsearch sink and a cassandra flow as well.

An alpakka pipeline that reads data from kafka and writes it to elasticsearch as well as cassandra.

You will have to add dependency for alpakka cassandra too.

libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-cassandra" % "2.0.0"

So the kafka source and elasticsearch sink will be created in the same way as explained above. We will create a cassandra flow after that. For that, first you will have to create a CassandraSession. It will be as follows :

implicit val cassandraSession: CassandraSession =
    CassandraSessionRegistry.get(system).sessionFor(sessionSettings)

Also you will have to create a statementBinder that basically creates Boundstatement to bind the input data. It will be as follows :

val statementBinder: (Person, PreparedStatement) => BoundStatement =
    (person, preparedStatement) => preparedStatement.bind(person.id, person.name, person.city)

Now we will create a cassandra flow, as follows :

val written: Flow[Person, Person, NotUsed] = CassandraFlow.create(CassandraWriteSettings.defaults,
    s"INSERT INTO demo.emp(id, name, city) VALUES (?, ?, ?)",
    statementBinder)

After that we will connect these parts together to create the data pipeline.

val flow: Flow[Person, WriteMessage[Person, NotUsed], NotUsed] = Flow[Person].map(person => WriteMessage.createIndexMessage(person.id, person))
  val convert: Flow[ConsumerRecord[Array[Byte], String], Person, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>

    // Parsing the record as Person Object
    Json.parse(message.value()).as[Person]
  }
    kafkaSource
      .via(convert)
      .via(written)
      .via(flow)
      .runWith(esSink)

You can download this Alpakka Pipeline example from here.

And for the other examples go here.

That’s all for Alpakka in this blog. I hope this blog was helpful.

Reference

Knoldus-blog-footer-image

Leave a Reply