Using Apache Flink for Kinesis to Kafka Connect


In this blog, we are going to use Kinesis as the source and Kafka as the sink.

Let’s get started.

Step 1:

Apache Flink provides the Kinesis and Kafka connector dependencies. Let’s add them to our build.sbt:

name := "flink-demo"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.10.0",
  "org.apache.flink" %% "flink-connector-kinesis" % "1.10.0",
  "org.apache.flink" %% "flink-connector-kafka" % "1.10.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.10.0"
)

Step 2:

The next step is to obtain a reference to the streaming execution environment in which this program runs.

val env = StreamExecutionEnvironment.getExecutionEnvironment

Step 3:

Setting a parallelism of x here will cause all operators (such as join, map, reduce) to run with x parallel instances.

I am using 1 as it is a demo application.

env.setParallelism(1)

Step 4:

Disabling AWS CBOR, as we are testing locally:

System.setProperty("com.amazonaws.sdk.disableCbor", "true")
System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "true")

Step 5:

Defining Kinesis consumer properties.

  • Region
  • Stream position – TRIM_HORIZON to read all the records available in the stream
  • AWS keys
  • Endpoint – set to http://localhost:4568, since we will test Kinesis using Localstack


val kinesisConsumerConfig = new Properties()
kinesisConsumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name())
kinesisConsumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "dummy")
kinesisConsumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "dummy")
kinesisConsumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4568")

Step 6:

Define the Kafka producer properties with the bootstrap server. It is set to 172.17.0.3:9092 as we will use the Confluent Kafka image for testing.

val kafkaProducerProperties = new Properties()
kafkaProducerProperties.setProperty("bootstrap.servers", "172.17.0.3:9092")

Step 7:

Add Kinesis as the source. Here, kinesisStream is the name of the stream and SimpleStringSchema is our DeserializationSchema.

val kinesis = env.addSource(new FlinkKinesisConsumer("kinesisStream", new SimpleStringSchema, kinesisConsumerConfig))

Step 8:

Define the sink as Kafka with the topic name kafkaTopic; SimpleStringSchema is our SerializationSchema.

val kafka = new FlinkKafkaProducer[String]("kafkaTopic", new SimpleStringSchema, kafkaProducerProperties)

Step 9:

Add source and sink.

kinesis.addSink(kafka)

Step 10:

Streams are lazy. Finally, trigger the program execution using execute.

env.execute(jobName = "kinesis-kafka-demo")
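Putting it all together, here is a minimal sketch of the complete job. The imports and the object name KinesisToKafkaJob are assumptions on my part; the package names follow the Flink 1.10 connector artifacts added in Step 1.

import java.util.Properties

import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}

// Hypothetical object name for this demo job.
object KinesisToKafkaJob extends App {

  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env.setParallelism(1)

  // Disable CBOR so the Kinesis client can talk to Localstack.
  System.setProperty("com.amazonaws.sdk.disableCbor", "true")
  System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "true")

  // Kinesis consumer properties (Step 5).
  val kinesisConsumerConfig = new Properties()
  kinesisConsumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
  kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION,
    ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name())
  kinesisConsumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "dummy")
  kinesisConsumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "dummy")
  kinesisConsumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4568")

  // Kafka producer properties (Step 6).
  val kafkaProducerProperties = new Properties()
  kafkaProducerProperties.setProperty("bootstrap.servers", "172.17.0.3:9092")

  // Source: Kinesis stream "kinesisStream"; sink: Kafka topic "kafkaTopic".
  val kinesis = env.addSource(new FlinkKinesisConsumer[String]("kinesisStream", new SimpleStringSchema, kinesisConsumerConfig))
  val kafka = new FlinkKafkaProducer[String]("kafkaTopic", new SimpleStringSchema, kafkaProducerProperties)

  kinesis.addSink(kafka)

  env.execute("kinesis-kafka-demo")
}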

Now, let’s test that it is working:

For Kinesis, we will be using Localstack.

1. git clone https://github.com/localstack/localstack.git

2. cd localstack

3. Change the docker-compose file to:

version: '2.1'
services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "4568:4568"
    environment:
      - SERVICES=kinesis
      - DEBUG=1
      - DOCKER_HOST=unix:///var/run/docker.sock

4. Run aws configure (a non-interactive alternative is shown after this list):

  • Set the AWS access key and secret key to dummy values.
  • Set the region to us-east-1.
  • Set the output format to json.
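Alternatively, the same values can be set non-interactively; a minimal sketch (this writes to the default AWS CLI profile):

# Dummy credentials and region for Localstack (default profile).
aws configure set aws_access_key_id dummy
aws configure set aws_secret_access_key dummy
aws configure set region us-east-1
aws configure set output json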

5. List all the Kinesis streams.

aws kinesis list-streams --endpoint-url=http://localhost:4568

6. Let’s create our stream.

aws --endpoint-url=http://localhost:4568 kinesis create-stream --shard-count 1 --stream-name kinesisStream

Check that the stream is created using the command in step 5.

7. Add a record to the Kinesis stream.

aws --endpoint-url=http://localhost:4568 kinesis put-record --stream-name kinesisStream --partition-key 123 --data "Abcd1"
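Optionally, you can read the record back from the stream to confirm it was written; a sketch, assuming the single shard gets the default id shardId-000000000000 (the returned Data field is base64-encoded):

# Fetch a shard iterator for the only shard and read its records.
shard_iterator=$(aws --endpoint-url=http://localhost:4568 kinesis get-shard-iterator --stream-name kinesisStream --shard-id shardId-000000000000 --shard-iterator-type TRIM_HORIZON --query 'ShardIterator' --output text)
aws --endpoint-url=http://localhost:4568 kinesis get-records --shard-iterator "$shard_iterator"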

8. Now, we will start Kafka as the sink.

sudo docker run -d --rm --name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
-p 2181:2181 \
confluentinc/cp-zookeeper:5.3.1
zookeeper_host=$(sudo docker inspect zookeeper | jq -r .[].NetworkSettings.Networks.bridge.IPAddress) && echo $zookeeper_host:2181


sudo docker run -d --rm --name=kafka \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=${zookeeper_host}:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.3:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e CONFLUENT_SUPPORT_CUSTOMER_ID=c0 \
  confluentinc/cp-kafka:5.3.1

9. The following command gives me 172.17.0.3:9092, which is the value of my bootstrap.servers.

kafka_host=$(sudo docker inspect kafka | jq -r .[].NetworkSettings.Networks.bridge.IPAddress) && echo $kafka_host:9092

10. Run docker ps and exec into the Kafka container using the command:

docker exec -it kafka bash

11. Run the following command to see the data coming from Kinesis.

kafka-console-consumer --bootstrap-server 172.17.0.3:9092 --topic kafkaTopic --from-beginning
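Once the Flink job is running, the console consumer should print the record we pushed to Kinesis in step 7 (Abcd1).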

To download the complete code, visit kinesis-kafka-connector.

Cheers!

Conclusion: I hope that after reading this blog you have an understanding of how to use Kinesis as a source and Kafka as a sink.

Written by 

Jyoti Sachdeva is a software consultant with more than 6 months of experience. She likes to keep up with the trending technologies. She is familiar with languages such as C, C++, Java, and Scala, and is currently working on Akka, Akka HTTP, and Scala. Her hobbies include watching TV series and movies, reading novels, and dancing.
