assignReplicasToBrokers(
brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]]
AdminUtils Helper Object
AdminUtils is an utility (a Scala object) with methods for…FIXME
| Name | Description |
|---|---|
|
AdminUtils uses __admin_client for…FIXME
assignReplicasToBrokers Method
assignReplicasToBrokers(
brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]]
assignReplicasToBrokers branches off per whether all the brokers have rack information or not:
-
assignReplicasToBrokersassignReplicasToBrokersRackUnaware when all the brokers (inbrokerMetadatas) have no rack information available. -
assignReplicasToBrokersassignReplicasToBrokersRackAware when all the brokers (inbrokerMetadatas) have rack information available.
assignReplicasToBrokers throws a InvalidPartitionsException if the given nPartitions is 0 or less.
Number of partitions must be larger than 0.
assignReplicasToBrokers throws a InvalidReplicationFactorException if the given replicationFactor is 0 or less.
Replication factor must be larger than 0.
assignReplicasToBrokers throws a InvalidReplicationFactorException if the given replicationFactor is greater than the number of all brokers in the cluster.
Replication factor: [replicationFactor] larger than available brokers: [brokerMetadatas.size].
assignReplicasToBrokers throws an AdminOperationException if there is at least one broker (in brokerMetadatas) with no rack information (when it is assumed either all brokers have it or none).
Not all brokers have rack information for replica rack aware assignment.
|
Note
|
|
assignReplicasToBrokersRackUnaware Internal Method
assignReplicasToBrokersRackUnaware(
nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]]
assignReplicasToBrokersRackUnaware performs the replicas to brokers assignment in a fairly random manner (i.e. includes two random numbers). No additional information is used to make the decision (except the input parameters).
import kafka.admin.{AdminUtils, BrokerMetadata}
// assignReplicasToBrokersRackUnaware is a private method
// Using assignReplicasToBrokers instead as the entry point
val brokerMetadatas = Seq(
BrokerMetadata(0, None),
BrokerMetadata(1, None),
BrokerMetadata(2, None))
val assignment = AdminUtils.assignReplicasToBrokers(
brokerMetadatas,
nPartitions = 3,
replicationFactor = 2)
val output = assignment.toSeq.sortBy(_._1).map { case (brokerId, replicas) => s"$brokerId => $replicas" }
scala> output.foreach(println)
0 => ArrayBuffer(2, 1)
1 => ArrayBuffer(0, 2)
2 => ArrayBuffer(1, 0)
assignReplicasToBrokersRackUnaware…FIXME
|
Note
|
assignReplicasToBrokersRackUnaware is used exclusively when AdminUtils is requested to assignReplicasToBrokers (when all the brokers in a cluster have no rack information assigned).
|
assignReplicasToBrokersRackAware Internal Method
assignReplicasToBrokersRackAware(
nPartitions: Int,
replicationFactor: Int,
brokerMetadatas: Seq[BrokerMetadata],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]]
assignReplicasToBrokersRackAware…FIXME
|
Note
|
assignReplicasToBrokersRackAware is used exclusively when AdminUtils is requested to assignReplicasToBrokers (when all the brokers in a cluster have rack information assigned).
|