$ ./bin/kafka-reassign-partitions.sh
This command moves topic partitions between replicas.
ReassignPartitionsCommand Command-Line Tool — Partition Reassignment on Command Line
kafka.admin.ReassignPartitionsCommand is a command-line tool that allows for generating, executing and verifying a custom partition (re)assignment configuration (as specified using a reassignment JSON file).
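Action | Description |
---|---|
--execute | Executes the reassignment as specified by the reassignment-json-file option |
--generate | Generates a candidate reassignment for the topics in the topics-to-move-json-file option across the brokers in the broker-list option. The candidate is only printed, not executed |
--verify | Verifies if the reassignment completed as specified by the reassignment-json-file option. If there is a throttle engaged for the replicas specified, and the rebalance has completed, the throttle will be removed |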
ReassignPartitionsCommand can be executed using the kafka-reassign-partitions shell script (i.e. bin/kafka-reassign-partitions.sh or bin\windows\kafka-reassign-partitions.bat).
Option | Description |
---|---|
--reassignment-json-file | The JSON file with the partition reassignment configuration. Each entry of "partitions" names a "topic", a "partition" and the target "replicas". Note that "log_dirs" is optional. When specified, its length must equal the length of the replicas list. Each value in this list can be either "any" or the absolute path of a log directory on the broker. If an absolute log directory path is specified, it is currently required that the replica has not already been created on that broker. The replica will then be created in the specified log directory on the broker later. |
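The "log_dirs" rule for the reassignment JSON file can be sketched as a small check. This is a minimal illustration, not the tool's own validation: the LogDirsCheck object and its Entry class are hypothetical stand-ins, the allowed values are assumed to be "any" or an absolute path, and an absolute path is assumed to start with "/" (POSIX style).

```scala
object LogDirsCheck {
  // Hypothetical stand-in for one entry of the "partitions" array (not a Kafka class).
  final case class Entry(
      topic: String,
      partition: Int,
      replicas: Seq[Int],
      logDirs: Option[Seq[String]])

  // Returns the problems found in one entry; an empty result means the entry passes.
  def validate(e: Entry): Seq[String] = e.logDirs match {
    case None => Seq.empty // "log_dirs" is optional
    case Some(dirs) =>
      // Rule 1: one log directory per replica.
      val sizeProblem =
        if (dirs.size != e.replicas.size)
          Seq(s"${e.topic}-${e.partition}: ${dirs.size} log_dirs for ${e.replicas.size} replicas")
        else Seq.empty
      // Rule 2 (assumed): every value is "any" or a POSIX-style absolute path.
      val valueProblems = dirs.collect {
        case d if d != "any" && !d.startsWith("/") =>
          s"${e.topic}-${e.partition}: '$d' is neither 'any' nor an absolute path"
      }
      sizeProblem ++ valueProblems
  }
}
```

For instance, an entry with two replicas and a single "any" log directory yields one problem, while an entry without "log_dirs" yields none.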
$ ./bin/kafka-topics.sh --list --zookeeper :2181
my-topic
$ cat reassign-partitions.json
{
"partitions": [
{
"topic": "my-topic",
"partition": 1,
"replicas": [
1
]
}
],
"version": 1
}
$ ./bin/kafka-reassign-partitions.sh \
--generate \
--zookeeper :2181 \
--topics-to-move-json-file reassign-partitions.json \
--broker-list 0
$ ./bin/kafka-reassign-partitions.sh \
--verify \
--zookeeper :2181 \
--reassignment-json-file reassign-partitions.json
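A file like reassign-partitions.json above could also be produced programmatically. The following is a minimal sketch under stated assumptions: ReassignmentJson and render are hypothetical names, partitions are keyed by a plain (topic, partition) tuple, and topic names are assumed to need no JSON escaping.

```scala
object ReassignmentJson {
  // Renders (topic, partition) -> replica broker ids in the version 1 layout
  // used by reassign-partitions.json. No JSON escaping is applied to topic names.
  def render(assignment: Map[(String, Int), Seq[Int]]): String = {
    val entries = assignment.toSeq
      .sortBy { case ((topic, partition), _) => (topic, partition) }
      .map { case ((topic, partition), replicas) =>
        val replicaList = replicas.mkString(",")
        s"""{"topic":"$topic","partition":$partition,"replicas":[$replicaList]}"""
      }
    val partitions = entries.mkString(",")
    s"""{"partitions":[$partitions],"version":1}"""
  }
}
```

Calling ReassignmentJson.render(Map(("my-topic", 1) -> Seq(1))) gives a compact, single-line equivalent of the file shown above.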
ReassignPartitionsCommand is created exclusively when ReassignPartitionsCommand is requested to executeAssignment.
executeAssignment Method
executeAssignment(
zkClient: KafkaZkClient,
adminClientOpt: Option[JAdminClient],
opts: ReassignPartitionsCommandOptions): Unit (1)
executeAssignment(
zkClient: KafkaZkClient,
adminClientOpt: Option[JAdminClient],
reassignmentJsonString: String,
throttle: Throttle,
timeoutMs: Long = 10000L): Unit
(1) Uses the options for the reassignmentJsonString, throttle and timeoutMs inputs
executeAssignment …FIXME
reassignPartitions Method
reassignPartitions(throttle: Throttle = NoThrottle, timeoutMs: Long = 10000L): Boolean
reassignPartitions …FIXME
Note: reassignPartitions is used exclusively when ReassignPartitionsCommand is requested to executeAssignment.
alterReplicaLogDirsIgnoreReplicaNotAvailable Internal Method
alterReplicaLogDirsIgnoreReplicaNotAvailable(
replicaAssignment: Map[TopicPartitionReplica, String],
adminClient: JAdminClient,
timeoutMs: Long): Set[TopicPartitionReplica]
alterReplicaLogDirsIgnoreReplicaNotAvailable …FIXME
Note: alterReplicaLogDirsIgnoreReplicaNotAvailable is used exclusively when ReassignPartitionsCommand is requested to reassignPartitions.
generateAssignment Method
generateAssignment(
zkClient: KafkaZkClient,
brokerListToReassign: Seq[Int],
topicsToMoveJsonString: String,
disableRackAware: Boolean)
: (Map[TopicPartition, Seq[Int]], Map[TopicPartition, Seq[Int]])
generateAssignment …FIXME
Note: generateAssignment is used when…FIXME
verifyAssignment Method
verifyAssignment(
zkClient: KafkaZkClient,
adminClientOpt: Option[JAdminClient],
jsonString: String): Unit
verifyAssignment …FIXME
Note: verifyAssignment is used when…FIXME
parseAndValidate Method
parseAndValidate(
zkClient: KafkaZkClient,
reassignmentJsonString: String)
: (Seq[(TopicPartition, Seq[Int])], Map[TopicPartitionReplica, String])
parseAndValidate …FIXME
Note: parseAndValidate is used when…FIXME
removeThrottle Method
removeThrottle(
zkClient: KafkaZkClient,
reassignedPartitionsStatus: Map[TopicPartition, ReassignmentStatus],
replicasReassignmentStatus: Map[TopicPartitionReplica, ReassignmentStatus],
adminZkClient: AdminZkClient): Unit
removeThrottle …FIXME
Note: removeThrottle is used when…FIXME
maybeLimit Method
maybeLimit(throttle: Throttle): Unit
maybeLimit …FIXME
Note: maybeLimit is used when…FIXME
assignThrottledReplicas Method
assignThrottledReplicas(
existingPartitionAssignment: Map[TopicPartition, Seq[Int]],
proposedPartitionAssignment: Map[TopicPartition, Seq[Int]],
adminZkClient: AdminZkClient): Unit
assignThrottledReplicas …FIXME
Note: assignThrottledReplicas is used when…FIXME
existingAssignment Method
existingAssignment(): Map[TopicPartition, Seq[Int]]
existingAssignment takes the topics (from the keys) from the proposedPartitionAssignment and requests the KafkaZkClient to getReplicaAssignmentForTopics.
Note: existingAssignment is used when…FIXME
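The lookup described for existingAssignment can be sketched without a running ZooKeeper. This is an illustration only: TopicPartition here is a stand-in case class, and ReplicaAssignmentSource and InMemorySource are hypothetical types that play the role of the KafkaZkClient, assuming getReplicaAssignmentForTopics takes a set of topic names and returns the replicas of every partition of those topics.

```scala
object ExistingAssignmentSketch {
  // Stand-in for org.apache.kafka.common.TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // Hypothetical interface standing in for the part of KafkaZkClient used here.
  trait ReplicaAssignmentSource {
    def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]]
  }

  // Hypothetical in-memory source, handy for trying the sketch out.
  final case class InMemorySource(current: Map[TopicPartition, Seq[Int]])
      extends ReplicaAssignmentSource {
    def getReplicaAssignmentForTopics(topics: Set[String]): Map[TopicPartition, Seq[Int]] =
      current.filter { case (tp, _) => topics.contains(tp.topic) }
  }

  // Takes the topics from the keys of the proposed assignment and asks the
  // source for the current replica assignment of those topics.
  def existingAssignment(
      proposedPartitionAssignment: Map[TopicPartition, Seq[Int]],
      source: ReplicaAssignmentSource): Map[TopicPartition, Seq[Int]] = {
    val topics = proposedPartitionAssignment.keySet.map(_.topic)
    source.getReplicaAssignmentForTopics(topics)
  }
}
```

Under these assumptions the result covers every partition of the affected topics, not only the partitions named in the proposal, because the lookup is keyed by topic.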
Creating ReassignPartitionsCommand Instance
ReassignPartitionsCommand takes the following when created:
- Optional AdminClient
- Proposed partition assignment (Map[TopicPartition, Seq[Int]])
- Proposed replica assignment (Map[TopicPartitionReplica, String])
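To make the shapes of the two proposed assignments concrete, here is a small sketch. The case classes are stand-ins for org.apache.kafka.common.TopicPartition and TopicPartitionReplica, replicaLogDirs is a hypothetical helper, the /var/kafka-logs path is made up, and dropping "any" entries is an assumption of this sketch rather than something the text confirms.

```scala
object ProposedAssignmentSketch {
  // Stand-ins for the Kafka classes of the same names.
  final case class TopicPartition(topic: String, partition: Int)
  final case class TopicPartitionReplica(topic: String, partition: Int, brokerId: Int)

  // Hypothetical helper: pairs each replica (broker id) with its requested log directory.
  // Skipping "any" is this sketch's choice, since "any" names no specific directory.
  def replicaLogDirs(
      tp: TopicPartition,
      replicas: Seq[Int],
      logDirs: Seq[String]): Map[TopicPartitionReplica, String] =
    replicas.zip(logDirs).collect {
      case (brokerId, dir) if dir != "any" =>
        TopicPartitionReplica(tp.topic, tp.partition, brokerId) -> dir
    }.toMap

  val tp: TopicPartition = TopicPartition("my-topic", 1)

  // Proposed partition assignment: partition -> broker ids that should host it.
  val proposedPartitionAssignment: Map[TopicPartition, Seq[Int]] =
    Map(tp -> Seq(1, 2))

  // Proposed replica assignment: replica -> log directory on its broker.
  val proposedReplicaAssignment: Map[TopicPartitionReplica, String] =
    replicaLogDirs(tp, Seq(1, 2), Seq("any", "/var/kafka-logs"))
}
```

In this example only the replica on broker 2 ends up in the replica assignment, because broker 1 was given "any".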