In this blog, we are going to use Amazon Kinesis as a source and Apache Kafka as a sink for an Apache Flink streaming job.
Let’s get started.
Step 1:
Apache Flink provides Kinesis and Kafka connector dependencies. Let's add them to our build.sbt:
name := "flink-demo"
version := "0.1"
scalaVersion := "2.12.8"
libraryDependencies ++= Seq(
"org.apache.flink" %% "flink-scala" % "1.10.0",
"org.apache.flink" %% "flink-connector-kinesis" % "1.10.0",
"org.apache.flink" %% "flink-connector-kafka" % "1.10.0",
"org.apache.flink" %% "flink-streaming-scala" % "1.10.0"
)
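The code snippets below assume the following imports. This is a sketch based on the Flink 1.10 connector packages; the exact paths may differ slightly across versions:
import java.util.Properties
import org.apache.flink.api.common.serialization.SimpleStringSchema
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer
import org.apache.flink.streaming.connectors.kinesis.config.{AWSConfigConstants, ConsumerConfigConstants}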
Step 2:
The next step is to get a reference to the execution environment on which this program runs.
val env = StreamExecutionEnvironment.getExecutionEnvironment
Step 3:
Setting the parallelism to x here causes all operators (such as join, map, reduce) to run with x parallel instances.
I am using 1 as it is a demo application.
env.setParallelism(1)
Step 4:
Disable AWS CBOR, as we are testing locally against LocalStack's Kinesis, which does not support CBOR.
System.setProperty("com.amazonaws.sdk.disableCbor", "true")
System.setProperty("org.apache.flink.kinesis.shaded.com.amazonaws.sdk.disableCbor", "true")
Step 5:
Define the Kinesis consumer properties:
- Region
- Stream position – TRIM_HORIZON, to read all the records available in the stream
- AWS keys
- Do not worry about the endpoint; it is set to http://localhost:4568 as we will test Kinesis using LocalStack.
val kinesisConsumerConfig = new Properties()
kinesisConsumerConfig.put(AWSConfigConstants.AWS_REGION, "us-east-1")
kinesisConsumerConfig.put(ConsumerConfigConstants.STREAM_INITIAL_POSITION, ConsumerConfigConstants.InitialPosition.TRIM_HORIZON.name())
kinesisConsumerConfig.put(AWSConfigConstants.AWS_ACCESS_KEY_ID, "dummy")
kinesisConsumerConfig.put(AWSConfigConstants.AWS_SECRET_ACCESS_KEY, "dummy")
kinesisConsumerConfig.put(AWSConfigConstants.AWS_ENDPOINT, "http://localhost:4568")
Step 6:
Define the Kafka producer properties with the bootstrap server. It is set to 172.17.0.3:9092 as we will use the Confluent Kafka image for testing.
val kafkaProducerProperties = new Properties()
kafkaProducerProperties.setProperty("bootstrap.servers", "172.17.0.3:9092")
Step 7:
Add Kinesis as the source. Here, kinesisStream is the name of the stream and SimpleStringSchema is our DeserializationSchema.
val kinesis = env.addSource(new FlinkKinesisConsumer("kinesisStream", new SimpleStringSchema, kinesisConsumerConfig))
Step 8:
Define the sink as Kafka, with kafkaTopic as the topic name and SimpleStringSchema as our SerializationSchema.
val kafka = new FlinkKafkaProducer[String]("kafkaTopic", new SimpleStringSchema, kafkaProducerProperties)
Step 9:
Connect the source to the sink.
kinesis.addSink(kafka)
Step 10:
Streams are lazy. Finally, trigger the program execution using execute.
env.execute(jobName = "kinesis-kafka-demo")
Now, let’s test that it is working:
For Kinesis, we will be using LocalStack.
1. git clone https://github.com/localstack/localstack.git
2. cd localstack
3. Change docker-compose file to:
version: '2.1'
services:
  localstack:
    container_name: "${LOCALSTACK_DOCKER_NAME-localstack_main}"
    image: localstack/localstack
    ports:
      - "4568:4568"
    environment:
      - SERVICES=kinesis
      - DEBUG=1
      - DOCKER_HOST=unix:///var/run/docker.sock
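Then start LocalStack from the localstack directory (assuming docker-compose is installed):
sudo docker-compose up -d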
4. Run aws configure (or use the non-interactive commands shown below).
- Set the AWS access key and secret key to dummy values.
- Set the region to us-east-1.
- Set the output format to json.
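The same configuration can be applied non-interactively with aws configure set:
aws configure set aws_access_key_id dummy
aws configure set aws_secret_access_key dummy
aws configure set region us-east-1
aws configure set output json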
5. List all the Kinesis streams.
aws kinesis list-streams --endpoint-url=http://localhost:4568
6. Let’s create our stream.
aws --endpoint-url=http://localhost:4568 kinesis create-stream --shard-count 1 --stream-name kinesisStream
Check that the stream was created using the command from step 5; the output should look like the example below.
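A successful run of list-streams should produce output roughly like this (only the kinesisStream entry matters here):
{
    "StreamNames": [
        "kinesisStream"
    ]
}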
7. Add a record to the Kinesis stream (see the note below if you are on AWS CLI v2).
aws --endpoint-url=http://localhost:4568 kinesis put-record --stream-name kinesisStream --partition-key 123 --data "Abcd1"
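Note: the command above works with AWS CLI v1. AWS CLI v2 expects --data to be base64-encoded by default, so pass the binary-format flag to send the raw text instead:
aws --endpoint-url=http://localhost:4568 kinesis put-record --stream-name kinesisStream --partition-key 123 --cli-binary-format raw-in-base64-out --data "Abcd1"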
8. Now, we will start Kafka as the sink.
sudo docker run -d --rm --name=zookeeper \
-e ZOOKEEPER_CLIENT_PORT=2181 \
-p 2181:2181 \
confluentinc/cp-zookeeper:5.3.1
zookeeper_host=$(sudo docker inspect zookeeper | jq -r .[].NetworkSettings.Networks.bridge.IPAddress) && echo $zookeeper_host:2181
sudo docker run -d --rm --name=kafka \
-p 9092:9092 \
-e KAFKA_ZOOKEEPER_CONNECT=${zookeeper_host}:2181 \
-e KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://172.17.0.3:9092 \
-e KAFKA_BROKER_ID=0 \
-e KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1 \
-e CONFLUENT_SUPPORT_CUSTOMER_ID=c0 \
confluentinc/cp-kafka:5.3.1
9. The following command gives me 172.17.0.3:9092, which is the value I used for bootstrap.servers in Step 6; if your container's IP differs, update the property accordingly.
kafka_host=$(sudo docker inspect kafka | jq -r .[].NetworkSettings.Networks.bridge.IPAddress) && echo $kafka_host:9092
10. Run docker ps and exec into the kafka container using the command:
sudo docker exec -it kafka bash
11. Run the following command to see the data that arrived from Kinesis; you should see the record we pushed, as shown below.
kafka-console-consumer --bootstrap-server 172.17.0.3:9092 --topic kafkaTopic --from-beginning
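If everything is wired up correctly, the consumer prints the record that was put on the Kinesis stream:
Abcd1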
To download the complete code, visit kinesis-kafka-connector
Cheers!
Conclusion: I hope that after reading this blog you have an understanding of how to use Kinesis as a source and Kafka as a sink with Apache Flink.
