KafkaApis — API Request Handler
KafkaApis is responsible for handling requests to a Kafka broker (by means of handlers).
KafkaApis is created when KafkaServer is requested to start (for KafkaRequestHandlerPool).
When created, KafkaApis is given broker services (from the owning KafkaServer) that are used to handle requests (from producers, consumers, brokers, administrative clients, etc.).
Some requests are meant for the controller broker and simply do nothing (no-ops) when received by a regular non-controller broker.
API Key | Handler |
---|---|
LeaderAndIsr | Cluster action / handleLeaderAndIsrRequest |
StopReplica | Cluster action / handleStopReplicaRequest |
UpdateMetadata | Cluster action / handleUpdateMetadataRequest |
Tip
|
Enable logging for KafkaApis to see what happens inside. Refer to Logging. Please note that Kafka comes with a preconfigured logging configuration that affects where the logs of KafkaApis are written. |
KafkaApis and Authorizer
KafkaApis may be given an Authorizer when created.
The Authorizer is used to authorize operations on resources (see authorize).
KafkaApis and AdminZkClient
adminZkClient: AdminZkClient
When created, KafkaApis creates an AdminZkClient that is used to create a topic for the following handlers:
- handleFindCoordinatorRequest (for the __consumer_offsets and __transaction_state internal topics)
- handleTopicMetadataRequest (for the __consumer_offsets and __transaction_state internal topics or any other topic with auto-creation enabled)
Routing API Requests to Respective Handlers — handle
Method
handle(
request: RequestChannel.Request): Unit
handle first prints out the following TRACE message to the logs:
Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]
handle then relays the input RequestChannel.Request to the corresponding handler per the apiKey (from the header of the input request).
Note
|
handle is used exclusively when KafkaRequestHandler thread is requested to run.
|
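The routing described above can be pictured as a pattern match on the API key. The following is only a minimal sketch: the ApiKey hierarchy, the Request case class and the String results are hypothetical stand-ins introduced for illustration (the real handle returns Unit and works with RequestChannel.Request).

```scala
object RoutingSketch {
  // Hypothetical, simplified stand-ins for Kafka's API keys and RequestChannel.Request.
  sealed trait ApiKey
  case object Produce extends ApiKey
  case object Fetch extends ApiKey
  case object Metadata extends ApiKey

  final case class Request(apiKey: ApiKey, connectionId: String, principal: String)

  // Hypothetical handlers that return a String so the routing is easy to observe.
  def handleProduceRequest(request: Request): String = s"produce from ${request.principal}"
  def handleFetchRequest(request: Request): String = s"fetch from ${request.principal}"
  def handleTopicMetadataRequest(request: Request): String = s"metadata from ${request.principal}"

  // Log the request first, then relay it to the handler that matches its API key.
  def handle(request: Request): String = {
    println(s"Handling request:$request from connection ${request.connectionId};principal:${request.principal}")
    request.apiKey match {
      case Produce  => handleProduceRequest(request)
      case Fetch    => handleFetchRequest(request)
      case Metadata => handleTopicMetadataRequest(request)
    }
  }
}
```

Because ApiKey is sealed in this sketch, the compiler can warn when a newly added key has no handler.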
Handling ElectLeaders Request — handleElectReplicaLeader
Handler
handleElectReplicaLeader(
request: RequestChannel.Request): Unit
Caution
|
FIXME Describe me again |
In summary, handleElectReplicaLeader requests the ReplicaManager to electPreferredLeaders.
Internally, handleElectReplicaLeader…FIXME
Note
|
handleElectReplicaLeader is used exclusively when KafkaApis is requested to handle an ElectLeaders request.
|
Handling LeaderAndIsr Request — handleLeaderAndIsrRequest
Handler
handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit
In summary, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions).
Internally, handleLeaderAndIsrRequest takes the LeaderAndIsrRequest from (the body of) the RequestChannel.Request.
When authorized for cluster action and the broker epoch is not stale (isBrokerEpochStale), handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions), passing the onLeadershipChange callback that receives the partitions for which the broker has become the leader or a follower.
handleLeaderAndIsrRequest isBrokerEpochStale and…FIXME
Note
|
handleLeaderAndIsrRequest is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.
|
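The two preconditions above (authorized for cluster action, broker epoch not stale) can be sketched as a simple guard. This is an illustration under assumptions, not the actual implementation: the Outcome values, the order of the checks and the function parameters are all hypothetical, and what the real handler does for a stale epoch is not covered here.

```scala
object LeaderAndIsrSketch {
  // Hypothetical labels for what happened; they are not Kafka response types.
  sealed trait Outcome
  case object BecameLeaderOrFollower extends Outcome
  case object StaleBrokerEpoch extends Outcome
  case object NotAuthorized extends Outcome

  // becomeLeaderOrFollower stands in for the call to the ReplicaManager.
  def handleLeaderAndIsr(
      authorizedForClusterAction: Boolean,
      brokerEpochIsStale: Boolean,
      becomeLeaderOrFollower: () => Unit): Outcome =
    if (!authorizedForClusterAction) NotAuthorized
    else if (brokerEpochIsStale) StaleBrokerEpoch
    else {
      // Only reached when both preconditions hold.
      becomeLeaderOrFollower()
      BecameLeaderOrFollower
    }
}
```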
Handling AlterReplicaLogDirs Request — handleAlterReplicaLogDirsRequest
Handler
handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit
In summary, handleAlterReplicaLogDirsRequest requests the ReplicaManager to alterReplicaLogDirs.
handleAlterReplicaLogDirsRequest…FIXME
Note
|
handleAlterReplicaLogDirsRequest is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.
|
Handling CreateTopics Request — handleCreateTopicsRequest
Handler
handleCreateTopicsRequest(request: RequestChannel.Request): Unit
handleCreateTopicsRequest…FIXME
handleCreateTopicsRequest checks whether KafkaController is active…FIXME
handleCreateTopicsRequest authorizes the Create operation for ClusterResource…FIXME
In the end, handleCreateTopicsRequest requests AdminManager to create the topics.
Note
|
handleCreateTopicsRequest is used exclusively when KafkaApis is requested to handle a CreateTopics request.
|
Handling OffsetFetch Request — handleOffsetFetchRequest
Handler
handleOffsetFetchRequest(request: RequestChannel.Request): Unit
handleOffsetFetchRequest
…FIXME
Note
|
handleOffsetFetchRequest is used exclusively when KafkaApis is requested to handle an OffsetFetch request.
|
Handling Fetch Request — handleFetchRequest
Handler
handleFetchRequest(
request: RequestChannel.Request): Unit
In summary, handleFetchRequest requests the ReplicaManager to fetch messages.
handleFetchRequest
…FIXME
Note
|
handleFetchRequest is used when KafkaApis is requested to handle a Fetch request.
|
Handling CreateAcls Request — handleCreateAcls
Handler
handleCreateAcls(
request: RequestChannel.Request): Unit
handleCreateAcls
…FIXME
Note
|
handleCreateAcls is used when KafkaApis is requested to handle a CreateAcls request.
|
Handling Metadata Request — handleTopicMetadataRequest
Handler
handleTopicMetadataRequest(
request: RequestChannel.Request): Unit
handleTopicMetadataRequest takes the MetadataRequest from (the body of) the RequestChannel.Request.
handleTopicMetadataRequest requests the MetadataCache for getAllTopics or its subset (per the topics attribute of the MetadataRequest).
handleTopicMetadataRequest filters out the topics for which the current principal (user) is not authorized to execute the Describe operation.
For every authorized topic, handleTopicMetadataRequest…FIXME
handleTopicMetadataRequest creates a MetadataResponse.TopicMetadata with TOPIC_AUTHORIZATION_FAILED for every topic in unauthorizedForCreateTopics and unauthorizedForDescribeTopics.
handleTopicMetadataRequest calls getTopicMetadata if there are any authorizedTopics.
handleTopicMetadataRequest prints out the following TRACE message to the logs:
Sending topic metadata [completeTopicMetadata] and brokers [brokers] for correlation id [correlationId] to client [clientId]
In the end, handleTopicMetadataRequest sends a new MetadataResponse using sendResponseMaybeThrottle.
Note
|
handleTopicMetadataRequest is used exclusively when KafkaApis is requested to handle a Metadata request.
|
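The split between authorized and unauthorized topics can be sketched as follows. This is a simplified illustration only: the TopicMetadata case class, the string error codes and the canDescribe predicate are hypothetical stand-ins, and the sketch ignores topic auto-creation.

```scala
object MetadataAuthSketch {
  // Hypothetical, simplified stand-in for MetadataResponse.TopicMetadata.
  final case class TopicMetadata(topic: String, error: String)

  def topicMetadata(
      requestedTopics: Set[String],
      canDescribe: String => Boolean): Seq[TopicMetadata] = {
    // Split the requested topics by whether the Describe operation is authorized.
    val (authorized, unauthorized) = requestedTopics.partition(canDescribe)
    // Unauthorized topics are reported with an authorization error instead of metadata.
    val failed = unauthorized.toSeq.sorted.map(TopicMetadata(_, "TOPIC_AUTHORIZATION_FAILED"))
    // Authorized topics would carry real metadata; "NONE" is a placeholder here.
    val found = authorized.toSeq.sorted.map(TopicMetadata(_, "NONE"))
    failed ++ found
  }
}
```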
Authorizing Request for Operation on Resource — authorize
Internal Method
authorize(
request: RequestChannel.Request,
operation: AclOperation,
resourceType: ResourceType,
resourceName: String,
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true,
refCount: Int = 1): Boolean
authorize simply requests the Authorizer (when defined) to authorize the given AclOperation on a broker resource (described by the ResourceType and resourceName).
authorize is positive (true) when the Authorizer returned ALLOWED (or when no Authorizer is defined).
Note
|
The Authorizer is created in KafkaServer (when the KafkaApis is created). It is configured using the authorizer.class.name configuration property, which is empty by default, so all operations are authorized.
|
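The "authorized unless an Authorizer says otherwise" behaviour can be sketched with an Option. This is a minimal sketch under assumptions: SimpleAuthorizer, AuthorizationResult and the string-typed parameters are hypothetical simplifications, not Kafka's Authorizer API.

```scala
object AuthorizeSketch {
  sealed trait AuthorizationResult
  case object Allowed extends AuthorizationResult
  case object Denied extends AuthorizationResult

  // Hypothetical, cut-down authorizer interface used only in this sketch.
  trait SimpleAuthorizer {
    def authorize(
        principal: String,
        operation: String,
        resourceType: String,
        resourceName: String): AuthorizationResult
  }

  // Option.forall is true for None, so an undefined authorizer allows everything.
  def authorize(
      authorizer: Option[SimpleAuthorizer],
      principal: String,
      operation: String,
      resourceType: String,
      resourceName: String): Boolean =
    authorizer.forall(_.authorize(principal, operation, resourceType, resourceName) == Allowed)
}
```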
Request | AclOperation | Resource Type | Resource Name |
---|---|---|---|
 | | | groupId |
 | | | transactionalId |
 | | | kafka-cluster |
Fetch (from followers) | | | kafka-cluster |
Metadata (for auto-create topics) | | | kafka-cluster |
 | | | kafka-cluster |
 | | | kafka-cluster |
 | | | Coordinator key |
 | | | Coordinator key |
 | | | Group ID |
 | | | kafka-cluster |
 | | | Group ID |
 | | | Group ID |
 | | | Group ID |
 | | | Group ID |
 | | | Group ID |
 | | | Group ID |
 | | | kafka-cluster |
 | | | Transactional ID |
 | | | kafka-cluster |
 | | | Transactional ID |
 | | | Transactional ID |
 | | | Transactional ID |
 | | | Group ID |
 | | | Transactional ID |
 | | | Group ID |
 | | | kafka-cluster |
AlterConfigs (for brokers) | | | kafka-cluster |
AlterConfigs (for topics) | | | Topic name |
IncrementalAlterConfigs (for brokers) | | | kafka-cluster |
IncrementalAlterConfigs (for topics) | | | Topic name |
DescribeConfigs (for brokers) | | | kafka-cluster |
DescribeConfigs (for topics) | | | Topic name |
 | | | kafka-cluster |
 | | | kafka-cluster |
 | | | Token ID |
 | | | kafka-cluster |
 | | | Group ID |
Handling CreatePartitions Request — handleCreatePartitionsRequest
Handler
handleCreatePartitionsRequest(request: RequestChannel.Request): Unit
handleCreatePartitionsRequest
…FIXME
Note
|
handleCreatePartitionsRequest is used when…FIXME
|
Handling DeleteTopics Request — handleDeleteTopicsRequest
Handler
handleDeleteTopicsRequest(
request: RequestChannel.Request): Unit
handleDeleteTopicsRequest
…FIXME
Note
|
handleDeleteTopicsRequest is used when…FIXME
|
Handling TxnOffsetCommit Request — handleTxnOffsetCommitRequest
Handler
handleTxnOffsetCommitRequest(
request: RequestChannel.Request): Unit
handleTxnOffsetCommitRequest
…FIXME
Note
|
handleTxnOffsetCommitRequest is used when…FIXME
|
Handling SyncGroup Request — handleSyncGroupRequest
Handler
handleSyncGroupRequest(
request: RequestChannel.Request): Unit
handleSyncGroupRequest
…FIXME
Note
|
handleSyncGroupRequest is used when…FIXME
|
Handling SaslHandshake Request — handleSaslHandshakeRequest
Handler
handleSaslHandshakeRequest(
request: RequestChannel.Request): Unit
handleSaslHandshakeRequest
…FIXME
Note
|
handleSaslHandshakeRequest is used when…FIXME
|
Handling SaslAuthenticate Request — handleSaslAuthenticateRequest
Handler
handleSaslAuthenticateRequest(
request: RequestChannel.Request): Unit
handleSaslAuthenticateRequest
…FIXME
Note
|
handleSaslAuthenticateRequest is used when…FIXME
|
Handling AddPartitionToTxn Request — handleAddPartitionToTxnRequest
Handler
handleAddPartitionToTxnRequest(
request: RequestChannel.Request): Unit
handleAddPartitionToTxnRequest
…FIXME
Note
|
handleAddPartitionToTxnRequest is used when…FIXME
|
Handling ApiVersions Request — handleApiVersionsRequest
Handler
handleApiVersionsRequest(
request: RequestChannel.Request): Unit
handleApiVersionsRequest
…FIXME
Note
|
handleApiVersionsRequest is used when…FIXME
|
Handling CreateToken Request — handleCreateTokenRequest
Handler
handleCreateTokenRequest(
request: RequestChannel.Request): Unit
handleCreateTokenRequest
…FIXME
Note
|
handleCreateTokenRequest is used when…FIXME
|
Handling DeleteAcls Request — handleDeleteAcls
Handler
handleDeleteAcls(
request: RequestChannel.Request): Unit
handleDeleteAcls
…FIXME
Note
|
handleDeleteAcls is used when…FIXME
|
Handling DeleteGroups Request — handleDeleteGroupsRequest
Handler
handleDeleteGroupsRequest(
request: RequestChannel.Request): Unit
handleDeleteGroupsRequest
…FIXME
Note
|
handleDeleteGroupsRequest is used when…FIXME
|
Handling DescribeConfigs Request — handleDescribeConfigsRequest
Handler
handleDescribeConfigsRequest(
request: RequestChannel.Request): Unit
handleDescribeConfigsRequest takes the DescribeConfigsRequest from (the body of) the given RequestChannel.Request.
handleDescribeConfigsRequest authorizes the DescribeConfigs operation on the broker and topic resources (of the DescribeConfigsRequest).
For every authorized operation, handleDescribeConfigsRequest requests the AdminManager to describeConfigs.
In the end, handleDescribeConfigsRequest sends a new DescribeConfigsResponse using sendResponseMaybeThrottle.
Note
|
handleDescribeConfigsRequest is used exclusively when KafkaApis is requested to handle a DescribeConfigs request.
|
Handling DescribeAcls Request — handleDescribeAcls
Handler
handleDescribeAcls(
request: RequestChannel.Request): Unit
handleDescribeAcls calls authorizeClusterOperation for the DESCRIBE operation.
handleDescribeAcls
…FIXME
Note
|
handleDescribeAcls is used when…FIXME
|
Handling DescribeTokens Request — handleDescribeTokensRequest
Handler
handleDescribeTokensRequest(
request: RequestChannel.Request): Unit
handleDescribeTokensRequest
…FIXME
Note
|
handleDescribeTokensRequest is used when…FIXME
|
Handling EndTxn Request — handleEndTxnRequest
Handler
handleEndTxnRequest(
request: RequestChannel.Request): Unit
handleEndTxnRequest
…FIXME
Note
|
handleEndTxnRequest is used when…FIXME
|
Handling ExpireToken Request — handleExpireTokenRequest
Handler
handleExpireTokenRequest(
request: RequestChannel.Request): Unit
handleExpireTokenRequest
…FIXME
Note
|
handleExpireTokenRequest is used when…FIXME
|
Handling Heartbeat Request — handleHeartbeatRequest
Handler
handleHeartbeatRequest(
request: RequestChannel.Request): Unit
handleHeartbeatRequest
…FIXME
Note
|
handleHeartbeatRequest is used when…FIXME
|
Handling InitProducerId Request — handleInitProducerIdRequest
Handler
handleInitProducerIdRequest(
request: RequestChannel.Request): Unit
handleInitProducerIdRequest
…FIXME
Note
|
handleInitProducerIdRequest is used when…FIXME
|
Handling IncrementalAlterConfigs Request — handleIncrementalAlterConfigsRequest
Handler
handleIncrementalAlterConfigsRequest(
request: RequestChannel.Request): Unit
handleIncrementalAlterConfigsRequest
…FIXME
Note
|
handleIncrementalAlterConfigsRequest is used when…FIXME
|
Handling ListGroups Request — handleListGroupsRequest
Handler
handleListGroupsRequest(
request: RequestChannel.Request): Unit
handleListGroupsRequest
…FIXME
Note
|
handleListGroupsRequest is used when…FIXME
|
Handling LeaveGroup Request — handleLeaveGroupRequest
Handler
handleLeaveGroupRequest(
request: RequestChannel.Request): Unit
handleLeaveGroupRequest
…FIXME
Note
|
handleLeaveGroupRequest is used when…FIXME
|
Handling RenewToken Request — handleRenewTokenRequest
Handler
handleRenewTokenRequest(
request: RequestChannel.Request): Unit
handleRenewTokenRequest
…FIXME
Note
|
handleRenewTokenRequest is used when…FIXME
|
Handling ControlledShutdown Request — handleControlledShutdownRequest
Handler
handleControlledShutdownRequest(request: RequestChannel.Request): Unit
handleControlledShutdownRequest
…FIXME
Note
|
handleControlledShutdownRequest is used when…FIXME
|
fetchOffsetForTimestamp
Internal Method
fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset]
fetchOffsetForTimestamp
…FIXME
Note
|
fetchOffsetForTimestamp is used exclusively when KafkaApis is requested to handleListOffsetRequestV1AndAbove.
|
handleListOffsetRequestV0
Internal Method
handleListOffsetRequestV0(
request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]
handleListOffsetRequestV0
…FIXME
Note
|
handleListOffsetRequestV0 is used exclusively when KafkaApis is requested to handleListOffsetRequest (for API version 0).
|
handleListOffsetRequestV1AndAbove
Internal Method
handleListOffsetRequestV1AndAbove(
request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]
handleListOffsetRequestV1AndAbove
…FIXME
Note
|
handleListOffsetRequestV1AndAbove is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 1 or above).
|
Handling DescribeLogDirs Request — handleDescribeLogDirsRequest
Handler
handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit
In summary, handleDescribeLogDirsRequest requests the ReplicaManager to describeLogDirs.
Internally, handleDescribeLogDirsRequest takes the DescribeLogDirsRequest from the body (of the RequestChannel.Request).
handleDescribeLogDirsRequest branches off per whether the DescribeLogDirsRequest was for isAllTopicPartitions or not.
- For all TopicPartitions, handleDescribeLogDirsRequest requests the ReplicaManager for the LogManager, which is then requested for all the partition logs and their TopicPartitions.
- For specific TopicPartitions, handleDescribeLogDirsRequest requests them from the DescribeLogDirsRequest.
Note
|
handleDescribeLogDirsRequest returns an empty list of log directories when the request is not authorized.
|
handleDescribeLogDirsRequest then requests the ReplicaManager to describeLogDirs with the requested TopicPartitions.
In the end, handleDescribeLogDirsRequest sends a DescribeLogDirsResponse with the LogDirInfos using sendResponseMaybeThrottle.
Note
|
handleDescribeLogDirsRequest is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.
|
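The branching described in this section (all partitions versus specific ones, and an empty result when not authorized) can be sketched like this. It is an illustration under assumptions: TopicPartition is a local stand-in, None models isAllTopicPartitions, and the describe function stands in for the ReplicaManager call with a simplified result type.

```scala
object DescribeLogDirsSketch {
  // Local stand-in for Kafka's TopicPartition.
  final case class TopicPartition(topic: String, partition: Int)

  // requested = None models a request for all topic partitions (isAllTopicPartitions).
  // describe stands in for asking the ReplicaManager to describeLogDirs.
  def describeLogDirs(
      authorized: Boolean,
      requested: Option[Set[TopicPartition]],
      allLocalPartitions: => Set[TopicPartition],
      describe: Set[TopicPartition] => Map[String, Set[TopicPartition]]): Map[String, Set[TopicPartition]] =
    if (!authorized) Map.empty // unauthorized requests get no log directories
    else describe(requested.getOrElse(allLocalPartitions))
}
```

allLocalPartitions is passed by name, so the (potentially expensive) listing of all local partitions is evaluated only when the request asks for all of them.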
sendResponseMaybeThrottle
Internal Method
sendResponseMaybeThrottle(
request: RequestChannel.Request,
createResponse: Int => AbstractResponse,
onComplete: Option[Send => Unit] = None): Unit
sendResponseMaybeThrottle
…FIXME
Note
|
sendResponseMaybeThrottle is used when…FIXME
|
fetchOffsetsBefore
Method
fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long]
fetchOffsetsBefore
…FIXME
Note
|
fetchOffsetsBefore is used exclusively when KafkaApis is requested to fetchOffsets.
|
fetchOffsets
Method
fetchOffsets(
logManager: LogManager,
topicPartition: TopicPartition,
timestamp: Long,
maxNumOffsets: Int): Seq[Long]
fetchOffsets
…FIXME
Note
|
fetchOffsets is used exclusively when KafkaApis is requested to handleListOffsetRequestV0.
|
Handling StopReplica Request — handleStopReplicaRequest
Handler
handleStopReplicaRequest(request: RequestChannel.Request): Unit
In summary, handleStopReplicaRequest requests the ReplicaManager to stopReplicas.
handleStopReplicaRequest…FIXME
Note
|
handleStopReplicaRequest is used exclusively when KafkaApis is requested to handle a StopReplica request.
|
Handling UpdateMetadata Request (From Kafka Controller) — handleUpdateMetadataRequest
Handler
handleUpdateMetadataRequest(
request: RequestChannel.Request): Unit
handleUpdateMetadataRequest takes the UpdateMetadataRequest from (the body of) the RequestChannel.Request.
When authorized for cluster action and the broker epoch is not stale (isBrokerEpochStale), handleUpdateMetadataRequest requests the following:
- ReplicaManager to maybeUpdateMetadataCache (that gives deleted partitions)
- AdminManager to tryCompleteDelayedTopicOperations for all the topics (based on the partitionStates of the UpdateMetadataRequest)
handleUpdateMetadataRequest updates quotas…FIXME
handleUpdateMetadataRequest requests the ReplicaManager to tryCompleteElection for every partition (based on the partitionStates of the UpdateMetadataRequest).
In the end, handleUpdateMetadataRequest sends a no-error UpdateMetadataResponse using sendResponseExemptThrottle.
Note
|
handleUpdateMetadataRequest is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
|
Handling OffsetCommitRequest — handleOffsetCommitRequest
Handler
handleOffsetCommitRequest(request: RequestChannel.Request): Unit
handleOffsetCommitRequest takes the OffsetCommitRequest from (the body of) the RequestChannel.Request.
If authorized, handleOffsetCommitRequest simply requests the GroupCoordinator to handleCommitOffsets (with the sendResponseCallback).
Note
|
If authorized, handleOffsetCommitRequest branches off per API version (i.e. version 0, which stores offsets in Zookeeper, and version 1 and beyond). API version 0 is not described here.
|
If not authorized, handleOffsetCommitRequest…FIXME
Note
|
handleOffsetCommitRequest is used exclusively when KafkaApis is requested to handle an OffsetCommit request.
|
sendResponseCallback
Method
sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]): Unit
sendResponseCallback prints out the following DEBUG message to the logs for offsets with errors (i.e. topics the client is not authorized to read or topics that do not exist):
Offset commit request with correlation id [correlationId] from client [clientId] on partition [topicPartition] failed due to [exceptionName]
In the end, sendResponseCallback sends a new OffsetCommitResponse using sendResponseMaybeThrottle.
createInternalTopic
Internal Method
createInternalTopic(
topic: String): MetadataResponse.TopicMetadata
createInternalTopic
…FIXME
Note
|
createInternalTopic is used when KafkaApis is requested to getOrCreateInternalTopic and getTopicMetadata (for metadata of the __consumer_offsets and __transaction_state internal topics).
|
sendResponseExemptThrottle
Internal Method
sendResponseExemptThrottle(
request: RequestChannel.Request,
response: AbstractResponse,
onComplete: Option[Send => Unit] = None): Unit
sendResponseExemptThrottle
…FIXME
Note
|
sendResponseExemptThrottle is used when KafkaApis is requested to…FIXME
|
getOrCreateInternalTopic
Internal Method
getOrCreateInternalTopic(
topic: String,
listenerName: ListenerName): MetadataResponse.TopicMetadata
getOrCreateInternalTopic requests the MetadataCache for getTopicMetadata for the input topic (and the ListenerName).
In the end, getOrCreateInternalTopic returns the TopicMetadata if available or calls createInternalTopic.
Note
|
getOrCreateInternalTopic is used exclusively when KafkaApis is requested to handle a FindCoordinator request (and requests the metadata of the __consumer_offsets and __transaction_state internal topics).
|
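The "return the cached metadata or create the topic" behaviour maps naturally onto Option.getOrElse. The following is only a sketch: the TopicMetadata case class and the two function parameters (standing in for the MetadataCache lookup and createInternalTopic) are hypothetical simplifications.

```scala
object InternalTopicSketch {
  // Hypothetical, simplified stand-in for MetadataResponse.TopicMetadata.
  final case class TopicMetadata(topic: String, numPartitions: Int)

  // cachedMetadata stands in for the MetadataCache lookup,
  // createInternalTopic for the topic-creation fallback.
  def getOrCreateInternalTopic(
      topic: String,
      cachedMetadata: String => Option[TopicMetadata],
      createInternalTopic: String => TopicMetadata): TopicMetadata =
    // getOrElse takes its argument by name, so the topic is created only on a cache miss.
    cachedMetadata(topic).getOrElse(createInternalTopic(topic))
}
```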
getTopicMetadata
Internal Method
getTopicMetadata(
allowAutoTopicCreation: Boolean,
topics: Set[String],
listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata]
getTopicMetadata
…FIXME
Note
|
getTopicMetadata is used exclusively when KafkaApis is requested to handle a Metadata request.
|
Handling DescribeGroups Request — handleDescribeGroupRequest
Handler
handleDescribeGroupRequest(request: RequestChannel.Request): Unit
handleDescribeGroupRequest
…FIXME
Note
|
handleDescribeGroupRequest is used exclusively when KafkaApis is requested to handle a DescribeGroups request.
|
Handling AlterConfigs Request — handleAlterConfigsRequest
Handler
handleAlterConfigsRequest(request: RequestChannel.Request): Unit
handleAlterConfigsRequest
…FIXME
Note
|
handleAlterConfigsRequest is used exclusively when KafkaApis is requested to handle an AlterConfigs request.
|
createTopic
Internal Method
createTopic(
topic: String,
numPartitions: Int,
replicationFactor: Int,
properties: Properties = new Properties()): MetadataResponse.TopicMetadata
createTopic
…FIXME
Note
|
createTopic is used when KafkaApis is requested to createInternalTopic and getTopicMetadata.
|
Handling FindCoordinatorRequest — handleFindCoordinatorRequest
Handler
handleFindCoordinatorRequest(request: RequestChannel.Request): Unit
handleFindCoordinatorRequest takes the FindCoordinatorRequest from the body (of the RequestChannel.Request).
handleFindCoordinatorRequest checks permissions…FIXME
For an authorized request, handleFindCoordinatorRequest branches off per CoordinatorType, i.e. GROUP or TRANSACTION.
For the GROUP coordinator type, handleFindCoordinatorRequest does the following:
- Requests the GroupCoordinator for partitionFor the coordinator key (of the FindCoordinatorRequest)
- getOrCreateInternalTopic for the __consumer_offsets topic
For the TRANSACTION coordinator type, handleFindCoordinatorRequest does the following:
- Requests the TransactionCoordinator for partitionFor (for the coordinatorKey of the FindCoordinatorRequest)
- getOrCreateInternalTopic for the __transaction_state topic
In the end, handleFindCoordinatorRequest sends a new FindCoordinatorResponse using sendResponseMaybeThrottle.
You should see the following TRACE message in the logs:
Sending FindCoordinator response [body] for correlation id [correlationId] to client [clientId].
Note
|
handleFindCoordinatorRequest is used exclusively when KafkaApis is requested to handle a FindCoordinator request.
|
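The per-coordinator-type branching can be sketched as a pattern match that picks the internal topic and a partition for the coordinator key. This is a sketch under assumptions: the hash-based partitionFor below is a hypothetical stand-in for what the GroupCoordinator and TransactionCoordinator compute, and the partition counts are parameters invented for illustration.

```scala
object FindCoordinatorSketch {
  sealed trait CoordinatorType
  case object Group extends CoordinatorType
  case object Transaction extends CoordinatorType

  // Assumption: the partition is derived from the key's hash code.
  // The mask keeps the value non-negative before taking the modulus.
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Pick the internal topic and the partition responsible for the coordinator key.
  def coordinatorTopicPartition(
      coordinatorType: CoordinatorType,
      key: String,
      offsetsTopicPartitions: Int,
      txnTopicPartitions: Int): (String, Int) =
    coordinatorType match {
      case Group       => ("__consumer_offsets", partitionFor(key, offsetsTopicPartitions))
      case Transaction => ("__transaction_state", partitionFor(key, txnTopicPartitions))
    }
}
```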
Handling JoinGroupRequest — handleJoinGroupRequest
Handler
handleJoinGroupRequest(request: RequestChannel.Request): Unit
handleJoinGroupRequest takes the JoinGroupRequest from the body (of the RequestChannel.Request) and simply requests the GroupCoordinator to handleJoinGroup (with sendResponseCallback to handle the response).
Note
|
handleJoinGroupRequest is used exclusively when KafkaApis is requested to handle a JoinGroup request.
|
Handling JoinGroup Response — sendResponseCallback
Method
sendResponseCallback(joinResult: JoinGroupResult): Unit
sendResponseCallback creates a new JoinGroupResponse for the given JoinGroupResult and prints out the following TRACE message to the logs:
Sending join group response [responseBody] for correlation id [correlationId] to client [clientId].
In the end, sendResponseCallback sends the new JoinGroupResponse using sendResponseMaybeThrottle.
Handling AddOffsetsToTxn Request — handleAddOffsetsToTxnRequest
Handler
handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit
handleAddOffsetsToTxnRequest
…FIXME
Note
|
handleAddOffsetsToTxnRequest is used exclusively when KafkaApis is requested to handle an AddOffsetsToTxn request.
|
Handling Produce Request — handleProduceRequest
Handler
handleProduceRequest(
request: RequestChannel.Request): Unit
In summary, handleProduceRequest takes the ProduceRequest from the body (of the RequestChannel.Request) and requests the ReplicaManager to appendRecords (with the isFromClient flag enabled).
Note
|
The internalTopicsAllowed flag (when the ReplicaManager is requested to appendRecords) is enabled (true) only when the client ID is __admin_client.
|
handleProduceRequest…FIXME
Note
|
handleProduceRequest is used when KafkaApis is requested to handle a Produce request.
|
handleWriteTxnMarkersRequest
Handler
handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit
In summary, handleWriteTxnMarkersRequest requests the ReplicaManager to getMagic followed by appendRecords (with the isFromClient flag disabled).
handleWriteTxnMarkersRequest
…FIXME
Note
|
handleWriteTxnMarkersRequest is used exclusively when KafkaApis is requested to handle a WriteTxnMarkers request.
|
handleDeleteRecordsRequest
Handler
handleDeleteRecordsRequest(request: RequestChannel.Request): Unit
In summary, handleDeleteRecordsRequest requests the ReplicaManager to deleteRecords.
handleDeleteRecordsRequest
…FIXME
Note
|
handleDeleteRecordsRequest is used exclusively when KafkaApis is requested to handle a DeleteRecords request.
|
handleOffsetForLeaderEpochRequest
Handler
handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit
In summary, handleOffsetForLeaderEpochRequest requests the ReplicaManager to lastOffsetForLeaderEpoch.
handleOffsetForLeaderEpochRequest
…FIXME
Note
|
handleOffsetForLeaderEpochRequest is used exclusively when KafkaApis is requested to handle an OffsetForLeaderEpoch request.
|
handleListOffsetRequest
Handler
handleListOffsetRequest(request: RequestChannel.Request): Unit
In summary, handleListOffsetRequest requests the ReplicaManager to fetchOffsetForTimestamp.
handleListOffsetRequest
…FIXME
Note
|
handleListOffsetRequest is used exclusively when KafkaApis is requested to handle a ListOffsets request.
|
handleOffsetDeleteRequest
Handler
handleOffsetDeleteRequest(
request: RequestChannel.Request): Unit
In summary, handleOffsetDeleteRequest
…FIXME
handleOffsetDeleteRequest
…FIXME
Note
|
handleOffsetDeleteRequest is used when KafkaApis is requested to handle an OffsetDelete request.
|
handleAlterPartitionReassignmentsRequest
Handler
handleAlterPartitionReassignmentsRequest(
request: RequestChannel.Request): Unit
In summary, handleAlterPartitionReassignmentsRequest
…FIXME
handleAlterPartitionReassignmentsRequest
…FIXME
Note
|
handleAlterPartitionReassignmentsRequest is used when KafkaApis is requested to handle an AlterPartitionReassignments request.
|
handleListPartitionReassignmentsRequest
Handler
handleListPartitionReassignmentsRequest(
request: RequestChannel.Request): Unit
In summary, handleListPartitionReassignmentsRequest
…FIXME
handleListPartitionReassignmentsRequest
…FIXME
Note
|
handleListPartitionReassignmentsRequest is used when KafkaApis is requested to handle a ListPartitionReassignments request.
|
isAuthorizedClusterAction
Internal Method
isAuthorizedClusterAction(request: RequestChannel.Request): Boolean
isAuthorizedClusterAction simply calls authorize with the ClusterAction operation on the ClusterResource resource.
Note
|
isAuthorizedClusterAction is used when…FIXME
|
updateRecordConversionStats
Internal Method
updateRecordConversionStats(
request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordConversionStats): Unit
updateRecordConversionStats
…FIXME
Note
|
updateRecordConversionStats is used when…FIXME
|
Asserting Permissions for Cluster Action — authorizeClusterAction
Method
authorizeClusterAction(request: RequestChannel.Request): Unit
authorizeClusterAction simply asserts that the RequestChannel.Request is authorized to execute ClusterAction on a ClusterResource. If so, authorizeClusterAction does nothing and returns.
If not authorized, authorizeClusterAction throws a ClusterAuthorizationException:
Request [request] is not authorized.
Note
|
authorizeClusterAction is used when KafkaApis is requested to handleLeaderAndIsrRequest, handleStopReplicaRequest, handleUpdateMetadataRequest, handleControlledShutdownRequest, and handleWriteTxnMarkersRequest.
|
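The assert-or-throw behaviour can be sketched as below. This is a minimal illustration, not the actual code: the exception class is a local stand-in for Kafka's ClusterAuthorizationException, the request is reduced to a String, and isAuthorizedClusterAction is passed in as a hypothetical predicate.

```scala
object ClusterActionSketch {
  // Local stand-in; Kafka ships its own ClusterAuthorizationException.
  final class ClusterAuthorizationException(message: String) extends RuntimeException(message)

  // Returns normally when authorized, throws otherwise.
  def authorizeClusterAction(
      request: String,
      isAuthorizedClusterAction: String => Boolean): Unit =
    if (!isAuthorizedClusterAction(request))
      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
```

Handlers that call such a method first can assume authorization in the rest of their body, since an unauthorized request never gets past the call.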
isBrokerEpochStale
Internal Method
isBrokerEpochStale(
brokerEpochInRequest: Long): Boolean
isBrokerEpochStale
…FIXME
Note
|
isBrokerEpochStale is used when KafkaApis is requested to handleLeaderAndIsrRequest, handleStopReplicaRequest, and handleUpdateMetadataRequest.
|
filterAuthorized
Internal Method
filterAuthorized(
request: RequestChannel.Request,
operation: AclOperation,
resourceType: ResourceType,
resourceNames: Seq[String],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true): Set[String]
filterAuthorized
…FIXME
Note
|
filterAuthorized is used when…FIXME
|
authorizedOperations
Internal Method
authorizedOperations(
request: RequestChannel.Request,
resource: Resource): Int
authorizedOperations
…FIXME
Request | Resource Type | Resource Name |
---|---|---|
 | | kafka-cluster |
 | | Topic name |
 | | Group ID |
Throwing ClusterAuthorizationException for Unauthorized Cluster Operation — authorizeClusterOperation
Internal Method
authorizeClusterOperation(
request: RequestChannel.Request,
operation: AclOperation): Unit
authorizeClusterOperation simply throws a ClusterAuthorizationException when the given request is not authorized for the given operation on the CLUSTER resource named kafka-cluster:
Request [request] is not authorized.