Using Apache Flink for Kinesis to Kafka Connect

Reading Time: 3 minutes

In this blog, we are going to use kinesis as a source and kafka as a consumer.

Let’s get started.

Step 1:

Apache Flink provides the kinesis and kafka connector dependencies. Let’s add them in our build.sbt:

name := "flink-demo"

version := "0.1"

scalaVersion := "2.12.8"

libraryDependencies ++= Seq(
  "org.apache.flink" %% "flink-scala" % "1.10.0",
  "org.apache.flink" %% "flink-connector-kinesis" % "1.10.0",
  "org.apache.flink" %% "flink-connector-kafka" % "1.10.0",
  "org.apache.flink" %% "flink-streaming-scala" % "1.10.0"
)

Step 2:

The next step is to create a pointer to the environment on which this program runs.

val env = StreamExecutionEnvironment.getExecutionEnvironment

Step 3:

Setting parallelism of x here will cause all operators (such as join, map, reduce) to run with x parallel instance.

I am using 1 as it is a demo application.

env.setParallelism(1)

Step 4:

Disabling the aws cbor, as we are testing locally.

System.setProperty("com.amazonaws.sdk.disableCbor", "true")
System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "true")

Step 5:

Defining Kinesis consumer properties.

  • Region
  • Stream Position – TRIM_HORIZON to read all the records available in the stream
  • Aws keys
  • Do not worry about the endpoint, it is set to http://localhost:4568 as we will test the kinesis using localstack.

Do not worry about the endpoint, it is set to http://localhost:4568 as we will test the kinesis using localstack.

val kinesisConsumerConfig = new Properties()
kinesisConsumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name())
kinesisConsumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "dummy")
kinesisConsumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "dummy")
kinesisConsumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4568")

Step 6:

Define the kafka properties for the bootstrap server. It is set to 172.17.0.3:9092 as we will use the confluence kafka image for testing.

val kafkaProducerProperties = new Properties kafkaProducerProperties.setProperty("bootstrap.servers","172.17.0.3:9092")

Step 7:

Add kinesis as a source here kinesisStream is the name of stream and SimpleStringSchema is our DeserializationSchema.

val kinesis = env.addSource(new FlinkKinesisConsumer("kinesisStream", new SimpleStringSchema, kinesisConsumerConfig))

Step 8:

Define the sink as kafka with topic name kafkaTopic and SimpleStringSchema is our SerializationSchema.

val kinesis = env.addSource(new FlinkKinesisConsumer("kinesisStream", new SimpleStringSchema, kinesisConsumerConfig))

Step 9:

Add source and sink.

kinesis.addSink(kafka)

Step 10:

Streams are lazy. At last, trigger the program execution using execute.

env.execute(jobName = "kinesis-kafka-demo")

Now, let’s test that it is working:

For kinesis, we will be using localstack.

1. git clone https://github.com/localstack/localstack.git

2. cd localstack

3. Change docker-compose file to:

version: '2.1'
services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "4568:4568"
    environment:
      - SERVICES=kinesis
      - DEBUG=1
      - DOCKER_HOST=unix:///var/run/docker.sock

4. Run aws configure

  • Set aws key and secret key as a dummy.
  • Region as us-east-1
  • Format as JSON.

5. List all the kinesis streams.

aws kinesis list-streams --endpoint-url=http://localhost:4568

6. Let’s create our stream.

aws --endpoint-url=http://localhost:4568 kinesis create-stream --shard-count 1 --stream-name kinesisStream

Check the stream is created using the command in step 5.

7. Add the record to kinesis stream.

aws --endpoint-url=http://localhost:4568 kinesis put-record --stream-name kinesisStream --partition-key 123 --data "Abcd1"

8. Now, we will start the kafka as a sink.

sudo docker run -d --rm --name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
-p 2181:2181 \
confluentinc/cp-zookeeper:5.3.1
zookeeper_host=$(sudo docker inspect zookeeper | jq -r .[].NetworkSettings.Networks.bridge.IPAddress) && echo $zookeeper_host:2181


sudo docker run -d --rm --name=kafka \
  -p 9092:9092 \
  -e KAFKA_ZOOKEEPER_CONNECT=${zookeeper_host}:2181 \
  -e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.3:9092 \
  -e KAFKA_BROKER_ID=0 \
  -e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
  -e CONFLUENT_SUPPORT_CUSTOMER_ID=c0 \
  confluentinc/cp-kafka:5.3.1

9. The following command gives me 172.17.0.3:9092 which is the value of my bootstrap.servers.

kafka_host=$(sudo docker inspect kafka | jq -r .[].NetworkSettings.Networks.bridge.IPAddress) && echo $kafka_host:9092

10. Run docker ps and exec into the kafka container using command:

docker exec -it  bash

11. Run the following command to see the data from kinesis.

kafka-console-consumer --bootstrap-server 172.17.0.3:9092 --topic kafkaTopic --from-beginning

To download the complete code, visit  kinesis-kafka-connector

Cheers!

Conclusion: I hope after reading this blog you will get an understanding of how we use kinesis as a source and kafka as a consumer.

Written by 

Jyoti Sachdeva is a software consultant with more than 6 months of experience. She likes to keep up with the trending technologies. She is familiar with languages such as C,C++,Java,Scala and is currentky working on akka,akka http and scala. Her hobbies include watching tv series and movies, reading novels and dancing.