KafkaApis — API Request Handler
KafkaApis is responsible for handling requests to a Kafka broker (by means of handlers).
KafkaApis is created when KafkaServer is requested to start (for KafkaRequestHandlerPool).
When created, KafkaApis is given broker services (from the owning KafkaServer) that are used to handle requests (from producers, consumers, brokers, administrative clients, etc.).
Some requests are meant for the controller broker and simply do nothing (no-ops) when received by a regular non-controller broker.
| API Key | Handler |
|---|---|
| LeaderAndIsr | Cluster action / handleLeaderAndIsrRequest |
| StopReplica | Cluster action / handleStopReplicaRequest |
| UpdateMetadata | Cluster action / handleUpdateMetadataRequest |
Tip
Enable … Add the following line to …
Refer to Logging. Please note that Kafka comes with a preconfigured …
That means that the logs of …
KafkaApis and Authorizer
KafkaApis may be given an Authorizer when created.
The Authorizer is used for the following:
KafkaApis and AdminZkClient
adminZkClient: AdminZkClient
When created, KafkaApis creates an AdminZkClient that is used to create a topic for the following handlers:
- handleFindCoordinatorRequest (for __consumer_offsets and __transaction_state internal topics)
- handleTopicMetadataRequest (for __consumer_offsets and __transaction_state internal topics or any other topic with auto-creation enabled)
Routing API Requests to Respective Handlers — handle Method
handle(
request: RequestChannel.Request): Unit
handle first prints out the following TRACE message to the logs:
Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]
handle then relays the input RequestChannel.Request to the corresponding handler per the apiKey (from the header of the input request).
Note: handle is used exclusively when a KafkaRequestHandler thread is requested to run.
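The routing by apiKey can be pictured as a pattern match. The following is a minimal sketch and not the actual KafkaApis code: `ApiKey` and `Request` are simplified, hypothetical stand-ins for Kafka's ApiKeys and RequestChannel.Request, and the function returns the handler name as a string only so that the dispatch is observable (the real handlers return Unit).

```scala
object HandleRoutingSketch {
  // Hypothetical, simplified stand-ins for Kafka's ApiKeys and RequestChannel.Request
  sealed trait ApiKey
  case object Produce extends ApiKey
  case object Fetch extends ApiKey
  case object Metadata extends ApiKey
  case object LeaderAndIsr extends ApiKey

  final case class Request(apiKey: ApiKey, connectionId: String)

  // Picks the handler by the API key found in the request header.
  // Only handler names that appear in this document are used.
  def handle(request: Request): String = request.apiKey match {
    case Produce      => "handleProduceRequest"
    case Fetch        => "handleFetchRequest"
    case Metadata     => "handleTopicMetadataRequest"
    case LeaderAndIsr => "handleLeaderAndIsrRequest"
  }
}
```

Because `ApiKey` is sealed in this sketch, the compiler can warn when a key has no matching handler.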
Handling ElectLeaders Request — handleElectReplicaLeader Handler
handleElectReplicaLeader(
request: RequestChannel.Request): Unit
Caution: FIXME Describe me again
In summary, handleElectReplicaLeader requests the ReplicaManager to electPreferredLeaders.
Internally, handleElectReplicaLeader…FIXME
Note: handleElectReplicaLeader is used exclusively when KafkaApis is requested to handle an ElectLeaders request.
Handling LeaderAndIsr Request — handleLeaderAndIsrRequest Handler
handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit
In summary, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions).
Internally, handleLeaderAndIsrRequest takes the LeaderAndIsrRequest from (the body of) the RequestChannel.Request.
When authorized for cluster action and the broker epoch is not stale (isBrokerEpochStale), handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower of partitions. The request carries the onLeadershipChange callback, which receives the Partitions for which the broker is a new leader or a follower.
handleLeaderAndIsrRequest isBrokerEpochStale and…FIXME
Note: handleLeaderAndIsrRequest is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.
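The two checks in front of the ReplicaManager call can be read as a guard. This is a minimal sketch under stated assumptions: `becomeLeaderOrFollower` is a hypothetical stand-in that only invokes the callback, partitions are plain strings, and the branch for a stale broker epoch is deliberately left out because this document does not describe it.

```scala
object LeaderAndIsrSketch {
  // Hypothetical stand-in for the ReplicaManager call: it hands the partitions
  // the broker now leads and those it now follows to the callback.
  def becomeLeaderOrFollower(
      leaders: Set[String],
      followers: Set[String],
      onLeadershipChange: (Set[String], Set[String]) => Unit): Unit =
    onLeadershipChange(leaders, followers)

  // Returns true when the ReplicaManager stand-in was called.
  def handle(
      authorized: Boolean,
      brokerEpochStale: Boolean,
      leaders: Set[String],
      followers: Set[String],
      onLeadershipChange: (Set[String], Set[String]) => Unit): Boolean =
    if (authorized && !brokerEpochStale) {
      becomeLeaderOrFollower(leaders, followers, onLeadershipChange)
      true
    } else {
      // What the broker does here is outside the scope of this sketch.
      false
    }
}
```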
Handling AlterReplicaLogDirs Request — handleAlterReplicaLogDirsRequest Handler
handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit
In summary, handleAlterReplicaLogDirsRequest requests the ReplicaManager to alterReplicaLogDirs.
handleAlterReplicaLogDirsRequest…FIXME
Note: handleAlterReplicaLogDirsRequest is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.
Handling CreateTopics Request — handleCreateTopicsRequest Handler
handleCreateTopicsRequest(request: RequestChannel.Request): Unit
handleCreateTopicsRequest…FIXME
handleCreateTopicsRequest checks whether KafkaController is active…FIXME
handleCreateTopicsRequest authorizes the Create operation for ClusterResource…FIXME
In the end, handleCreateTopicsRequest requests AdminManager to create the topics.
Note: handleCreateTopicsRequest is used exclusively when KafkaApis is requested to handle a CreateTopics request.
Handling OffsetFetch Request — handleOffsetFetchRequest Handler
handleOffsetFetchRequest(request: RequestChannel.Request): Unit
handleOffsetFetchRequest…FIXME
Note: handleOffsetFetchRequest is used exclusively when KafkaApis is requested to handle an OffsetFetch request.
Handling Fetch Request — handleFetchRequest Handler
handleFetchRequest(
request: RequestChannel.Request): Unit
In summary, handleFetchRequest requests the ReplicaManager to fetch messages.
handleFetchRequest…FIXME
Note: handleFetchRequest is used when KafkaApis is requested to handle a Fetch request.
Handling CreateAcls Request — handleCreateAcls Handler
handleCreateAcls(
request: RequestChannel.Request): Unit
handleCreateAcls…FIXME
Note: handleCreateAcls is used when KafkaApis is requested to handle a CreateAcls request.
Handling Metadata Request — handleTopicMetadataRequest Handler
handleTopicMetadataRequest(
request: RequestChannel.Request): Unit
handleTopicMetadataRequest takes the MetadataRequest from (the body of) the RequestChannel.Request.
handleTopicMetadataRequest requests the MetadataCache for getAllTopics or its subset (per topics attribute of the MetadataRequest).
handleTopicMetadataRequest filters out the topics for which the current principal (user) is not authorized to execute Describe operation.
For every authorized topic, handleTopicMetadataRequest…FIXME
handleTopicMetadataRequest creates a MetadataResponse.TopicMetadata with TOPIC_AUTHORIZATION_FAILED for every unauthorizedForCreateTopics and unauthorizedForDescribeTopics.
handleTopicMetadataRequest getTopicMetadata if there are authorizedTopics.
handleTopicMetadataRequest prints out the following TRACE message to the logs:
Sending topic metadata [completeTopicMetadata] and brokers [brokers] for correlation id [correlationId] to client [clientId]
In the end, handleTopicMetadataRequest sendResponseMaybeThrottle with a new MetadataResponse.
Note: handleTopicMetadataRequest is used exclusively when KafkaApis is requested to handle a Metadata request.
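The split between topics the principal may Describe and topics that are answered with TOPIC_AUTHORIZATION_FAILED can be sketched as a partition over the requested topics. This is an illustration only: `TopicMetadata` is a hypothetical, much-reduced stand-in for MetadataResponse.TopicMetadata, `canDescribe` stands for the authorization check, and the string "NONE" is a placeholder for a successful lookup.

```scala
object MetadataAuthorizationSketch {
  // Hypothetical, simplified stand-in for MetadataResponse.TopicMetadata
  final case class TopicMetadata(topic: String, error: String)

  def topicMetadata(topics: Set[String], canDescribe: String => Boolean): Seq[TopicMetadata] = {
    // Separate the topics the principal may describe from the rest
    val (authorized, unauthorized) = topics.partition(canDescribe)
    // Unauthorized topics are reported with an authorization error
    val failed = unauthorized.toSeq.sorted.map(TopicMetadata(_, "TOPIC_AUTHORIZATION_FAILED"))
    // Authorized topics would be looked up in the metadata cache; "NONE" is a placeholder
    val found = authorized.toSeq.sorted.map(TopicMetadata(_, "NONE"))
    failed ++ found
  }
}
```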
Authorizing Request for Operation on Resource — authorize Internal Method
authorize(
request: RequestChannel.Request,
operation: AclOperation,
resourceType: ResourceType,
resourceName: String,
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true,
refCount: Int = 1): Boolean
authorize simply requests the Authorizer (when defined) to authorize the given AclOperation on a broker resource (described by the ResourceType and resourceName).
authorize is positive (true) when the Authorizer returned ALLOWED.
Note: The Authorizer is created in KafkaServer (when the KafkaApis is created). It is configured using the authorizer.class.name configuration property, which is empty by default, so all operations are authorized.
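The behaviour of authorize with and without an Authorizer can be sketched with an Option. This is a simplified illustration under stated assumptions: the `Authorizer` trait below is hypothetical and far smaller than Kafka's real interface, and the principal, operation, and resource are plain strings.

```scala
object AuthorizeSketch {
  sealed trait AuthorizationResult
  case object Allowed extends AuthorizationResult
  case object Denied extends AuthorizationResult

  // Hypothetical, much-reduced authorizer interface
  trait Authorizer {
    def authorize(
        principal: String,
        operation: String,
        resourceType: String,
        resourceName: String): AuthorizationResult
  }

  // True when the authorizer (if any) answers Allowed.
  // With no authorizer defined, every operation is authorized.
  def authorize(
      authorizer: Option[Authorizer],
      principal: String,
      operation: String,
      resourceType: String,
      resourceName: String): Boolean =
    authorizer.forall(_.authorize(principal, operation, resourceType, resourceName) == Allowed)
}
```

`Option.forall` returns true for `None`, which mirrors the default of an empty authorizer.class.name.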
| Request | AclOperation | Resource Type | Resource Name |
|---|---|---|---|
| | | | groupId |
| | | | transactionalId |
| | | | kafka-cluster |
| Fetch (from followers) | | | kafka-cluster |
| Metadata (for auto-create topics) | | | kafka-cluster |
| | | | kafka-cluster |
| | | | kafka-cluster |
| | | | Coordinator key |
| | | | Coordinator key |
| | | | Group ID |
| | | | kafka-cluster |
| | | | Group ID |
| | | | Group ID |
| | | | Group ID |
| | | | Group ID |
| | | | Group ID |
| | | | Group ID |
| | | | kafka-cluster |
| | | | Transactional ID |
| | | | kafka-cluster |
| | | | Transactional ID |
| | | | Transactional ID |
| | | | Transactional ID |
| | | | Group ID |
| | | | Transactional ID |
| | | | Group ID |
| | | | kafka-cluster |
| AlterConfigs (for brokers) | | | kafka-cluster |
| AlterConfigs (for topics) | | | Topic name |
| IncrementalAlterConfigs (for brokers) | | | kafka-cluster |
| IncrementalAlterConfigs (for topics) | | | Topic name |
| DescribeConfigs (for brokers) | | | kafka-cluster |
| DescribeConfigs (for topics) | | | Topic name |
| | | | kafka-cluster |
| | | | kafka-cluster |
| | | | Token ID |
| | | | kafka-cluster |
| | | | Group ID |
Handling CreatePartitions Request — handleCreatePartitionsRequest Handler
handleCreatePartitionsRequest(request: RequestChannel.Request): Unit
handleCreatePartitionsRequest…FIXME
Note: handleCreatePartitionsRequest is used when…FIXME
Handling DeleteTopics Request — handleDeleteTopicsRequest Handler
handleDeleteTopicsRequest(
request: RequestChannel.Request): Unit
handleDeleteTopicsRequest…FIXME
Note: handleDeleteTopicsRequest is used when…FIXME
Handling TxnOffsetCommit Request — handleTxnOffsetCommitRequest Handler
handleTxnOffsetCommitRequest(
request: RequestChannel.Request): Unit
handleTxnOffsetCommitRequest…FIXME
Note: handleTxnOffsetCommitRequest is used when…FIXME
Handling SyncGroup Request — handleSyncGroupRequest Handler
handleSyncGroupRequest(
request: RequestChannel.Request): Unit
handleSyncGroupRequest…FIXME
Note: handleSyncGroupRequest is used when…FIXME
Handling SaslHandshake Request — handleSaslHandshakeRequest Handler
handleSaslHandshakeRequest(
request: RequestChannel.Request): Unit
handleSaslHandshakeRequest…FIXME
Note: handleSaslHandshakeRequest is used when…FIXME
Handling SaslAuthenticate Request — handleSaslAuthenticateRequest Handler
handleSaslAuthenticateRequest(
request: RequestChannel.Request): Unit
handleSaslAuthenticateRequest…FIXME
Note: handleSaslAuthenticateRequest is used when…FIXME
Handling AddPartitionToTxn Request — handleAddPartitionToTxnRequest Handler
handleAddPartitionToTxnRequest(
request: RequestChannel.Request): Unit
handleAddPartitionToTxnRequest…FIXME
Note: handleAddPartitionToTxnRequest is used when…FIXME
Handling ApiVersions Request — handleApiVersionsRequest Handler
handleApiVersionsRequest(
request: RequestChannel.Request): Unit
handleApiVersionsRequest…FIXME
Note: handleApiVersionsRequest is used when…FIXME
Handling CreateToken Request — handleCreateTokenRequest Handler
handleCreateTokenRequest(
request: RequestChannel.Request): Unit
handleCreateTokenRequest…FIXME
Note: handleCreateTokenRequest is used when…FIXME
Handling DeleteAcls Request — handleDeleteAcls Handler
handleDeleteAcls(
request: RequestChannel.Request): Unit
handleDeleteAcls…FIXME
Note: handleDeleteAcls is used when…FIXME
Handling DeleteGroups Request — handleDeleteGroupsRequest Handler
handleDeleteGroupsRequest(
request: RequestChannel.Request): Unit
handleDeleteGroupsRequest…FIXME
Note: handleDeleteGroupsRequest is used when…FIXME
Handling DescribeConfigs Request — handleDescribeConfigsRequest Handler
handleDescribeConfigsRequest(
request: RequestChannel.Request): Unit
handleDescribeConfigsRequest takes the DescribeConfigsRequest from (the body of) the given RequestChannel.Request.
handleDescribeConfigsRequest authorizes the DescribeConfigs operation on the broker and topic resources (of the DescribeConfigsRequest).
For every authorized operation, handleDescribeConfigsRequest requests the AdminManager to describeConfigs.
In the end, handleDescribeConfigsRequest sendResponseMaybeThrottle with a new DescribeConfigsResponse.
Note: handleDescribeConfigsRequest is used exclusively when KafkaApis is requested to handle a DescribeConfigs request.
Handling DescribeAcls Request — handleDescribeAcls Handler
handleDescribeAcls(
request: RequestChannel.Request): Unit
handleDescribeAcls authorizeClusterOperation for the DESCRIBE operation.
handleDescribeAcls…FIXME
Note: handleDescribeAcls is used when…FIXME
Handling DescribeTokens Request — handleDescribeTokensRequest Handler
handleDescribeTokensRequest(
request: RequestChannel.Request): Unit
handleDescribeTokensRequest…FIXME
Note: handleDescribeTokensRequest is used when…FIXME
Handling EndTxn Request — handleEndTxnRequest Handler
handleEndTxnRequest(
request: RequestChannel.Request): Unit
handleEndTxnRequest…FIXME
Note: handleEndTxnRequest is used when…FIXME
Handling ExpireToken Request — handleExpireTokenRequest Handler
handleExpireTokenRequest(
request: RequestChannel.Request): Unit
handleExpireTokenRequest…FIXME
Note: handleExpireTokenRequest is used when…FIXME
Handling Heartbeat Request — handleHeartbeatRequest Handler
handleHeartbeatRequest(
request: RequestChannel.Request): Unit
handleHeartbeatRequest…FIXME
Note: handleHeartbeatRequest is used when…FIXME
Handling InitProducerId Request — handleInitProducerIdRequest Handler
handleInitProducerIdRequest(
request: RequestChannel.Request): Unit
handleInitProducerIdRequest…FIXME
Note: handleInitProducerIdRequest is used when…FIXME
Handling IncrementalAlterConfigs Request — handleIncrementalAlterConfigsRequest Handler
handleIncrementalAlterConfigsRequest(
request: RequestChannel.Request): Unit
handleIncrementalAlterConfigsRequest…FIXME
Note: handleIncrementalAlterConfigsRequest is used when…FIXME
Handling ListGroups Request — handleListGroupsRequest Handler
handleListGroupsRequest(
request: RequestChannel.Request): Unit
handleListGroupsRequest…FIXME
Note: handleListGroupsRequest is used when…FIXME
Handling LeaveGroup Request — handleLeaveGroupRequest Handler
handleLeaveGroupRequest(
request: RequestChannel.Request): Unit
handleLeaveGroupRequest…FIXME
Note: handleLeaveGroupRequest is used when…FIXME
Handling RenewToken Request — handleRenewTokenRequest Handler
handleRenewTokenRequest(
request: RequestChannel.Request): Unit
handleRenewTokenRequest…FIXME
Note: handleRenewTokenRequest is used when…FIXME
Handling ControlledShutdown Request — handleControlledShutdownRequest Handler
handleControlledShutdownRequest(request: RequestChannel.Request): Unit
handleControlledShutdownRequest…FIXME
Note: handleControlledShutdownRequest is used when…FIXME
fetchOffsetForTimestamp Internal Method
fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset]
fetchOffsetForTimestamp…FIXME
Note: fetchOffsetForTimestamp is used exclusively when KafkaApis is requested to handleListOffsetRequestV1AndAbove.
handleListOffsetRequestV0 Internal Method
handleListOffsetRequestV0(
request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]
handleListOffsetRequestV0…FIXME
Note: handleListOffsetRequestV0 is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 0).
handleListOffsetRequestV1AndAbove Internal Method
handleListOffsetRequestV1AndAbove(
request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]
handleListOffsetRequestV1AndAbove…FIXME
Note: handleListOffsetRequestV1AndAbove is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 1 or above).
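The choice between the two internal methods depends only on the API version of the request. A minimal sketch of that decision follows, assuming the version is available as a Short; the function and object names are hypothetical, and the result is the method name as a string purely for illustration.

```scala
object ListOffsetDispatchSketch {
  // Version 0 goes to the V0 method, every later version to the V1-and-above method.
  def handlerFor(apiVersion: Short): String =
    if (apiVersion == 0) "handleListOffsetRequestV0"
    else "handleListOffsetRequestV1AndAbove"
}
```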
Handling DescribeLogDirs Request — handleDescribeLogDirsRequest Handler
handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit
In summary, handleDescribeLogDirsRequest requests the ReplicaManager to describeLogDirs.
Internally, handleDescribeLogDirsRequest takes the DescribeLogDirsRequest from the body (of the RequestChannel.Request).
handleDescribeLogDirsRequest branches off per whether the DescribeLogDirsRequest was for isAllTopicPartitions or not.
- For all TopicPartitions, handleDescribeLogDirsRequest requests the ReplicaManager for the LogManager that is requested for all the partition logs and their TopicPartitions.
- For specific TopicPartitions, handleDescribeLogDirsRequest requests them from the DescribeLogDirsRequest.
Note: handleDescribeLogDirsRequest returns an empty list of log directories when the request is not authorized.
handleDescribeLogDirsRequest then requests the ReplicaManager to describeLogDirs with the requested TopicPartitions.
In the end, handleDescribeLogDirsRequest sendResponseMaybeThrottle with a DescribeLogDirsResponse and the LogDirInfos.
Note: handleDescribeLogDirsRequest is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.
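The flow of handleDescribeLogDirsRequest (authorization check, choice of partitions, call to describeLogDirs) can be sketched as follows. The types are hypothetical simplifications: `TopicPartition` is a local case class, `requested` is None when the request is for all topic partitions, `allLogs` stands for what the LogManager reports, and `describeLogDirs` stands for the ReplicaManager call.

```scala
object DescribeLogDirsSketch {
  // Hypothetical stand-in for Kafka's TopicPartition
  final case class TopicPartition(topic: String, partition: Int)

  def handle(
      authorized: Boolean,
      requested: Option[Set[TopicPartition]],
      allLogs: Set[TopicPartition],
      describeLogDirs: Set[TopicPartition] => Map[String, Set[TopicPartition]])
      : Map[String, Set[TopicPartition]] =
    if (!authorized) Map.empty // no log directories for an unauthorized request
    else describeLogDirs(requested.getOrElse(allLogs)) // all partitions unless a subset was requested
}
```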
sendResponseMaybeThrottle Internal Method
sendResponseMaybeThrottle(
request: RequestChannel.Request,
createResponse: Int => AbstractResponse,
onComplete: Option[Send => Unit] = None): Unit
sendResponseMaybeThrottle…FIXME
Note: sendResponseMaybeThrottle is used when…FIXME
fetchOffsetsBefore Method
fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long]
fetchOffsetsBefore…FIXME
Note: fetchOffsetsBefore is used exclusively when KafkaApis is requested to fetchOffsets.
fetchOffsets Method
fetchOffsets(
logManager: LogManager,
topicPartition: TopicPartition,
timestamp: Long,
maxNumOffsets: Int): Seq[Long]
fetchOffsets…FIXME
Note: fetchOffsets is used exclusively when KafkaApis is requested to handleListOffsetRequestV0.
Handling StopReplica Request — handleStopReplicaRequest Handler
handleStopReplicaRequest(request: RequestChannel.Request): Unit
In summary, handleStopReplicaRequest requests the ReplicaManager to stopReplicas.
handleStopReplicaRequest…FIXME
Note: handleStopReplicaRequest is used exclusively when KafkaApis is requested to handle a StopReplica request.
Handling UpdateMetadata Request (From Kafka Controller) — handleUpdateMetadataRequest Handler
handleUpdateMetadataRequest(
request: RequestChannel.Request): Unit
handleUpdateMetadataRequest takes the UpdateMetadataRequest from (the body of) the RequestChannel.Request.
When authorized for cluster action and not isBrokerEpochStale, handleUpdateMetadataRequest requests the following:
- ReplicaManager to maybeUpdateMetadataCache (that gives deleted partitions)
- AdminManager to tryCompleteDelayedTopicOperations for all the topics (based on the partitionStates of the UpdateMetadataRequest)
handleUpdateMetadataRequest updates quotas…FIXME
handleUpdateMetadataRequest requests the ReplicaManager to tryCompleteElection for every partition (based on the partitionStates of the UpdateMetadataRequest).
In the end, handleUpdateMetadataRequest sendResponseExemptThrottle (with a no-error UpdateMetadataResponse).
Note: handleUpdateMetadataRequest is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.
Handling OffsetCommitRequest — handleOffsetCommitRequest Handler
handleOffsetCommitRequest(request: RequestChannel.Request): Unit
handleOffsetCommitRequest takes the OffsetCommitRequest from (the body of) the RequestChannel.Request.
If authorized, handleOffsetCommitRequest simply requests the GroupCoordinator to handleCommitOffsets (with the sendResponseCallback).
Note: If authorized, handleOffsetCommitRequest branches off per API version (i.e. version 0, which stores offsets in ZooKeeper, and versions 1 and beyond). The API version 0 is not described here.
If not authorized, handleOffsetCommitRequest…FIXME
Note: handleOffsetCommitRequest is used exclusively when KafkaApis is requested to handle an OffsetCommit request.
sendResponseCallback Method
sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]): Unit
sendResponseCallback prints out the following DEBUG message to the logs for offsets with errors (i.e. unauthorized topics to read or non-existing topics):
Offset commit request with correlation id [correlationId] from client [clientId] on partition [topicPartition] failed due to [exceptionName]
In the end, sendResponseCallback sendResponseMaybeThrottle a new OffsetCommitResponse.
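The logging step of this callback can be sketched as a filter over the commit status. This is an illustration under stated assumptions: errors are plain strings with "NONE" marking success (a stand-in for Kafka's Errors enum), `TopicPartition` is a local case class, and the function returns the DEBUG lines instead of writing them to a logger.

```scala
object OffsetCommitCallbackSketch {
  // Hypothetical stand-in for Kafka's TopicPartition, printed as topic-partition
  final case class TopicPartition(topic: String, partition: Int) {
    override def toString: String = s"$topic-$partition"
  }

  // Builds one DEBUG line per partition whose commit failed.
  def debugMessages(
      correlationId: Int,
      clientId: String,
      commitStatus: Map[TopicPartition, String]): Seq[String] =
    commitStatus.toSeq.collect {
      case (tp, error) if error != "NONE" =>
        s"Offset commit request with correlation id $correlationId from client $clientId " +
          s"on partition $tp failed due to $error"
    }
}
```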
createInternalTopic Internal Method
createInternalTopic(
topic: String): MetadataResponse.TopicMetadata
createInternalTopic…FIXME
Note: createInternalTopic is used when KafkaApis is requested to getOrCreateInternalTopic and getTopicMetadata (for metadata of __consumer_offsets and __transaction_state internal topics).
sendResponseExemptThrottle Internal Method
sendResponseExemptThrottle(
request: RequestChannel.Request,
response: AbstractResponse,
onComplete: Option[Send => Unit] = None): Unit
sendResponseExemptThrottle…FIXME
Note: sendResponseExemptThrottle is used when KafkaApis is requested to…FIXME
getOrCreateInternalTopic Internal Method
getOrCreateInternalTopic(
topic: String,
listenerName: ListenerName): MetadataResponse.TopicMetadata
getOrCreateInternalTopic requests the MetadataCache for getTopicMetadata for the input topic (and the ListenerName).
In the end, getOrCreateInternalTopic returns the TopicMetadata if available or createInternalTopic.
Note: getOrCreateInternalTopic is used exclusively when KafkaApis is requested to handle a FindCoordinator request (and requests the metadata of __consumer_offsets and __transaction_state internal topics).
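The "return the cached metadata if available, otherwise create the topic" logic can be sketched with a lookup that falls back lazily. The names below are hypothetical simplifications: a plain Map stands in for the MetadataCache, `TopicMetadata` is a reduced case class, and `createInternalTopic` is passed in as a function.

```scala
object InternalTopicSketch {
  // Hypothetical, simplified stand-in for MetadataResponse.TopicMetadata
  final case class TopicMetadata(topic: String, numPartitions: Int)

  def getOrCreateInternalTopic(
      topic: String,
      cache: Map[String, TopicMetadata],
      createInternalTopic: String => TopicMetadata): TopicMetadata =
    // The default argument of getOrElse is evaluated only when the topic is missing
    cache.getOrElse(topic, createInternalTopic(topic))
}
```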
getTopicMetadata Internal Method
getTopicMetadata(
allowAutoTopicCreation: Boolean,
topics: Set[String],
listenerName: ListenerName,
errorUnavailableEndpoints: Boolean,
errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata]
getTopicMetadata…FIXME
Note: getTopicMetadata is used exclusively when KafkaApis is requested to handle a Metadata request.
Handling DescribeGroups Request — handleDescribeGroupRequest Handler
handleDescribeGroupRequest(request: RequestChannel.Request): Unit
handleDescribeGroupRequest…FIXME
Note: handleDescribeGroupRequest is used exclusively when KafkaApis is requested to handle a DescribeGroups request.
Handling AlterConfigs Request — handleAlterConfigsRequest Handler
handleAlterConfigsRequest(request: RequestChannel.Request): Unit
handleAlterConfigsRequest…FIXME
Note: handleAlterConfigsRequest is used exclusively when KafkaApis is requested to handle an AlterConfigs request.
createTopic Internal Method
createTopic(
topic: String,
numPartitions: Int,
replicationFactor: Int,
properties: Properties = new Properties()): MetadataResponse.TopicMetadata
createTopic…FIXME
Note: createTopic is used when KafkaApis is requested to createInternalTopic and getTopicMetadata.
Handling FindCoordinatorRequest — handleFindCoordinatorRequest Handler
handleFindCoordinatorRequest(request: RequestChannel.Request): Unit
handleFindCoordinatorRequest takes the FindCoordinatorRequest from the body (of the RequestChannel.Request).
handleFindCoordinatorRequest checks permissions…FIXME
For an authorized request, handleFindCoordinatorRequest branches off per CoordinatorType, i.e. GROUP or TRANSACTION.
For GROUP coordinator type, handleFindCoordinatorRequest does the following:
- Requests the GroupCoordinator for partitionFor the coordinator key (of the FindCoordinatorRequest)
- getOrCreateInternalTopic for __consumer_offsets topic
For TRANSACTION coordinator type, handleFindCoordinatorRequest does the following:
- Requests the TransactionCoordinator for partitionFor (for the coordinatorKey of the FindCoordinatorRequest)
- getOrCreateInternalTopic for __transaction_state topic
In the end, handleFindCoordinatorRequest sendResponseMaybeThrottle with a new FindCoordinatorResponse.
You should see the following TRACE message in the logs:
Sending FindCoordinator response [body] for correlation id [correlationId] to client [clientId].
Note: handleFindCoordinatorRequest is used exclusively when KafkaApis is requested to handle a FindCoordinator request.
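The branching by coordinator type can be sketched as a pattern match that picks the internal topic and a partition for the coordinator key. This is a sketch under stated assumptions, not Kafka's code: `partitionFor` here is a hypothetical helper that maps the key's hash code to a non-negative number modulo the partition count, and the partition counts are passed in as parameters.

```scala
object FindCoordinatorSketch {
  sealed trait CoordinatorType
  case object Group extends CoordinatorType
  case object Transaction extends CoordinatorType

  // Assumption: a non-negative hash of the key modulo the number of
  // partitions of the internal topic.
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Returns the internal topic and the partition that identify the coordinator.
  def internalTopicAndPartition(
      coordinatorType: CoordinatorType,
      key: String,
      offsetsPartitions: Int,
      txnPartitions: Int): (String, Int) =
    coordinatorType match {
      case Group       => ("__consumer_offsets", partitionFor(key, offsetsPartitions))
      case Transaction => ("__transaction_state", partitionFor(key, txnPartitions))
    }
}
```

The broker that leads the resulting partition would then be reported as the coordinator in the response.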
Handling JoinGroupRequest — handleJoinGroupRequest Handler
handleJoinGroupRequest(request: RequestChannel.Request): Unit
handleJoinGroupRequest takes the JoinGroupRequest from the body (of the RequestChannel.Request) and simply requests the GroupCoordinator to handleJoinGroup (with sendResponseCallback to handle the response).
Note: handleJoinGroupRequest is used exclusively when KafkaApis is requested to handle a JoinGroup request.
Handling JoinGroup Response — sendResponseCallback Method
sendResponseCallback(joinResult: JoinGroupResult): Unit
sendResponseCallback creates a new JoinGroupResponse for the given JoinGroupResult and prints out the following TRACE message to the logs:
Sending join group response [responseBody] for correlation id [correlationId] to client [clientId].
In the end, sendResponseCallback sendResponseMaybeThrottle with the new JoinGroupResponse.
Handling AddOffsetsToTxn Request — handleAddOffsetsToTxnRequest Handler
handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit
handleAddOffsetsToTxnRequest…FIXME
Note: handleAddOffsetsToTxnRequest is used exclusively when KafkaApis is requested to handle an AddOffsetsToTxn request.
Handling Produce Request — handleProduceRequest Handler
handleProduceRequest(
request: RequestChannel.Request): Unit
In summary, handleProduceRequest takes the ProduceRequest from the body (of the RequestChannel.Request) and requests the ReplicaManager to appendRecords (with isFromClient flag enabled).
Note: The internalTopicsAllowed flag (when the ReplicaManager is requested to appendRecords) is enabled (true) only when the client ID is __admin_client.
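The rule for the internalTopicsAllowed flag comes down to a comparison of the client ID. A minimal sketch, with a hypothetical object and constant name:

```scala
object ProduceSketch {
  // The only client ID that the text names as allowed to write to internal topics
  val AdminClientId = "__admin_client"

  def internalTopicsAllowed(clientId: String): Boolean =
    clientId == AdminClientId
}
```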
handleProduceRequest…FIXME
handleProduceRequest…FIXME
Note: handleProduceRequest is used when KafkaApis is requested to handle a Produce request.
handleWriteTxnMarkersRequest Handler
handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit
In summary, handleWriteTxnMarkersRequest requests the ReplicaManager to getMagic followed by appendRecords (with isFromClient flag disabled).
handleWriteTxnMarkersRequest…FIXME
Note: handleWriteTxnMarkersRequest is used exclusively when KafkaApis is requested to handle a WriteTxnMarkers request.
handleDeleteRecordsRequest Handler
handleDeleteRecordsRequest(request: RequestChannel.Request): Unit
In summary, handleDeleteRecordsRequest requests the ReplicaManager to deleteRecords.
handleDeleteRecordsRequest…FIXME
Note: handleDeleteRecordsRequest is used exclusively when KafkaApis is requested to handle a DeleteRecords request.
handleOffsetForLeaderEpochRequest Handler
handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit
In summary, handleOffsetForLeaderEpochRequest requests the ReplicaManager to lastOffsetForLeaderEpoch.
handleOffsetForLeaderEpochRequest…FIXME
Note: handleOffsetForLeaderEpochRequest is used exclusively when KafkaApis is requested to handle an OffsetForLeaderEpoch request.
handleListOffsetRequest Handler
handleListOffsetRequest(request: RequestChannel.Request): Unit
In summary, handleListOffsetRequest requests the ReplicaManager to fetchOffsetForTimestamp.
handleListOffsetRequest…FIXME
Note: handleListOffsetRequest is used exclusively when KafkaApis is requested to handle a ListOffsets request.
handleOffsetDeleteRequest Handler
handleOffsetDeleteRequest(
request: RequestChannel.Request): Unit
In summary, handleOffsetDeleteRequest…FIXME
handleOffsetDeleteRequest…FIXME
Note: handleOffsetDeleteRequest is used when KafkaApis is requested to handle an OffsetDelete request.
handleAlterPartitionReassignmentsRequest Handler
handleAlterPartitionReassignmentsRequest(
request: RequestChannel.Request): Unit
In summary, handleAlterPartitionReassignmentsRequest…FIXME
handleAlterPartitionReassignmentsRequest…FIXME
Note: handleAlterPartitionReassignmentsRequest is used when KafkaApis is requested to handle an AlterPartitionReassignments request.
handleListPartitionReassignmentsRequest Handler
handleListPartitionReassignmentsRequest(
request: RequestChannel.Request): Unit
In summary, handleListPartitionReassignmentsRequest…FIXME
handleListPartitionReassignmentsRequest…FIXME
Note: handleListPartitionReassignmentsRequest is used when KafkaApis is requested to handle a ListPartitionReassignments request.
isAuthorizedClusterAction Internal Method
isAuthorizedClusterAction(request: RequestChannel.Request): Boolean
isAuthorizedClusterAction simply calls authorize with the ClusterAction operation and the ClusterResource resource.
Note: isAuthorizedClusterAction is used when…FIXME
updateRecordConversionStats Internal Method
updateRecordConversionStats(
request: RequestChannel.Request,
tp: TopicPartition,
conversionStats: RecordConversionStats): Unit
updateRecordConversionStats…FIXME
Note: updateRecordConversionStats is used when…FIXME
Asserting Permissions for Cluster Action — authorizeClusterAction Method
authorizeClusterAction(request: RequestChannel.Request): Unit
authorizeClusterAction simply asserts that the RequestChannel.Request is authorized to execute ClusterAction on a ClusterResource. If so, authorizeClusterAction does nothing and returns.
If not authorized, authorizeClusterAction throws a ClusterAuthorizationException:
Request [request] is not authorized.
Note: authorizeClusterAction is used when KafkaApis is requested to handleLeaderAndIsrRequest, handleStopReplicaRequest, handleUpdateMetadataRequest, handleControlledShutdownRequest, and handleWriteTxnMarkersRequest.
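The assert-or-throw behaviour of authorizeClusterAction can be sketched as follows. The exception class is a local, hypothetical stand-in for Kafka's ClusterAuthorizationException, the request is a plain string, and `isAuthorized` stands for the ClusterAction check on the cluster resource.

```scala
object ClusterActionSketch {
  // Hypothetical stand-in for Kafka's ClusterAuthorizationException
  final class ClusterAuthorizationException(message: String) extends RuntimeException(message)

  // Does nothing when authorized, throws otherwise.
  def authorizeClusterAction(request: String, isAuthorized: String => Boolean): Unit =
    if (!isAuthorized(request))
      throw new ClusterAuthorizationException(s"Request $request is not authorized.")
}
```

A handler that calls this first can assume authorization for the rest of its body, since an unauthorized request never gets past the call.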
isBrokerEpochStale Internal Method
isBrokerEpochStale(
brokerEpochInRequest: Long): Boolean
isBrokerEpochStale…FIXME
Note: isBrokerEpochStale is used when KafkaApis is requested to handleLeaderAndIsrRequest, handleStopReplicaRequest, and handleUpdateMetadataRequest.
filterAuthorized Internal Method
filterAuthorized(
request: RequestChannel.Request,
operation: AclOperation,
resourceType: ResourceType,
resourceNames: Seq[String],
logIfAllowed: Boolean = true,
logIfDenied: Boolean = true): Set[String]
filterAuthorized…FIXME
Note: filterAuthorized is used when…FIXME
authorizedOperations Internal Method
authorizedOperations(
request: RequestChannel.Request,
resource: Resource): Int
authorizedOperations…FIXME
| Request | Resource Type | Resource Name |
|---|---|---|
| | | kafka-cluster |
| | | Topic name |
| | | Group ID |
Throwing ClusterAuthorizationException for Unauthorized Cluster Operation — authorizeClusterOperation Internal Method
authorizeClusterOperation(
request: RequestChannel.Request,
operation: AclOperation): Unit
authorizeClusterOperation simply throws a ClusterAuthorizationException when the given request is not authorized for the given operation on the CLUSTER resource named kafka-cluster:
Request [request] is not authorized.