Creating Custom Kafka Connect Connectors

Kafka

Apache Kafka is an open-source distributed event streaming platform. Many organizations use it for high-performance data integration, data pipelines, and streaming analytics in mission-critical applications. Kafka is designed to be fault-tolerant.

Kafka Connect/Connectors

Kafka Connect is a framework for connecting Kafka with external systems such as databases, key-value stores, search indexes, and file systems. The connection between Kafka and an external system is made with the help of Kafka connectors.

Kafka connectors are ready-to-use components. Source connectors import data from external systems into Kafka topics, and sink connectors export data from Kafka topics into external systems.

Steps to Create a Custom Kafka Connector

Creating a custom Kafka connector involves four essential steps. The Kafka Connect API lets a developer plug into the Kafka Connect framework by implementing several of the interfaces and abstract classes it provides. A basic source connector needs to extend the following three classes:

  • SourceConnector
  • SourceTask
  • AbstractConfig

To create a new Kafka connector, follow these four steps:

  • Defining Configuration Properties
  • Passing of configuration Properties to tasks
  • Polling of Tasks
  • Creation of a monitoring thread

Task 1: Defining Configuration Properties

When a connector is initialized, it picks up its configuration properties. These properties allow the connector and its tasks to communicate with an external source or sink. They are also used to:

  • Set the maximum number of parallel tasks
  • Specify the Kafka topic to stream data to or from
  • Provide any custom information the connector needs to do its job

Configuration values are first passed to the connector as String instances, as the signature of the Connector start method shows:

public abstract class Connector implements Versioned {
       // Called once at startup with the connector's configuration properties
       public abstract void start(Map<String, String> props);
}
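
Because every value arrives as a String, a connector usually validates and converts the properties it needs before using them. In a real connector this is typically done in a class that extends AbstractConfig; the sketch below uses only plain Java so that it stays self-contained. The class name SketchConnectorConfig and the property names topic and poll.interval.ms are assumptions made for illustration, not part of the Kafka Connect API.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a class that would normally extend AbstractConfig.
final class SketchConnectorConfig {
    // Property names are assumptions made for this illustration.
    static final String TOPIC = "topic";
    static final String POLL_INTERVAL_MS = "poll.interval.ms";
    static final long DEFAULT_POLL_INTERVAL_MS = 5000L;

    final String topic;
    final long pollIntervalMs;

    SketchConnectorConfig(Map<String, String> props) {
        // Required property: fail fast if it is missing or blank.
        String rawTopic = props.get(TOPIC);
        if (rawTopic == null || rawTopic.trim().isEmpty()) {
            throw new IllegalArgumentException("Missing required property: " + TOPIC);
        }
        this.topic = rawTopic.trim();
        // Optional property: fall back to a default when absent.
        this.pollIntervalMs = parseLong(props, POLL_INTERVAL_MS, DEFAULT_POLL_INTERVAL_MS);
    }

    // Converts a String property to a long, or returns the default if unset.
    private static long parseLong(Map<String, String> props, String key, long defaultValue) {
        String raw = props.get(key);
        if (raw == null) {
            return defaultValue;
        }
        try {
            return Long.parseLong(raw.trim());
        } catch (NumberFormatException e) {
            throw new IllegalArgumentException(key + " must be a whole number, got: " + raw, e);
        }
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put(TOPIC, "orders");
        props.put(POLL_INTERVAL_MS, "1000");
        SketchConnectorConfig config = new SketchConnectorConfig(props);
        System.out.println(config.topic + " polled every " + config.pollIntervalMs + " ms");
    }
}
```

Validating in one place means a misconfigured connector fails at startup with a clear message rather than later inside a task.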

Task 2: Passing of configuration Properties to tasks

The second step is to implement the Connector taskConfigs method. It returns a list of maps, one per task, each holding the configuration properties that task will use to stream data into or out of Kafka. The method signature is:

public abstract class Connector implements Versioned {
       // maxTasks is the upper bound on the number of task configurations to return
       public abstract List<Map<String, String>> taskConfigs(int maxTasks);
}
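
One common way to build this list is to divide the units of work (for example tables or files) among at most maxTasks tasks. The following is a minimal sketch of that idea in plain Java, not the implementation of any particular connector. The class name TaskConfigSketch, the tables property, and the round-robin split are all assumptions chosen for illustration.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical helper showing how work could be split across tasks.
final class TaskConfigSketch {
    // Assumed property that tells each task which tables it owns.
    static final String TABLES = "tables";

    static List<Map<String, String>> taskConfigs(
            Map<String, String> connectorProps, List<String> tables, int maxTasks) {
        if (maxTasks < 1) {
            throw new IllegalArgumentException("maxTasks must be at least 1");
        }
        // Never create more tasks than there are units of work.
        int groups = Math.min(maxTasks, tables.size());
        List<List<String>> assignment = new ArrayList<>();
        for (int i = 0; i < groups; i++) {
            assignment.add(new ArrayList<>());
        }
        // Round-robin assignment keeps the groups roughly equal in size.
        for (int i = 0; i < tables.size(); i++) {
            assignment.get(i % groups).add(tables.get(i));
        }
        // Each task gets a copy of the connector properties plus its own share.
        List<Map<String, String>> configs = new ArrayList<>();
        for (List<String> group : assignment) {
            Map<String, String> taskProps = new HashMap<>(connectorProps);
            taskProps.put(TABLES, String.join(",", group));
            configs.add(taskProps);
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("topic", "orders");
        List<String> tables = Arrays.asList("a", "b", "c", "d", "e");
        for (Map<String, String> config : taskConfigs(props, tables, 2)) {
            System.out.println(config);
        }
    }
}
```

Inside a real connector the same logic would live in the overridden taskConfigs(int maxTasks) method, with the connector properties and the list of tables held as fields.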

Task 3: Polling of Tasks

As with the Connector class, a task has abstract methods for start, stop, and version. Most of the logic for streaming data into Kafka, however, goes into the poll method, which the Kafka Connect framework calls repeatedly for every task. Its signature in SourceTask is:

public abstract List<SourceRecord> poll() throws InterruptedException;
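
The shape of a typical poll implementation can be sketched without the Kafka libraries: wait briefly for data, return a bounded batch, and return null when nothing is available. In this sketch the class PollSketchTask is hypothetical, an in-memory queue stands in for the external system, and plain String values stand in for SourceRecord instances.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a SourceTask; Strings stand in for SourceRecord.
final class PollSketchTask {
    private final BlockingQueue<String> source; // stand-in for the external system
    private final int maxBatchSize;
    private final long waitMs;

    PollSketchTask(BlockingQueue<String> source, int maxBatchSize, long waitMs) {
        if (maxBatchSize < 1) {
            throw new IllegalArgumentException("maxBatchSize must be at least 1");
        }
        this.source = source;
        this.maxBatchSize = maxBatchSize;
        this.waitMs = waitMs;
    }

    // Waits up to waitMs for the first item, then returns a batch of at most
    // maxBatchSize items, or null when no data is currently available.
    List<String> poll() throws InterruptedException {
        String first = source.poll(waitMs, TimeUnit.MILLISECONDS);
        if (first == null) {
            return null;
        }
        List<String> batch = new ArrayList<>();
        batch.add(first);
        // Take whatever else is already queued, without blocking again.
        source.drainTo(batch, maxBatchSize - 1);
        return batch;
    }

    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        queue.add("a");
        queue.add("b");
        queue.add("c");
        PollSketchTask task = new PollSketchTask(queue, 2, 10L);
        System.out.println(task.poll());
        System.out.println(task.poll());
        System.out.println(task.poll());
    }
}
```

Bounding both the wait time and the batch size keeps each call short, so the framework regains control regularly between calls.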

Task 4: Creation of a monitoring thread

To make the connector dynamic, create a separate thread that monitors the external system for changes, and create a new instance of that thread when the connector starts up. In outline:

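public class UserSourceConnector extends SourceConnector {
       // MonitoringThread is a user-defined class, not part of Kafka Connect
       private MonitoringThread monitoringThread;
       @Override
       public void start(Map<String, String> props) {
              // context is the ConnectorContext inherited from Connector
              monitoringThread = new MonitoringThread(context);
       }
}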
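
What such a monitoring thread might look like is sketched below in plain Java, under several assumptions. The class MonitoringThreadSketch, the Supplier that lists tables, and the polling interval are hypothetical. The Runnable callback stands in for the call a real connector would make on its ConnectorContext, namely requestTaskReconfiguration(), so that the framework asks for fresh task configurations.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.function.Supplier;

// Hypothetical monitoring thread; not part of the Kafka Connect API.
final class MonitoringThreadSketch extends Thread {
    private final Supplier<Set<String>> lister; // assumed: lists tables in the external system
    private final Runnable onChange;            // stand-in for context.requestTaskReconfiguration()
    private final long intervalMs;
    private Set<String> known;
    private volatile boolean running = true;

    MonitoringThreadSketch(Supplier<Set<String>> lister, Runnable onChange, long intervalMs) {
        this.lister = lister;
        this.onChange = onChange;
        this.intervalMs = intervalMs;
        // Snapshot the current state so only later changes are reported.
        this.known = new HashSet<>(lister.get());
        setDaemon(true);
    }

    // Runs one monitoring pass; returns true if a change was found and reported.
    synchronized boolean checkOnce() {
        Set<String> current = new HashSet<>(lister.get());
        if (current.equals(known)) {
            return false;
        }
        known = current;
        onChange.run();
        return true;
    }

    @Override
    public void run() {
        while (running) {
            try {
                Thread.sleep(intervalMs);
            } catch (InterruptedException e) {
                return; // interrupted by shutdown()
            }
            checkOnce();
        }
    }

    // Intended to be called from the connector's stop() method.
    void shutdown() {
        running = false;
        interrupt();
    }

    public static void main(String[] args) {
        Set<String> tables = new HashSet<>();
        tables.add("orders");
        MonitoringThreadSketch monitor = new MonitoringThreadSketch(
                () -> tables, () -> System.out.println("change detected"), 1000L);
        tables.add("customers");
        System.out.println(monitor.checkOnce());
    }
}
```

In a connector, the thread would be started with start() right after it is created and stopped with shutdown() when the connector stops; main calls checkOnce() directly only to keep the demonstration deterministic.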

Conclusion

Kafka Connect simplifies the development, deployment, and management of connectors. It can be deployed in two modes, standalone and distributed. In distributed mode it is scalable by default, so the number of workers can be scaled up or down as required.
