In our previous blog, we had a look at what Akka streams are and how they are different from the other streaming mechanisms we have.
In this blog, we will be taking a little step forward into the world of Akka Streams. In order to work with Akka streams, we need a mechanism to connect Akka Streams to the existing system components. That is where Alpakka comes into the picture.
According to the documentation :
The Alpakka project is an open source initiative to implement stream- aware, reactive, integration pipelines for Java and Scala. It is built on top of Akka Streams and has been designed from ground up to understand streaming natively and provide a DSL for reactive and stream-oriented programming, with built-in support for backpressure.
Alpakka provides a large number of options to choose from when it comes to connectors.
Some of them include
- Apache Cassandra
- Apache Kafka
- AWS S3
- and many more.
To explore other connectors, you can refer to the official documentation here.
It is not possible to go through all of these in a single blog. So today we will be focusing on the Alpakka Elasticsearch and Alpakka Kafka connectors as we had an opportunity to get some hands-on in our current project.
For those who are new to ElasticSearch can go through its documentation here which is quite easy to understand and self-explanatory. Also, for those who wish to explore Kafka a bit, can go through the documentation here
Now that you have an idea of what Akka streams, Kafka and Elasticsearch are, we can now move to Alpakka Elasticsearch and Alpakka Kafka.
A glance at the documentation :
The Alpakka Elasticsearch connector provides Akka Streams integration for Elasticsearch. The Alpakka Kafka connector lets you connect Apache Kafka to Akka Streams. It was formerly known as Akka Streams Kafka and even Reactive Kafka.
Pretty basic, Right?
Today, we will try to integrate the pipeline that would consume the data from Kafka and using Akka streams populate that data into the Elastic Search i.e. some actual use case of these connectors.
To begin with, you will need to import the following dependencies :
For ElasticSearch Alpakka :
libraryDependencies += "com.lightbend.akka" %% "akka-stream-alpakka-elasticsearch" % "1.0-M1"
For Kafka Alpakka :
libraryDependencies += "com.typesafe.akka" %% "akka-stream-kafka" % "0.18"
NOTE: As you can see, for the Alpakka ElasticSearch, that the latest version is 1.0-M1 which was released recently only. The previous version was 0.20. There are a few updates and changes that were brought in this version. Some of these upgrades include:
For other changes refer to the GitHub issue here.
As these changes are now a part of the Alpakka Elastic search, so we will continue with these only.
Now, Let’s come to the actual implementation of our code.
As we all know that Akka Streams work on 3 important things :
– Flow and
In our case, the source will be Kafka and the sink would be the Elastic Search as we are planning to stream the data from Kafka to the Elastic search.
To begin with our code, i.e To create a Kafka source, we would add the following :
- Consumer properties – to identify the brokers and consumer offsets
- Kafka Source – a plainsource in our case which would subscribe to a topic
Please see that here we are assuming that the topic8 has JSON data for the Book objects.
NOTE: In case you need to commit offsets in Kafka, you can use commitableSource instead of a plainsource. This is useful when “at-least-once delivery” is desired.
Now that we have a source we would require a flow that would parse this consumer record. Once we are able to get the desired object, we will need to transform it into a WriteMessage so as to insert it into the elastic search.
Note: Earlier, The Elasticsearch flow and Sink required IncomingMessage which is now replaced by WriteMessage
For parsing the record to a predefined case class, you will need an implicit for the same, something like,
implicit val format: JsonFormat[Book] = jsonFormat2(Book)
Now comes the insertion to the elastic search i.e the sink.
For that, we will need to import the given things :
import org.apache.http.HttpHost import org.elasticsearch.client.RestClient
And create an implicit client :
implicit val client: RestClient = RestClient.builder(new HttpHost("localhost", 9201)).build()
These are required as the Sources, Flows and Sinks provided by Alpakka ElasticSearch connector need a prepared
org.elasticsearch.client.RestClient to access to Elasticsearch.
The ElasticSearch Sink mostly requires an index and a doctype for insertion of data and can be created in the given way.
Now that we have a Source, flow and a sink, we can stream our data easily from Kafka to elastic search by connecting these together something like :
Now all you have to do is run your application and your data will be streaming live from Kafka to the Elasticsearch.
Also, we have added the source code here.
Hope this Helps. Stay tuned for more interesting articles. 🙂