assignReplicasToBrokers(
brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]]
AdminUtils Helper Object
AdminUtils
is an utility (a Scala object) with methods for…FIXME
Name | Description |
---|---|
|
AdminUtils
uses __admin_client for…FIXME
assignReplicasToBrokers
Method
assignReplicasToBrokers(
brokerMetadatas: Seq[BrokerMetadata],
nPartitions: Int,
replicationFactor: Int,
fixedStartIndex: Int = -1,
startPartitionId: Int = -1): Map[Int, Seq[Int]]
assignReplicasToBrokers
branches off per whether all the brokers have rack information or not:
-
assignReplicasToBrokers
assignReplicasToBrokersRackUnaware when all the brokers (inbrokerMetadatas
) have no rack information available. -
assignReplicasToBrokers
assignReplicasToBrokersRackAware when all the brokers (inbrokerMetadatas
) have rack information available.
assignReplicasToBrokers
throws a InvalidPartitionsException
if the given nPartitions
is 0
or less.
Number of partitions must be larger than 0.
assignReplicasToBrokers
throws a InvalidReplicationFactorException
if the given replicationFactor
is 0
or less.
Replication factor must be larger than 0.
assignReplicasToBrokers
throws a InvalidReplicationFactorException
if the given replicationFactor
is greater than the number of all brokers in the cluster.
Replication factor: [replicationFactor] larger than available brokers: [brokerMetadatas.size].
assignReplicasToBrokers
throws an AdminOperationException
if there is at least one broker (in brokerMetadatas
) with no rack information (when it is assumed either all brokers have it or none).
Not all brokers have rack information for replica rack aware assignment.
Note
|
|
assignReplicasToBrokersRackUnaware
Internal Method
assignReplicasToBrokersRackUnaware(
nPartitions: Int,
replicationFactor: Int,
brokerList: Seq[Int],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]]
assignReplicasToBrokersRackUnaware
performs the replicas to brokers assignment in a fairly random manner (i.e. includes two random numbers). No additional information is used to make the decision (except the input parameters).
import kafka.admin.{AdminUtils, BrokerMetadata}
// assignReplicasToBrokersRackUnaware is a private method
// Using assignReplicasToBrokers instead as the entry point
val brokerMetadatas = Seq(
BrokerMetadata(0, None),
BrokerMetadata(1, None),
BrokerMetadata(2, None))
val assignment = AdminUtils.assignReplicasToBrokers(
brokerMetadatas,
nPartitions = 3,
replicationFactor = 2)
val output = assignment.toSeq.sortBy(_._1).map { case (brokerId, replicas) => s"$brokerId => $replicas" }
scala> output.foreach(println)
0 => ArrayBuffer(2, 1)
1 => ArrayBuffer(0, 2)
2 => ArrayBuffer(1, 0)
assignReplicasToBrokersRackUnaware
…FIXME
Note
|
assignReplicasToBrokersRackUnaware is used exclusively when AdminUtils is requested to assignReplicasToBrokers (when all the brokers in a cluster have no rack information assigned).
|
assignReplicasToBrokersRackAware
Internal Method
assignReplicasToBrokersRackAware(
nPartitions: Int,
replicationFactor: Int,
brokerMetadatas: Seq[BrokerMetadata],
fixedStartIndex: Int,
startPartitionId: Int): Map[Int, Seq[Int]]
assignReplicasToBrokersRackAware
…FIXME
Note
|
assignReplicasToBrokersRackAware is used exclusively when AdminUtils is requested to assignReplicasToBrokers (when all the brokers in a cluster have rack information assigned).
|