ReassignPartitionsCommand — Partition Reassignment on Command Line

kafka.admin.ReassignPartitionsCommand is a command-line application that allows for generating, executing and verifying a custom partition (re)assignment configuration (as specified using a reassignment JSON file).

Table 1. ReassignPartitionsCommand’s Actions
Action Description

--execute

Executes the reassignment as specified by the reassignment-json-file option

--generate

Generates a candidate partition reassignment configuration

Note
This only generates a candidate assignment, it does not execute it.

--verify

Verifies if the reassignment completed as specified by the reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed

ReassignPartitionsCommand can be executed using kafka-reassign-partitions shell script (i.e. bin/kafka-reassign-partitions.sh or bin\windows\kafka-reassign-partitions.bat).

$ ./bin/kafka-reassign-partitions.sh
This command moves topic partitions between replicas.
Table 2. ReassignPartitionsCommand’s Options
Option Description

reassignment-json-file

A JSON file with a custom partition (re)assignment configuration

The format to use is as follows:

{
  "partitions": [
    {
      "topic": "foo",
      "partition": 1,
      "replicas": [
        1,
        2,
        3
      ],
      "log_dirs": [
        "dir1",
        "dir2",
        "dir3"
      ]
    }
  ],
  "version": 1
}

Note that "log_dirs" is optional. When specified, its length must equal the length of the replicas list. The value in this list can be either "any" or the absolute path of the log directory on the broker.

If absolute log directory path is specified, it is currently required that the replica has not already been created on that broker. The replica will then be created in the specified log directory on the broker later.

replica-alter-log-dirs-throttle

The movement of replicas between log directories on the same broker will be throttled to this value (bytes/sec).

Default: -1

Rerunning with this option, whilst a rebalance is in progress, will alter the throttle value. The throttle rate should be at least 1 KB/s.

throttle

The movement of partitions between brokers will be throttled to this value (bytes/sec).

Default: -1

Rerunning with this option, whilst a rebalance is in progress, will alter the throttle value. The throttle rate should be at least 1 KB/s.

timeout

The maximum time (in ms) allowed to wait for partition reassignment execution to be successfully initiated

Default: 10000

$ ./bin/kafka-topics.sh --list --zookeeper :2181
my-topic

$ cat reassign-partitions.json
{
  "partitions": [
    {
      "topic": "my-topic",
      "partition": 1,
      "replicas": [
        1
      ]
    }
  ],
  "version": 1
}

$ ./bin/kafka-reassign-partitions.sh \
  --generate \
  --zookeeper :2181 \
  --topics-to-move-json-file reassign-partitions.json \
  --broker-list 0

$ ./bin/kafka-reassign-partitions.sh \
  --verify \
  --zookeeper :2181 \
  --reassignment-json-file reassign-partitions.json

ReassignPartitionsCommand is created exclusively when ReassignPartitionsCommand is requested to executeAssignment.

Executing Standalone Application — main Method

main(args: Array[String]): Unit

main…​FIXME

executeAssignment Method

executeAssignment(
  zkClient: KafkaZkClient,
  adminClientOpt: Option[JAdminClient],
  opts: ReassignPartitionsCommandOptions): Unit (1)
executeAssignment(
  zkClient: KafkaZkClient,
  adminClientOpt: Option[JAdminClient],
  reassignmentJsonString: String,
  throttle: Throttle,
  timeoutMs: Long = 10000L): Unit
  1. Uses options for reassignmentJsonString, throttle and timeoutMs inputs

executeAssignment…​FIXME

Note
executeAssignment is used exclusively when ReassignPartitionsCommand is executed (with execute action).

reassignPartitions Method

reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean

reassignPartitions…​FIXME

Note
reassignPartitions is used exclusively when ReassignPartitionsCommand executeAssignment.

alterReplicaLogDirsIgnoreReplicaNotAvailable Internal Method

alterReplicaLogDirsIgnoreReplicaNotAvailable(
  replicaAssignment: Map[TopicPartitionReplica, String],
  adminClient: JAdminClient,
  timeoutMs: Long): Set[TopicPartitionReplica]

alterReplicaLogDirsIgnoreReplicaNotAvailable…​FIXME

Note
alterReplicaLogDirsIgnoreReplicaNotAvailable is used exclusively when ReassignPartitionsCommand reassignPartitions

generateAssignment Method

generateAssignment(
  zkClient: KafkaZkClient,
  brokerListToReassign: Seq[Int],
  topicsToMoveJsonString: String,
  disableRackAware: Boolean)
: (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]])

generateAssignment…​FIXME

Note
generateAssignment is used when…​FIXME

verifyAssignment Method

verifyAssignment(
  zkClient: KafkaZkClient,
  adminClientOpt: Option[JAdminClient],
  jsonString: String): Unit

verifyAssignment…​FIXME

Note
verifyAssignment is used when…​FIXME

parseAndValidate Method

parseAndValidate(
  zkClient: KafkaZkClient,
  reassignmentJsonString: String)
: (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String])

parseAndValidate…​FIXME

Note
parseAndValidate is used when…​FIXME

removeThrottle Method

removeThrottle(
  zkClient: KafkaZkClient,
  reassignedPartitionsStatus: Map[TopicPartition, ReassignmentStatus],
  replicasReassignmentStatus: Map[TopicPartitionReplica, ReassignmentStatus],
  adminZkClient: AdminZkClient): Unit

removeThrottle…​FIXME

Note
removeThrottle is used when…​FIXME

existingAssignment Method

existingAssignment(): Map[TopicPartition, Seq[Int]]

existingAssignment takes the topics (from the keys) from the proposedPartitionAssignment and requests the KafkaZkClient to getReplicaAssignmentForTopics.

Note
existingAssignment is used when…​FIXME

Creating ReassignPartitionsCommand Instance

ReassignPartitionsCommand takes the following when created:

results matching ""

    No results matching ""