In earlier blogs, we went through the basic terminology of Kafka and took a step deeper into ZooKeeper. Now let’s talk in detail about topic partitions and replicas.
A topic is a placeholder for your data in Kafka. Data in a topic is further divided into partitions. Each partition is an ordered, immutable sequence of records that is continually appended to, like a structured commit log.
When publishing a message with a key (a keyed message), Kafka deterministically maps the message to a partition based on the hash of the key. This guarantees that messages with the same key are always routed to the same partition. Messages without a key are distributed across partitions by the producer (round-robin in older clients, sticky batching in newer ones).
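The keyed-message behavior can be sketched in a few lines. Kafka’s default partitioner actually hashes keys with murmur2; here `crc32` stands in purely to illustrate the deterministic key-to-partition mapping, not the real hash function.

```python
import zlib

def partition_for(key: bytes, num_partitions: int) -> int:
    # Kafka's default partitioner uses murmur2; crc32 is a stand-in
    # here to show the deterministic hash(key) % partitions mapping.
    return zlib.crc32(key) % num_partitions

# Messages with the same key always land on the same partition.
assert partition_for(b"user-42", 6) == partition_for(b"user-42", 6)
```

Because the mapping depends on the partition count, repartitioning a topic changes which partition a given key maps to.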
Each message within a partition has a sequential identifier called its offset. Consumers can start reading from any offset they choose, which allows them to join the cluster at any point in time they see fit.
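A minimal sketch of this model: a partition is an append-only sequence, and the offset is simply a record’s position in it, from which a consumer may begin reading.

```python
# A partition modeled as an append-only log; the offset is simply
# each record's position in that sequence.
log = ["msg-0", "msg-1", "msg-2", "msg-3", "msg-4"]

def read_from(log, offset):
    """Return all records starting at the given offset."""
    return log[offset:]

# A consumer may start reading from any offset it chooses.
assert read_from(log, 3) == ["msg-3", "msg-4"]
assert read_from(log, 0) == log
```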
More Partitions, Higher Throughput
A topic partition is the unit of parallelism in Kafka. On the consumer side, Kafka always gives a single partition’s data to one consumer thread. Thus, the degree of parallelism in the consumer (within a consumer group) is bounded by the number of partitions being consumed. Therefore, in general, the more partitions there are in a Kafka cluster, the higher the throughput one can achieve.
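The parallelism bound can be made concrete with a toy assignment function (a simplification of Kafka’s actual group-assignment strategies): each partition goes to exactly one consumer in the group, so consumers beyond the partition count sit idle.

```python
def assign_partitions(partitions, consumers):
    """Round-robin sketch: each partition is given to exactly one
    consumer; consumers beyond the partition count receive nothing."""
    assignment = {c: [] for c in consumers}
    for i, p in enumerate(partitions):
        assignment[consumers[i % len(consumers)]].append(p)
    return assignment

partitions = list(range(4))
consumers = [f"c{i}" for i in range(6)]
assignment = assign_partitions(partitions, consumers)

# With 6 consumers but only 4 partitions, only 4 consumers do work.
active = [c for c, ps in assignment.items() if ps]
assert len(active) == 4
```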
Pitfalls of more Partitions
- Need More File Handles
Each partition maps to a directory in the broker’s file system. Within that log directory there are two files (one for the index and another for the actual data) per log segment, and the broker opens a file handle for both of them.
The total number of open files can therefore become very large when there are many partitions, though this can be dealt with by tweaking the operating-system configuration.
We can view the number of open files using this command:
lsof | wc -l
The per-process limit can be raised using the below command:
ulimit -n <noOfFiles>
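A rough back-of-the-envelope estimate makes the growth obvious. This sketch counts only the index and data files mentioned above (newer Kafka versions also keep a time-index file per segment, which would add a third):

```python
def estimated_open_files(num_partitions: int, segments_per_partition: int) -> int:
    # Each log segment keeps (at least) an index file and a data file open.
    return num_partitions * segments_per_partition * 2

# 2000 partitions with 50 segments each -> 200,000 open file handles,
# far beyond the typical default ulimit of 1024.
assert estimated_open_files(2000, 50) == 200_000
```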
- End to End Latency
The time between a message being published by the producer and that message being read by the consumer is the end-to-end latency (not to be confused with consumer lag, which is an offset difference). Kafka only exposes a message to a consumer after it has been committed, i.e., when the message has been replicated to all the in-sync replicas, so the time to commit a message can be a significant portion of the end-to-end latency. By default, a Kafka broker uses only a single thread to replicate data from another broker, for all partitions that share replicas between the two brokers.
For example, suppose a 2-broker Kafka cluster holds 1000 partitions with replication factor 2, with leadership spread evenly so that each broker leads 500 partitions. The second broker then needs to fetch its 500 follower partitions from the first broker, and a single replica fetcher thread is responsible for all of them, which adds a noticeable amount of time. This problem is less severe in large clusters, because replicas are spread across many brokers and each pair of brokers shares far fewer partitions.
Replication in Kafka
Replication simply means keeping copies of the data across the cluster to improve availability. Replication in Kafka is at the partition level: each partition has one or more replicas spread over the cluster’s brokers.
As an example, suppose we have Partition 0 on brokers 1 and 2, Partition 1 on brokers 1 and 4, and Partition 2 on brokers 3 and 4. For each partition, one replica acts as the leader and the others (in this case, one replica each) act as followers.
The leader handles all reads and writes for that partition.
So, when we say a topic has a replication factor of 2, that means we keep two copies of each of its partitions. Kafka considers a record committed when all replicas in the in-sync replica set (ISR) have confirmed that they have written the record to their log.
Acknowledgment in Kafka
Producers can choose to receive acknowledgments for the data written to the partition using the “acks” setting.
The acks value is a producer configuration parameter in Apache Kafka that defines how many acknowledgments the producer requires before considering a request complete. It can be set to the following values:
ACK = 0
The producer never waits for an ack from the broker when acks is set to 0, so no guarantee can be made that the broker has received the message. The producer doesn’t try to send the record again, since it never learns that the record was lost. This setting provides the lowest latency and highest throughput at the cost of a much higher risk of message loss.
ACK = 1
When acks is set to 1, the producer gets an ack after the leader has received the record. The leader writes the record to its log but responds without awaiting acknowledgment from the followers. The message is lost only if the leader fails immediately after acknowledging the record, before the followers have replicated it. This setting is the middle ground for latency, throughput, and durability: slower but more durable than acks=0.
ACK = -1 [ALL]
Setting acks to all (or -1) means that the producer gets an ack only after all in-sync replicas have received the record: the leader waits for the full set of in-sync replicas to acknowledge it. Sending a message therefore takes longer, but this setting gives the strongest message durability.
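The three acks modes above can be summarized in one decision function. This is a simplified model, not Kafka’s actual code: `follower_acks` counts in-sync followers that have replicated the record, with the leader excluded from that count.

```python
def is_acknowledged(acks, leader_has_record, follower_acks, isr_size):
    """Decide when the producer receives an ack, per the acks setting.
    follower_acks counts in-sync followers (leader excluded) that have
    replicated the record."""
    if acks == 0:
        return True                      # fire-and-forget: no broker ack awaited
    if acks == 1:
        return leader_has_record         # leader's log write is enough
    if acks in (-1, "all"):
        # Leader plus every other ISR member must have the record.
        return leader_has_record and follower_acks >= isr_size - 1
    raise ValueError(f"unknown acks setting: {acks}")

assert is_acknowledged(0, False, 0, 3)                  # acked before any broker write
assert is_acknowledged(1, True, 0, 3)                   # leader alone suffices
assert not is_acknowledged("all", True, 1, 3)           # one follower still lagging
assert is_acknowledged("all", True, 2, 3)               # full ISR has the record
```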
What is ISR?
In-sync replicas are the replicated partitions that are in sync with their leader, i.e. those followers that have the same messages as the leader (the leader itself is always part of the ISR). It’s not mandatory for the ISR to be equal to the number of replicas.
The definition of “in-sync” depends on the topic configuration, but by default, it means that a replica is or has been fully caught up with the leader in the last 10 seconds. The setting for this time period is: replica.lag.time.max.ms and has a server default which can be overridden on a per topic basis.
Followers replicate data from the leader to themselves by sending Fetch Requests periodically, by default every 500ms.
If a follower fails, it will cease sending fetch requests and, after the default 10 seconds, will be removed from the ISR. Likewise, if a follower slows down, perhaps due to a network issue or constrained server resources, then as soon as it has lagged behind the leader for more than 10 seconds it is removed from the ISR.
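The ISR-shrinking rule can be sketched as a simple filter over per-follower catch-up timestamps. This is an illustrative model of the `replica.lag.time.max.ms` check, not the broker’s actual bookkeeping:

```python
REPLICA_LAG_TIME_MAX_MS = 10_000  # broker default of 10 seconds

def shrink_isr(isr, last_caught_up_ms, now_ms):
    """Keep only replicas whose last caught-up time is within the lag window."""
    return [r for r in isr
            if now_ms - last_caught_up_ms[r] <= REPLICA_LAG_TIME_MAX_MS]

last_caught_up = {"broker-1": 60_000, "broker-2": 48_000, "broker-3": 59_500}
isr = shrink_isr(["broker-1", "broker-2", "broker-3"], last_caught_up, now_ms=60_000)

# broker-2 has lagged for 12s (> 10s) and is dropped from the ISR.
assert isr == ["broker-1", "broker-3"]
```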
min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful, so it affects the producer side, which is responsible for the writes. This configuration parameter has no direct impact on the consumer side, which is why consumers are unaffected even if the number of alive brokers is less than the value of min.insync.replicas.
When a producer sets acks to “all” (or “-1”), min.insync.replicas specifies the minimum number of replicas that must acknowledge a write for the write to be considered successful. If this minimum cannot be met, then the producer will raise an exception (either NotEnoughReplicas or NotEnoughReplicasAfterAppend).
When used together, min.insync.replicas and acks allow you to enforce greater durability guarantees. A typical scenario would be to create a topic with a replication factor of 3, set min.insync.replicas to 2, and produce with acks of “all”. This will ensure that the producer raises an exception if a majority of replicas do not receive a write.
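The interplay of acks=all and min.insync.replicas can be sketched as below. The exception class is a stand-in named after Kafka’s real NotEnoughReplicas error, not the client library’s actual type:

```python
class NotEnoughReplicasError(Exception):
    """Stand-in for Kafka's NotEnoughReplicas error."""

def produce_with_acks_all(isr_size, min_insync_replicas):
    """With acks=all, a write is rejected when the ISR has shrunk
    below min.insync.replicas."""
    if isr_size < min_insync_replicas:
        raise NotEnoughReplicasError(
            f"ISR has {isr_size} replicas, need {min_insync_replicas}")
    return "committed"

# Replication factor 3, min.insync.replicas=2, acks=all:
assert produce_with_acks_all(isr_size=3, min_insync_replicas=2) == "committed"
assert produce_with_acks_all(isr_size=2, min_insync_replicas=2) == "committed"
try:
    produce_with_acks_all(isr_size=1, min_insync_replicas=2)  # 2 brokers down
except NotEnoughReplicasError:
    pass  # the producer learns the write was not durable
```

Note that with these settings the topic tolerates one broker failure while still accepting writes; losing a second broker makes the partition unavailable for producing rather than silently under-replicated.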
What’s the benefit?
If one of the brokers goes down, or becomes unreachable due to network issues, one of the replicas will become the leader.
Only replicas that are part of the in-sync replica list are eligible to become the leader, and the list of in-sync replicas is persisted to ZooKeeper whenever it changes. Also, Kafka’s guarantee of no data loss applies only if at least one in-sync replica exists.
In the case of no such replica, this guarantee is not applicable.
That was all about topic partitions, replication and ISR. I’ll be covering more about Kafka in further blogs. Happy coding! 🙂