Rebalancing: What the fuss is all about?

Reading Time: 4 minutes

Apache Kafka is ruling in the world of Big Data. It is just not a messaging queue but a full-fledged event streaming platform. We have looked through the basic idea of Kafka and what makes it faster than any other messaging queue. You can read about it from my previous blog. Also, we looked through Partitions, Replicas, and ISR. We are now ready for our next learning- Rebalancing. Often heard, but never bothered to look at what is going on under the hood. Let’s find out!

Rebalancing means to balance out. Working on a real-time cluster comes with a bunch of problems. Nothing different with Apache Kafka. There will be scenarios when your application shuts down, or one of the nodes is unreachable, consumers are being added or removed from the consumer group, etc. The aim is to rebalance the uneven load in the cluster.

Consumer Rebalancing

It is a process where a group of consumer instances (belonging to the same group) co-ordinate to own a mutually exclusive set of partitions of topics that the group is subscribed to.

AIM: At the end of a successful rebalance operation for a consumer group, every partition for all subscribed topics will be owned by a single consumer instance within the group.

Rebalance will happen when:

  • a consumer JOINS the group- When a new consumer joins a consumer group the set of consumers attempt to “rebalance” the load to assign partitions to each consumer. If the set of consumers changes while this assignment is taking place the rebalance will fail and retry.
    To control the maximum number of attempts before giving up, we can use the command:
                                              rebalance.max.retries
    This is set to 4 by default. 
  • a consumer SHUTS DOWN cleanly 
  • a consumer is considered DEAD by the group coordinator- This may happen after a crash or when the consumer is busy with long-running processing, which means that no heartbeats have been sent in the meanwhile by the consumer to the group coordinator within the configured session interval.
  • If you subscribe to a topic that is not created yet, a rebalance will be triggered after the topic is created. Same, if a topic you subscribed to gets deleted.
     
  • Consumer Group subscribes to any topics 
  • ZooKeeper session timeout- If the consumer fails to heartbeat to ZooKeeper for this period of time it is considered dead and a rebalance will occur. 
  • rebalance.backoff.ms * rebalance.max.retries- This is the largest window allowed for the rebalancing phase, where clients are not reading anything from Kafka.

Rebalancing process

To thoroughly understand the Rebalancing process, we need to know two vital characters here: Group Coordinator and Group Leader.

Group Coordinator is nothing but one of the brokers which receives heartbeats (or polling for messages) from all consumers of a consumer group. Every consumer group has a group coordinator. If a consumer stops sending heartbeats, the coordinator will consider it DEAD and trigger a rebalance.

Group Leader is one of the consumers of Consumer Group which is chosen by the Group coordinator and will responsible for making partition assignment decision on behalf of all consumers in a group.

When a consumer wants to join a consumer group, it sends a JoinGroup request to the group coordinator. The first consumer to join the group becomes the group leader. The leader receives a list of all consumers in the group from the group coordinator (this will include all consumers that sent a heartbeat recently and are therefore considered alive) and it is responsible for assigning a subset of partitions to each consumer. It uses an implementation of the PartitionAssignor interface to decide which partitions should be handled by which consumer. After deciding on the partition assignment, the consumer leader sends the list of assignments to the GroupCoordinator which sends this information to all the consumers. Each consumer only sees his own assignment – the leader is the only client process that has the full list of consumers in the group and their assignments. This process repeats every time a rebalance happens.

In the rebalancing process, the partition assignment algorithm is executed and decides what partitions should be claimed and claims the partition ownership in Zookeeper. If the claim was successful, the consumer starts fetching his new partitions.

Leader rebalancing

kafka_partition

Consider a cluster of 4 nodes with replication factor 2 and 4 partitions. 
As we can see in the above image, for Partition 0, the leader is on broker 1 and its follower replica is on broker 2, for Partition 1, the leader is on broker 2 and its follower replica is on broker 4 and so on.
Now, let’s imagine a case where Broker 2 is unreachable or is shut down due to network issues, then broker 4 will become the leader of the Partition 1. 

At this point in time, we have established that if the leader replica is not there on the preferred broker. It leads to an uneven distribution of leaders.

Only the replicas that are part of the in-sync replica list are eligible for becoming the leader and the list of in-sync replicas is persisted to zookeeper whenever any changes are made to it. Also, Kafka’s guarantee of no data loss is applicable only if there exists an in-sync replica. 

In the case of no such replica, this guarantee is not applicable.

But who would be preferred over whom? There comes the concept of Preferred Leader!

Preferred Leader Election

There are two ways to elect a leader from the available replicas in case of any broker failure or network issue.

  1. Kafka-preferred-replica-election.shWhen running kafka-preferred-replica-election.sh, it forces the election of the preferred replica for all partitions. 
  2. auto.leader.rebalance.enable
    When you set auto.leader.rebalance.enable to true, the Controller will  regularly check the imbalance (every leader.imbalance.check.interval.seconds). However, to avoid unnecessary load on the cluster, leaders are only automatically rebalanced if the imbalance ratio is above leader.imbalance.per.broker.percentage which defaults to 10%.


The second approach is not recommended due to the unnecessary load on the cluster. Instead, we can do a rolling restart, or if there are a few brokers holding the leaders of other brokers just restart them and check if leadership is balanced.

 Hope it helped! 🙂

Reference:
1. https://www.linkedin.com/pulse/partitions-rebalance-kafka-raghunandan-gupta/

Knoldus-blog-footer-image

Written by 

Ramandeep Kaur is a Software Consultant, having experience of more than 1.5 years. She is a Java enthusiast and has knowledge of languages like C, C++, C#, and Scala. She is familiar with Object Oriented Programming Paradigms and also has a great interest in relational database technologies. Her hobbies include reading novels and listening music.