Deep dive into Kafka Connect

Reading Time: 6 minutes

Hello! In this article we will continue our journey of understanding Kafka Connect, this time looking at its architecture and internals.

We’ve seen that Kafka Connect is a pluggable component that helps feed data into or out of Kafka and hence provides flexible integration pipelines. It is inherently fault tolerant and scalable. To work with any software component and get the most out of it, we should understand the underlying architecture and the components around which it is built.

Kafka Connect Concepts

It’s important to understand a few major concepts before discussing the inner workings of Kafka Connect.

Connectors: As the name suggests, connectors are responsible for the interaction between Kafka Connect and the external system being integrated. A connector can be defined as the high-level abstraction that coordinates data streaming by managing tasks. A connector instance is a logical job responsible for managing the copying of data between Kafka and another system. All of the classes that implement or are used by a connector are defined in a connector plugin.

The role of connectors is limited: a connector acts as the interface with Kafka Connect, passing data to or receiving data from the next component in the pipeline. Connectors have configuration properties specific to the technology they integrate with. Below is a sink connector configuration (here expressed as a ksqlDB statement) for Elasticsearch:

CREATE SINK CONNECTOR sink-elastic-01 WITH (
  'connector.class' = 'io.confluent.connect.elasticsearch.ElasticsearchSinkConnector',
  'topics'          = 'orders',
  'connection.url'  = 'http://elasticsearch:9200',
  'type.name'       = '_doc',
  'key.ignore'      = 'false',
  'schema.ignore'   = 'true'
);

This is all we need to do to hook into Kafka Connect; the rest is handled by the connector.

Tasks: In Connect’s data model, tasks are the main component. Each connector instance coordinates a set of tasks that actually copy the data. By allowing a connector to partition a single job into several tasks, Kafka Connect provides built-in support for parallel, scalable data copying with very little configuration. Tasks themselves store no state. Task state is managed by the associated connector and stored in Kafka in the special topics config.storage.topic and status.storage.topic. As such, tasks can be started, paused, or restarted at any moment, resulting in a scalable and resilient data pipeline.
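As a sketch, the degree of parallelism is typically bounded by the connector’s tasks.max property, while the storage topics live in the worker configuration (topic names below are illustrative):

```properties
# Connector-level setting: upper bound on the number of tasks the
# connector may spawn (the actual count is decided by the connector).
tasks.max=4

# Worker-level settings: the Kafka topics where Connect keeps
# connector/task configuration and status.
config.storage.topic=connect-configs
status.storage.topic=connect-status
```

Because this state lives in Kafka rather than in the task process, a task can be rescheduled on any worker and resume from where it left off.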

Image courtesy: Confluent

Workers: Connectors and tasks are logical units of work that must be scheduled to run in a process. In Kafka Connect these processes are called workers, and there are two types of workers: standalone and distributed.

Standalone mode is the simplest mode, where a single process is responsible for executing all connectors and tasks. Since it is a single process, it requires minimal configuration. It is convenient for getting started, during development, and in certain situations where only one process makes sense, such as collecting logs from a host. However, because there is only a single process, it also has more limited functionality: scalability is limited to the single process and there is no fault tolerance beyond any monitoring you add to the single process.

Distributed mode provides scalability and automatic fault tolerance for Kafka Connect. In distributed mode, you start many worker processes using the same group.id and they automatically coordinate to schedule execution of connectors and tasks across all available workers. If you add a worker, shut down a worker, or a worker fails unexpectedly, the rest of the workers detect this and automatically coordinate to redistribute connectors and tasks across the updated set of available workers. Note the similarity to consumer group rebalancing: under the covers, Connect workers use consumer groups to coordinate and rebalance.
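A minimal distributed-mode worker configuration might look like the following (broker address, group id, and topic names are illustrative; every worker in the cluster must share the same group.id and storage topics):

```properties
# Distributed-mode worker configuration (illustrative values)
bootstrap.servers=kafka:9092
group.id=connect-cluster           # workers sharing this id form one Connect cluster
config.storage.topic=connect-configs
offset.storage.topic=connect-offsets
status.storage.topic=connect-status
```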

Image courtesy: Oreilly

Converters: Connectors don’t actually read or write data from the Kafka topics. That responsibility is borne by converters, which handle the serialization and deserialization of data flowing between Kafka Connect and Kafka itself. Converters are decoupled from connectors so that they can be reused naturally across connectors. The JDBC Source Connector, for example, can publish Avro data to Kafka, and the HDFS Sink Connector can retrieve Avro data from Kafka using the same Avro converter. This means that even if the JDBC source delivers a ResultSet that is eventually saved to HDFS as a Parquet file, the same converter can be utilised.

Commonly, converters use Avro, Protobuf, or JSON Schema as serde formats, but Kafka really doesn’t care how the serialization/deserialization is done, because to Kafka it is all bytes in the end! Still, it is good to have a schema-based serialization method in place to make sure we have structured data in our system. Converters are necessary for a Kafka Connect deployment to support a particular data format when writing to or reading from Kafka. Tasks use converters to change data from bytes into Connect’s internal data format and vice versa.
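Converters are configured at the worker level (and can be overridden per connector). A sketch using a String key converter and the Confluent Avro converter, with an illustrative Schema Registry URL:

```properties
# Worker-level converter defaults (overridable per connector)
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
# Avro needs a Schema Registry to store/retrieve schemas (URL is illustrative)
value.converter.schema.registry.url=http://schema-registry:8081
```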

Single Message Transforms: The third, optional component in Kafka Connect is Single Message Transforms (SMTs). Transformations can be specified in connectors to make easy and lightweight changes to individual messages. Multiple transformations can be chained together in the connector configuration, which is useful for small data modifications and event routing.
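As an illustration of such a chain, the connector configuration below (names and values are examples) applies two built-in SMTs in order: InsertField adds a static field to each record’s value, then RegexRouter rewrites the topic name:

```properties
# Illustrative SMT chain: add a static field, then re-route by topic name
transforms=addSource,route
transforms.addSource.type=org.apache.kafka.connect.transforms.InsertField$Value
transforms.addSource.static.field=data_source
transforms.addSource.static.value=orders-db
transforms.route.type=org.apache.kafka.connect.transforms.RegexRouter
transforms.route.regex=(.*)
transforms.route.replacement=prod_$1
```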

A transform is a simple function that takes one record as input and returns a modified version of it. The transforms that ship with Kafka Connect all make simple but frequently needed alterations. It’s worth noting that you may implement the Transformation interface to create your own custom logic, package it as a Kafka Connect plugin, and use it with any connector.

When Kafka Connect uses transforms with a source connector, each source record created by the connector is passed through the first transformation, which modifies it and outputs a new source record. The next transform in the chain takes this updated source record and creates a new modified source record. This process is repeated for the remaining transformations. The binary version of the final revised source record is converted and written to Kafka.

Sink connectors can also be utilised with transforms. Kafka Connect receives Kafka messages and translates their binary representations to sink records. If a transform is present, Kafka Connect sends the record to the first transformation, which modifies it and returns a new, updated sink record. The new sink record is created by passing the updated sink record through the next transform in the chain. This process is repeated for the remaining transforms, after which the final modified sink record is sent to the sink connector to be processed.

Dead Letter Queue: Exceptions and errors are part of any piece of software that processes data. It may happen that we receive an invalid record from the external system, and dead letter queues (DLQs) are introduced to handle such situations gracefully. They are only applicable to sink connectors: in the case of source connectors, an invalid message never enters Kafka, because the connector fails immediately.

An error-handling feature is available that routes all invalid records to a special topic and reports the error. This topic contains a dead letter queue of records that could not be processed by the sink connector. The error is handled based on the connector configuration property errors.tolerance.

errors.tolerance = all
errors.deadletterqueue.topic.name = <dead-letter-topic-name>

When errors.tolerance is set to none, an error or invalid record causes the connector task to fail immediately and the connector goes into a failed state. When errors.tolerance is set to all, all errors or invalid records are ignored and processing continues; no errors are written to the Connect worker log.
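A fuller sink-connector error-handling configuration might look like the following sketch (the topic name and replication factor are illustrative):

```properties
errors.tolerance=all
errors.deadletterqueue.topic.name=dlq-orders
errors.deadletterqueue.topic.replication.factor=1   # lower for single-broker dev clusters
errors.deadletterqueue.context.headers.enable=true  # record the failure reason in message headers
errors.log.enable=true                              # also surface errors in the worker log
```

Enabling the context headers and error logging makes it much easier to diagnose why a record landed in the dead letter queue.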

When we use Confluent Platform with security enabled, the Confluent Platform Admin Client creates the dead letter queue topic. Invalid records are first passed to an internal Producer constructed to send these records. Then, the Admin Client creates the dead letter queue topic. For the dead letter queue to work in a secure Confluent Platform environment, additional Admin Client configuration properties (prefixed with admin.) must be added to the Connect worker configuration.

admin.ssl.endpoint.identification.algorithm=https
admin.sasl.mechanism=PLAIN
admin.security.protocol=SASL_SSL
admin.request.timeout.ms=20000
admin.retry.backoff.ms=500
admin.sasl.jaas.config=org.apache.kafka.common.security.plain.PlainLoginModule required \
  username="<user>" \
  password="<secret>";

Conclusion

Kafka Connect provides out-of-the-box features like fault tolerance and scalability. We’ve seen how multiple components work together to provide a robust system for integrating external systems with Kafka.

References

https://docs.confluent.io/platform/current/connect/concepts.html
