Open Source: Kafka Partition Reassignment with Minimal Data Movement

Kanak Biscuitwala, July 28, 2016

We’re happy to announce that we’re open sourcing one component of our Kafka operations toolkit: an algorithm that computes a new partition assignment to brokers while obeying the following properties:

  • Data movement is minimized
  • Partitions for each topic are evenly distributed across brokers
  • Multiple replicas for each partition never appear in the same rack (or the same host, if rack awareness is disabled)
  • Leadership is roughly evenly distributed across the cluster

The code and binaries can be found at: https://github.com/SiftScience/kafka-assigner, and it has been released under the Apache License, Version 2.0.

Background

One of the most important properties of many distributed systems is elasticity: they are resilient to failure and can be expanded to handle additional load. Apache Kafka certainly checks both boxes. However, Kafka’s out-of-the-box ability to change its broker cluster topology is somewhat limited. When a new node is added, for example, an operator has two choices:

  • Move entire topics to the new node(s)
  • Come up with a new partition assignment manually

The first choice is not ideal because one common motivation for expanding the cluster is to spread the burden of serving a topic that needs additional computing resources. Moving an entire topic only relocates that burden to the new node(s); it does not spread the topic across more brokers.

The second choice requires either a lot of manual work (many production Kafka clusters have thousands of partitions to assign), or custom scripting to automate the process. It turns out, though, that automation is not necessarily trivial. In particular, the assignment algorithm needs to take current assignments into account, or else the cluster will take up significant bandwidth moving data that it didn’t need to move. The primary goal when adding a node is to have the new node take on some amount of work, so we should only move enough data to give the new node an equal share of work, and not any more.
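To make the cost of ignoring the current assignment concrete, here is a minimal sketch that counts how many replicas would have to be copied to a broker that does not already hold them. The function name `replicas_moved`, the topic name `events`, and the broker ids are hypothetical and chosen only for illustration; they are not part of the released tool.

```python
def replicas_moved(current, proposed):
    """Count replicas placed on a broker that did not already host that partition.

    Both arguments map (topic, partition) -> list of broker ids.
    """
    moved = 0
    for partition, new_brokers in proposed.items():
        old_brokers = set(current.get(partition, []))
        moved += sum(1 for broker in new_brokers if broker not in old_brokers)
    return moved


# Three partitions with two replicas each on brokers 1-3; broker 4 is being added.
current = {("events", 0): [1, 2], ("events", 1): [2, 3], ("events", 2): [3, 1]}

# An assignment computed from scratch, without looking at the current one.
from_scratch = {("events", 0): [2, 3], ("events", 1): [3, 4], ("events", 2): [4, 1]}

# An assignment that only hands broker 4 a single replica.
aware = {("events", 0): [1, 2], ("events", 1): [2, 3], ("events", 2): [3, 4]}

print(replicas_moved(current, from_scratch))
print(replicas_moved(current, aware))
```

In this toy cluster the from-scratch assignment copies three replicas, while the assignment that respects the current layout copies one.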

The situation is worse when taking decommissioning or replacement of nodes into consideration. Currently, the only officially supported way to do this is to come up with a custom reassignment. Given this pain point, and the lack of flexibility when expanding the broker cluster, we wrote our own algorithm.

Algorithm

The high-level algorithm is as follows (given all topic partitions, replication factor, and set of nodes):

  1. Figure out how many partitions each node can accept so that each node serves roughly the same number of partitions.
  2. Assign as many partitions as possible from the current assignment back to their current owner nodes, subject to the upper bound computed in the previous step.
  3. Assign all still-unassigned partition replicas evenly to nodes that can accept them.

It’s worth noting that a node can accept a partition if it has capacity, and no other replica of the partition is currently being served on the node’s rack (or by the node itself if rack awareness is disabled).
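The three steps and the acceptance rule above can be sketched in a few lines of Python. This is an illustrative approximation, not the code in the repository: the function name `reassign`, the use of a simple ceiling as the per-node upper bound, taking each partition's replication factor from the length of its current replica list, and the tie-breaking order are all assumptions made for the sketch. It also ignores leadership balancing and raises an error rather than backtracking when the greedy pass cannot place a replica.

```python
import math
from collections import defaultdict


def reassign(current, brokers, racks=None):
    """Sketch of a sticky, rack-aware reassignment.

    current: {partition: [broker, ...]} for one set of partitions
    brokers: list of broker ids that should serve the partitions
    racks:   optional {broker: rack}; if omitted, each broker is its own rack
    """
    rack_of = (lambda b: racks[b]) if racks else (lambda b: b)

    # Step 1: upper bound on replicas per node (assumed here: a plain ceiling).
    total_replicas = sum(len(replicas) for replicas in current.values())
    cap = math.ceil(total_replicas / len(brokers))

    load = {b: 0 for b in brokers}
    result = {p: [] for p in current}
    used_racks = defaultdict(set)

    def can_accept(broker, partition):
        # A node accepts a replica if it has capacity and its rack is unused.
        return load[broker] < cap and rack_of(broker) not in used_racks[partition]

    def place(broker, partition):
        result[partition].append(broker)
        used_racks[partition].add(rack_of(broker))
        load[broker] += 1

    # Step 2: keep replicas on their current owners where the bound allows.
    for p in sorted(current):
        for b in current[p]:
            if b in load and can_accept(b, p):
                place(b, p)

    # Step 3: hand each orphaned replica to the least-loaded eligible node.
    for p in sorted(current):
        while len(result[p]) < len(current[p]):
            candidates = [b for b in brokers if can_accept(b, p)]
            if not candidates:
                raise ValueError(f"no eligible broker for partition {p!r}")
            place(min(candidates, key=lambda b: (load[b], b)), p)

    return result


# Hypothetical cluster: brokers 1-3 serve four partitions, broker 4 is new.
current = {0: [1, 2], 1: [1, 2], 2: [1, 3], 3: [2, 3]}
print(reassign(current, brokers=[1, 2, 3, 4]))
```

Because the sketch is greedy, a partition that loses several replicas at once can end up with no eligible broker even when a valid assignment exists; a production implementation would need to handle that case, and a ceiling bound can leave a new node underfilled when the replica count does not divide evenly.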

Overall, the algorithm is similar in spirit to Apache Helix’s AutoRebalanceStrategy, except that it has been simplified to focus on Kafka, and rack-awareness has been added on.

Examples

When generating a reassignment, the tool prints JSON that can be passed to Kafka’s built-in partition reassignment tool. See http://kafka.apache.org/0100/ops.html#basic_ops_partitionassignment for instructions on how to use that tool.
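For reference, Kafka's `kafka-reassign-partitions.sh` reads a file, passed with `--reassignment-json-file`, that lists the desired replicas for each topic partition. The sketch below builds a document in that shape from an in-memory assignment. The helper name `to_reassignment_json` and the sample topic are hypothetical, and the exact text printed by the assignment generator is not shown in this post, so treat this as an illustration of the format Kafka expects rather than of the tool's literal output.

```python
import json


def to_reassignment_json(assignment):
    """Render {(topic, partition): [broker_id, ...]} as Kafka reassignment JSON."""
    partitions = [
        {"topic": topic, "partition": partition, "replicas": list(replicas)}
        for (topic, partition), replicas in sorted(assignment.items())
    ]
    return json.dumps({"version": 1, "partitions": partitions}, indent=2)


# Hypothetical assignment for a two-partition topic.
print(to_reassignment_json({("events", 0): [1, 2], ("events", 1): [2, 3]}))
```

The first broker listed in each `replicas` array is the preferred leader for that partition, which is why replica order matters when leadership should be spread across the cluster.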

Reassign Partitions to All Live Hosts

./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_REASSIGNMENT

Reassign Partitions to All Live Hosts, Excluding a Few

This mode is useful for decommissioning or replacing a node. The partitions will be assigned to all live hosts, excluding the hosts that are specified.

./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_REASSIGNMENT --broker_hosts_to_remove misbehaving-host1,misbehaving-host2

Reassign Partitions to Specific Hosts

This is less common, but it allows the operator to specify exactly which hosts should participate in the broker cluster.

./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_REASSIGNMENT --broker_hosts host1,host2,host3

Print Current Brokers

./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_CURRENT_BROKERS

Print Current Assignment

./kafka-assignment-generator.sh --zk_string my-zk-host:2181 --mode PRINT_CURRENT_ASSIGNMENT

Next Steps

This is not the only Kafka-related library that we feel addresses a common problem faced by companies deploying the system. We are also looking into releasing other Kafka-related code that we’ve written, as we mentioned in a previous blog post.