How to Create a Kafka Producer in Apache Beam

In this blog, we will look at what Kafka and a Kafka producer are, and then write a Kafka producer using Apache Beam.

What is Kafka?

Kafka is a distributed data streaming platform that is commonly used for high-performance data pipelines, streaming analytics, data integration, and mission-critical applications. It lets you:

  • Publish (write) and subscribe to (read) streams of events, called records.
  • Store streams of records durably and reliably inside topics.
  • Process streams of records as they occur or retrospectively.

In addition, this functionality is provided in a distributed, highly scalable, fault-tolerant, and secure manner.

What is a Kafka Producer?

Kafka producers are the publishers responsible for sending streams of data to one or more topics. In short, a producer writes messages to Kafka topics.
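To make the terms above concrete, here is a toy in-memory model of topics, a producer, and a consumer in plain Java. It is only an illustrative sketch and not Kafka's client API: the class ToyBroker and its methods send and readFrom are hypothetical names, and the notion of an "offset" as a record's position in a topic is an assumption added for the example.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Toy in-memory model of topics; NOT the Kafka client API (all names are hypothetical).
public class ToyBroker {

    // Each topic is an append-only list of records.
    private final Map<String, List<String>> topics = new HashMap<>();

    // "Producer" side: append a record to a topic and return its position (offset).
    public int send(String topic, String record) {
        List<String> log = topics.computeIfAbsent(topic, name -> new ArrayList<>());
        log.add(record);
        return log.size() - 1;
    }

    // "Consumer" side: read every record of a topic starting at the given offset.
    public List<String> readFrom(String topic, int offset) {
        List<String> log = topics.getOrDefault(topic, new ArrayList<>());
        int start = Math.max(0, Math.min(offset, log.size()));
        return new ArrayList<>(log.subList(start, log.size()));
    }

    public static void main(String[] args) {
        ToyBroker broker = new ToyBroker();
        broker.send("test", "Hello");
        broker.send("test", "World");
        // Reading from offset 0 returns everything stored in the topic so far.
        System.out.println(broker.readFrom("test", 0));
    }
}
```

Running main prints [Hello, World]. The records stay in the topic after being read, which is why a consumer can process them as they occur or retrospectively.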

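Below are some basic Kafka commands, run from the bin directory of the Kafka installation. On Kafka 3.0 and later, where the --zookeeper option has been removed from kafka-topics.sh, pass --bootstrap-server localhost:9092 instead.

  • To create a topic:
    • bash kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic test
  • To list all topics:
    • bash kafka-topics.sh --list --zookeeper localhost:2181
  • To start a consumer:
    • bash kafka-console-consumer.sh --topic test --from-beginning --bootstrap-server localhost:9092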

A typical Beam driver program works as follows:

  • First, create a Pipeline object and set the pipeline execution options, including the pipeline runner.
  • Then, create an initial PCollection for the pipeline data, either by using IOs to read from an external storage system or by using a Create transform to build it from in-memory data.
  • Apply PTransforms to each PCollection.
    • PTransforms can change, filter, group, analyze, or otherwise process the elements in a PCollection.
    • A transform creates a new output PCollection without modifying the input collection.
  • Use IOs to write the final, transformed PCollection(s) to an external system.
  • Finally, run the pipeline using the designated pipeline runner.
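The steps above can be sketched as a small driver program. This is a minimal sketch under a few assumptions: the class name DriverSketch, the helper shout, and the sample words are made up for illustration, and the runner is expected to be chosen on the command line (for example --runner=DirectRunner) or left at the default.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.transforms.MapElements;
import org.apache.beam.sdk.values.PCollection;
import org.apache.beam.sdk.values.TypeDescriptors;

public class DriverSketch {

    // Hypothetical helper holding the per-element logic of the transform.
    static String shout(String word) {
        return word.toUpperCase();
    }

    public static void main(String[] args) {
        // 1. Create the pipeline and its execution options from the command-line arguments.
        PipelineOptions options = PipelineOptionsFactory.fromArgs(args).create();
        Pipeline pipeline = Pipeline.create(options);

        // 2. Build the initial PCollection from in-memory data.
        PCollection<String> words = pipeline.apply(Create.of("hello", "world"));

        // 3. Apply a PTransform; it returns a new PCollection and leaves "words" unchanged.
        PCollection<String> shouted = words.apply(
                MapElements.into(TypeDescriptors.strings())
                        .via((String word) -> shout(word)));

        // 4. An IO write (for example KafkaIO) would be applied to "shouted" here.

        // 5. Run the pipeline with the selected runner and wait for it to finish.
        pipeline.run().waitUntilFinish();
    }
}
```

Keeping the element logic in a plain static method is a design choice for readability: it can be checked on its own without starting a pipeline.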

Let’s get into the code, assuming that Kafka is already set up and a Kafka server is running on the machine.

Kafka Producer code using Apache Beam

import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.transforms.Create;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

public class KafkaProducer {

    public static void main(String[] args) {

        // Create the pipeline without passing any options
        Pipeline pipeline = Pipeline.create();

        // Create a PCollection from the string that we want to publish as an event to Kafka
        PCollection<String> pCollection = pipeline.apply(Create.of("Hello World"));

        // Create the Kafka producer with KafkaIO and set the required configuration.
        // values() writes only the record values, so no key serializer is needed.
        pCollection
                .apply(KafkaIO.<Void, String>write()
                        .withBootstrapServers("localhost:9092")
                        .withTopic("test")
                        .withValueSerializer(StringSerializer.class)
                        .values());

        // Start the pipeline and wait for it to finish
        pipeline.run().waitUntilFinish();

    }
}

Code Explanation

  • First, we create the pipeline using the create() method, without any options.
  • Second, we create a PCollection from the string that we want to publish to a Kafka topic. The input can also be read from files and other sources.
  • After that, we create the Kafka producer by passing KafkaIO to the apply() method and setting the required configuration.
  • Finally, we start the pipeline using the run() method.
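As noted above, the input does not have to be a hard-coded string. The following is a minimal sketch of the same producer reading its messages from a text file with TextIO. The class name FileToKafka and the file name input.txt are assumptions made for illustration; the broker address and topic are the ones used earlier in this post.

```java
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.io.TextIO;
import org.apache.beam.sdk.io.kafka.KafkaIO;
import org.apache.beam.sdk.values.PCollection;
import org.apache.kafka.common.serialization.StringSerializer;

public class FileToKafka {

    // Assumed values for illustration; adjust them to your environment.
    static final String INPUT_FILE = "input.txt"; // hypothetical file, one message per line
    static final String BOOTSTRAP_SERVERS = "localhost:9092";
    static final String TOPIC = "test";

    public static void main(String[] args) {
        Pipeline pipeline = Pipeline.create();

        // Each line of the text file becomes one element of the PCollection.
        PCollection<String> lines = pipeline.apply(TextIO.read().from(INPUT_FILE));

        // Publish every line as the value of a Kafka record (no key).
        lines.apply(KafkaIO.<Void, String>write()
                .withBootstrapServers(BOOTSTRAP_SERVERS)
                .withTopic(TOPIC)
                .withValueSerializer(StringSerializer.class)
                .values());

        // Start the pipeline and wait for it to finish.
        pipeline.run().waitUntilFinish();
    }
}
```

TextIO is part of beam-sdks-java-core, so this variant should not need any dependency beyond the ones listed for the main example.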

Below are the Maven dependencies to add to pom.xml:

<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-core</artifactId>
    <version>2.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-sdks-java-io-kafka</artifactId>
    <version>2.10.0</version>
</dependency>
<dependency>
    <groupId>org.apache.beam</groupId>
    <artifactId>beam-runners-direct-java</artifactId>
    <version>2.10.0</version>
</dependency>

As a result, the Kafka consumer started with the command above prints the message Hello World.

Reference

Apache Beam: https://beam.apache.org/documentation/programming-guide/