Kafka Connector – Bridge between your source and destination data

In this blog, we will talk about Apache Kafka and Kafka connectors. After reading it, you will know what Apache Kafka is and how Kafka Connect moves data between Kafka and other systems.

Apache Kafka:

Apache Kafka is a distributed event store and stream-processing platform. It is an open-source system developed by the Apache Software Foundation and written in Java and Scala. The project aims to provide a unified, high-throughput, low-latency platform for handling real-time data feeds. Kafka was originally developed at LinkedIn and was open-sourced in early 2011.

Apache Kafka is based on the commit log: it allows users to subscribe to it and publish data to any number of systems or real-time applications. Kafka stores key-value messages that come from arbitrarily many processes called “producers”. The data can be split into different “partitions” within different “topics”. Within a partition, messages are strictly ordered by their offsets (the position of a message within the partition), and they are indexed and stored together with a timestamp. Other processes called “consumers” can read messages from partitions.
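
To make partitions and offsets concrete, here is a minimal in-memory sketch in Python. It is not a Kafka client: the ToyTopic class, its methods, and the crc32-based partition choice are assumptions made purely for illustration, and real producers choose partitions with their own partitioner.

```python
import zlib


class ToyTopic:
    """In-memory stand-in for a topic: a fixed number of append-only partitions."""

    def __init__(self, num_partitions):
        self.partitions = [[] for _ in range(num_partitions)]

    def publish(self, key, value):
        # Hash the key so that equal keys always land in the same partition.
        # (Assumption: crc32 is used only because it is deterministic.)
        partition = zlib.crc32(key.encode("utf-8")) % len(self.partitions)
        log = self.partitions[partition]
        offset = len(log)  # the offset is the message's position in its partition
        log.append((key, value))
        return partition, offset

    def read(self, partition, from_offset=0):
        # A consumer reads one partition in order, starting at a chosen offset.
        return self.partitions[partition][from_offset:]


topic = ToyTopic(num_partitions=3)
p0, o0 = topic.publish("user-1", "login")
p1, o1 = topic.publish("user-1", "click")
p2, o2 = topic.publish("user-1", "logout")
```

All three messages share a key, so they end up in the same partition with consecutive offsets, and a consumer that starts reading at offset 1 sees only the last two.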

Kafka Connector:

Kafka Connect was added in the Kafka 0.9.0 release. It uses the Producer and Consumer API.

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems, using so-called Connectors. Kafka Connectors are ready-to-use components, which can help us to import data from external systems into Kafka topics and export data from Kafka topics into external systems.

A Kafka Connect cluster is a separate cluster from the Kafka cluster. The Kafka Connect cluster supports running and scaling out connectors (components that support reading and/or writing between external systems).

Key Concepts:

  • A connector is an object that defines the parameters for one or more tasks, which do the actual work of importing or exporting data.
  • A source connector collects data from a system. Source systems can be entire databases, streams tables, or message brokers. A source connector could also collect metrics from application servers into Kafka topics, making the data available for stream processing with low latency.
  • A sink connector delivers data from Kafka topics into other systems, which might be indexes such as Elasticsearch, batch systems such as Hadoop, or any kind of database.
  • A Kafka Connect worker is a Java-based operating-system process that executes connectors and their associated tasks in child threads.
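
The relationship between a connector and its tasks can be sketched as follows. In the Java API, a connector hands out task configurations through its taskConfigs(maxTasks) method; the Python function below only imitates that idea, and the table names and the "tables" key are hypothetical.

```python
def task_configs(tables, max_tasks):
    """Split a connector's tables into at most max_tasks task configurations."""
    if max_tasks < 1:
        raise ValueError("max_tasks must be at least 1")
    # Never create more tasks than there is work to hand out.
    num_groups = min(max_tasks, len(tables))
    groups = [[] for _ in range(num_groups)]
    for index, table in enumerate(tables):
        groups[index % num_groups].append(table)
    # Each task receives its own small configuration describing its share.
    return [{"tables": ",".join(group)} for group in groups]


configs = task_configs(["orders", "customers", "payments"], max_tasks=2)
```

Here the connector only decides how the work is divided; the worker is the process that then runs each resulting task in its own thread.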

Connectors are available for many systems, for example S3, JDBC, Cassandra, Elasticsearch, and HDFS. If you need to get data into Kafka from a system that is not currently supported, you will need a custom connector.

Kafka Connector Features:

Kafka Connect provides the following features:

  • Kafka Connect provides a common framework that standardizes the integration of other systems with Kafka.
  • It simplifies the development, deployment, and management of connectors.
  • It provides a REST interface – we can manage connectors using a REST API.
  • Distributed and standalone modes – we can scale up to large clusters by leveraging the distributed nature of Kafka, or scale down to setups for development, testing, and small production deployments.
  • Kafka Connect is distributed and scalable by default. It builds upon the existing group management protocol, and to scale up a Kafka Connect cluster we can add more workers.
  • Automatic offset management – Kafka Connect can manage the offset commit process automatically with just a little information from connectors, so connector developers do not need to worry about this error-prone part of connector development.
  • Streaming and batch integration – Kafka Connect is an ideal solution for bridging streaming and batch data systems in connection with Kafka’s existing capabilities.
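
The idea behind automatic offset management can be illustrated with a small Python sketch. This is a toy model, not Connect's implementation: FileOffsetStore, poll, and the JSON file layout are assumptions made for the example. The source only reports how far it has read, and the store persists that position so a restart resumes where the previous run stopped.

```python
import json
import os
import tempfile


class FileOffsetStore:
    """Toy offset store: one JSON file mapping a source name to a position."""

    def __init__(self, path):
        self.path = path

    def load(self):
        if not os.path.exists(self.path):
            return {}
        with open(self.path, encoding="utf-8") as handle:
            return json.load(handle)

    def commit(self, source, position):
        offsets = self.load()
        offsets[source] = position
        with open(self.path, "w", encoding="utf-8") as handle:
            json.dump(offsets, handle)


def poll(lines, store, source):
    """Return the lines not yet delivered, then record the new position."""
    start = store.load().get(source, 0)
    new_lines = lines[start:]
    store.commit(source, len(lines))
    return new_lines


store = FileOffsetStore(os.path.join(tempfile.mkdtemp(), "connect.offsets"))
first = poll(["a", "b"], store, "test.txt")
# Simulate a restart after one more line was appended to the source.
second = poll(["a", "b", "c"], store, "test.txt")
```

The first call delivers both lines; the second delivers only the newly appended line, because the committed position was read back from the file.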

Kafka Connect workers:

Kafka Connect workers can run in standalone mode or distributed mode.

Distributed Mode:

Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same group.id and they automatically coordinate to schedule execution of connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers. Note the similarity to consumer group rebalance. Under the covers, connect workers are using consumer groups to coordinate and rebalance.
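
The effect of a rebalance can be sketched in a few lines of Python. This is a deliberately simplified model: the assign function, the round-robin strategy, and the worker and task names are assumptions for illustration, and the real assignment logic in Kafka Connect is more involved.

```python
def assign(tasks, workers):
    """Spread tasks over the live workers round-robin: {worker: [tasks]}."""
    if not workers:
        raise ValueError("at least one worker is required")
    ordered_workers = sorted(workers)
    assignment = {worker: [] for worker in ordered_workers}
    for index, task in enumerate(sorted(tasks)):
        worker = ordered_workers[index % len(ordered_workers)]
        assignment[worker].append(task)
    return assignment


tasks = ["file-source-0", "file-sink-0", "jdbc-source-0", "jdbc-source-1"]

# Two workers share the load evenly.
two_workers = assign(tasks, ["worker-a", "worker-b"])

# worker-b fails: the surviving worker takes over every task.
after_failure = assign(tasks, ["worker-a"])
```

Whenever the set of workers changes, the assignment is recomputed, so no task is left without a worker to run it.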

Standalone Mode:

Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks. Since it is a single process, it requires minimal configuration. Standalone mode is convenient for getting started, during development, and in certain situations where only one process makes sense, such as collecting logs from a host. However, because there is only a single process, it also has more limited functionality: scalability is limited to the single process and there is no fault tolerance beyond any monitoring you add to the single process.

Kafka Connect Configuration:

Source Connector Configuration:

For the source connector, the reference configuration is available at $CONFLUENT_HOME/etc/kafka/connect-file-source.properties:

name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt

This configuration has some properties that are common to all connectors (name, connector.class, and tasks.max) and two that belong to the file source connector itself (topic and file):

  • name is a user-specified name for the connector instance
  • connector.class specifies the implementing class, basically the kind of connector
  • tasks.max specifies the maximum number of tasks the connector may run in parallel
  • topic defines the topic to which the connector should send the output
  • file is the file the connector reads from

Sink Connector Configuration:

For our sink connector, we’ll use the reference configuration at $CONFLUENT_HOME/etc/kafka/connect-file-sink.properties:

name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test

Logically, it contains the same kind of parameters, though this time connector.class specifies the sink connector implementation, file is the location where the connector should write the content, and topics (plural) lists the topics the connector reads from.
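
The two configurations are connected only through the topic name. As a quick sanity check, we could parse both files and confirm that the sink reads from the topic the source writes to. The parse_properties and is_wired helpers below are hypothetical, and the parser handles only simple key=value lines, not the full Java properties syntax.

```python
SOURCE = """
name=local-file-source
connector.class=FileStreamSource
tasks.max=1
topic=connect-test
file=test.txt
"""

SINK = """
name=local-file-sink
connector.class=FileStreamSink
tasks.max=1
file=test.sink.txt
topics=connect-test
"""


def parse_properties(text):
    """Parse simple key=value lines, skipping blank lines and # comments."""
    props = {}
    for line in text.splitlines():
        line = line.strip()
        if not line or line.startswith("#"):
            continue
        key, _, value = line.partition("=")
        props[key.strip()] = value.strip()
    return props


def is_wired(source, sink):
    """True if the sink subscribes to the topic the source writes to."""
    # A sink's "topics" value is a comma-separated list.
    sink_topics = [topic.strip() for topic in sink["topics"].split(",")]
    return source["topic"] in sink_topics


source = parse_properties(SOURCE)
sink = parse_properties(SINK)
wired = is_wired(source, sink)
```

With the reference files, both connectors use connect-test, so lines read from test.txt should eventually appear in test.sink.txt.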

Worker Configuration:

Finally, we have to configure the connect worker, which will integrate our two connectors and do the work of reading from the source connector and writing to the sink connector.

For that, we can use $CONFLUENT_HOME/etc/kafka/connect-standalone.properties:

bootstrap.servers=localhost:9092
key.converter=org.apache.kafka.connect.json.JsonConverter
value.converter=org.apache.kafka.connect.json.JsonConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
plugin.path=/share/java

Note that plugin.path can hold a list of paths where connector implementations are available.

As we’ll use connectors bundled with Kafka, we can set plugin.path to $CONFLUENT_HOME/share/java. On Windows, it might be necessary to provide an absolute path here.

For the other parameters, we can leave the default values:

  • bootstrap.servers contains the addresses of the Kafka brokers
  • key.converter and value.converter define converter classes, which serialize and deserialize the data as it flows from the source into Kafka and then from Kafka to the sink
  • key.converter.schemas.enable and value.converter.schemas.enable are converter-specific settings
  • offset.storage.file.filename is the most important setting when running Connect in standalone mode: it defines where Connect should store its offset data
  • offset.flush.interval.ms defines the interval at which the worker tries to commit offsets for tasks
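
To see what the schemas.enable setting changes, here is a rough Python imitation of what the JSON converter writes for a plain string value, such as one line of our text file. The to_json_bytes function is an assumption for illustration and covers only strings; the real JsonConverter handles many more types.

```python
import json


def to_json_bytes(value, schemas_enable):
    """Imitate the bytes written to Kafka for a string value."""
    if schemas_enable:
        # With schemas enabled, the value is wrapped in a schema/payload envelope.
        envelope = {
            "schema": {"type": "string", "optional": False},
            "payload": value,
        }
        return json.dumps(envelope).encode("utf-8")
    # With schemas disabled, only the bare JSON value is written.
    return json.dumps(value).encode("utf-8")


plain = to_json_bytes("foo", schemas_enable=False)
wrapped = to_json_bytes("foo", schemas_enable=True)
```

With schemas disabled, as in the worker configuration above, each message is just the JSON string itself, which keeps the topic contents compact and easy to read.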

Kafka Connect REST API:

In Kafka Connect, each worker instance starts an embedded web server that exposes a REST API for various operations. For workers in distributed mode, configuration uploaded via this REST API is saved in internal Kafka topics. For workers in standalone mode, the configuration REST APIs are not relevant.

The following are the currently supported REST API endpoints:

  • GET /connectors – return a list of active connectors
  • POST /connectors – create a new connector; the request body should be a JSON object containing a string name field and an object config field with the connector configuration parameters
  • GET /connectors/{name} – get information about a specific connector
  • GET /connectors/{name}/config – get the configuration parameters for a specific connector
  • PUT /connectors/{name}/config – update the configuration parameters for a specific connector
  • GET /connectors/{name}/status – get current status of the connector, including if it is running, failed, paused, etc., which worker it is assigned to, error information if it has failed, and the state of all its tasks
  • GET /connectors/{name}/tasks – get a list of tasks currently running for a connector
  • GET /connectors/{name}/tasks/{taskid}/status – get current status of the task, including if it is running, failed, paused, etc., which worker it is assigned to, and error information if it has failed
  • PUT /connectors/{name}/pause – pause the connector and its tasks, which stops message processing until the connector is resumed
  • PUT /connectors/{name}/resume – resume a paused connector (or do nothing if the connector is not paused)
  • POST /connectors/{name}/restart?includeTasks=<true|false>&onlyFailed=<true|false> – restart a connector and its task instances.
    • the “includeTasks” parameter specifies whether to restart the connector instance and task instances (“includeTasks=true”) or just the connector instance (“includeTasks=false”), with the default (“false”) preserving the same behavior as earlier versions.
    • the “onlyFailed” parameter specifies whether to restart just the instances with a FAILED status (“onlyFailed=true”) or all instances (“onlyFailed=false”), with the default (“false”) preserving the same behavior as earlier versions.
  • POST /connectors/{name}/tasks/{taskId}/restart – restart an individual task (typically because it has failed)
  • DELETE /connectors/{name} – delete a connector, halting all tasks and deleting its configuration
  • GET /connectors/{name}/topics – get the set of topics that a specific connector is using since the connector was created or since a request to reset its set of active topics was issued
  • PUT /connectors/{name}/topics/reset – send a request to empty the set of active topics of a connector
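
A few of these endpoints can be exercised from Python's standard library. The sketch below only builds the requests and does not send them. The base URL is an assumption (a worker listening on localhost at port 8083, the usual default), and the helper function names are hypothetical.

```python
import json
import urllib.parse
import urllib.request

BASE_URL = "http://localhost:8083"  # assumption: local worker on the default REST port


def create_connector_request(name, config, base_url=BASE_URL):
    """POST /connectors with a JSON body holding 'name' and 'config'."""
    body = json.dumps({"name": name, "config": config}).encode("utf-8")
    return urllib.request.Request(
        url=f"{base_url}/connectors",
        data=body,
        method="POST",
        headers={"Content-Type": "application/json"},
    )


def status_request(name, base_url=BASE_URL):
    """GET /connectors/{name}/status."""
    return urllib.request.Request(url=f"{base_url}/connectors/{name}/status", method="GET")


def restart_request(name, include_tasks=False, only_failed=False, base_url=BASE_URL):
    """POST /connectors/{name}/restart with the two optional query flags."""
    query = urllib.parse.urlencode({
        "includeTasks": str(include_tasks).lower(),
        "onlyFailed": str(only_failed).lower(),
    })
    return urllib.request.Request(
        url=f"{base_url}/connectors/{name}/restart?{query}", method="POST"
    )


create = create_connector_request("local-file-source", {
    "connector.class": "FileStreamSource",
    "tasks.max": "1",
    "topic": "connect-test",
    "file": "test.txt",
})
status = status_request("local-file-source")
restart = restart_request("local-file-source", include_tasks=True, only_failed=True)
```

Against a running distributed worker, each request could be sent with urllib.request.urlopen(request). Note that the properties from the file-based configuration reappear as string values inside the config object.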
