KafkaApis — API Request Handler

KafkaApis is responsible for handling requests to a Kafka broker (by means of handlers).

KafkaApis is created when KafkaServer is requested to start (for KafkaRequestHandlerPool).

KafkaApis.png
Figure 1. KafkaApis is Created for KafkaRequestHandlerPool when KafkaServer Starts Up

When created, KafkaApis is given broker services (from the owning KafkaServer) that are used to handle requests (from producers, consumers, brokers, administrative clients, etc.)

Some requests are meant for the controller broker and simply do nothing (no-ops) when received by a regular non-controller broker.

Table 1. KafkaApis’s API Keys and Handlers
API Key Handler

AddOffsetsToTxn

handleAddOffsetsToTxnRequest

AddPartitionsToTxn

handleAddPartitionToTxnRequest

AlterConfigs

handleAlterConfigsRequest

AlterPartitionReassignments

handleAlterPartitionReassignmentsRequest

AlterReplicaLogDirs

handleAlterReplicaLogDirsRequest

ApiVersions

handleApiVersionsRequest

ControlledShutdown

handleControlledShutdownRequest

CreateAcls

handleCreateAcls

CreateDelegationToken

handleCreateTokenRequest

CreatePartitions

handleCreatePartitionsRequest

CreateTopics

handleCreateTopicsRequest

DeleteAcls

handleDeleteAcls

DeleteGroups

handleDeleteGroupsRequest

DeleteRecords

handleDeleteRecordsRequest

DeleteTopics

handleDeleteTopicsRequest

DescribeConfigs

handleDescribeConfigsRequest

DescribeGroups

handleDescribeGroupRequest

DescribeLogDirs

handleDescribeLogDirsRequest

DescribeAcls

handleDescribeAcls

DescribeDelegationToken

handleDescribeTokensRequest

ElectLeaders

handleElectReplicaLeader

EndTxn

handleEndTxnRequest

Fetch

handleFetchRequest

ExpireDelegationToken

handleExpireTokenRequest

FindCoordinator

handleFindCoordinatorRequest

Heartbeat

handleHeartbeatRequest

InitProducerId

handleInitProducerIdRequest

IncrementalAlterConfigs

handleIncrementalAlterConfigsRequest

JoinGroup

handleJoinGroupRequest

LeaderAndIsr

Cluster action / handleLeaderAndIsrRequest

LeaveGroup

handleLeaveGroupRequest

ListGroups

handleListGroupsRequest

ListOffsets

handleListOffsetRequest

ListPartitionReassignments

handleListPartitionReassignmentsRequest

Metadata

handleTopicMetadataRequest

OffsetCommit

handleOffsetCommitRequest

OffsetDelete

handleOffsetDeleteRequest

OffsetFetch

handleOffsetFetchRequest

OffsetForLeaderEpoch

handleOffsetForLeaderEpochRequest

Produce

handleProduceRequest

RenewDelegationToken

handleRenewTokenRequest

SaslAuthenticate

handleSaslAuthenticateRequest

SaslHandshake

handleSaslHandshakeRequest

StopReplica

Cluster action / handleStopReplicaRequest

SyncGroup

handleSyncGroupRequest

TxnOffsetCommit

handleTxnOffsetCommitRequest

UpdateMetadata

Cluster action / handleUpdateMetadataRequest

WriteTxnMarkers

handleWriteTxnMarkersRequest

Tip

Enable ALL logging levels for kafka.server.KafkaApis logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.server.KafkaApis=ALL

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.server.KafkaApis logger in config/log4j.properties:

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=false

That means that the logs of KafkaApis go to logs/kafka-request.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).

KafkaApis and Authorizer

KafkaApis may be given an Authorizer when created.

The Authorizer is used for the following:

KafkaApis and AdminZkClient

adminZkClient: AdminZkClient

When created, KafkaApis creates a AdminZkClient that is used to create a topic for the following handlers:

Routing API Requests to Respective Handlers — handle Method

handle(
  request: RequestChannel.Request): Unit

handle first prints out the following TRACE message to the logs:

Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]

handle then relays the input RequestChannel.Request to the corresponding handler per the apiKey (from the header of the input request).

Note
handle is used exclusively when KafkaRequestHandler thread is requested to run.

Handling ElectLeaders Request — handleElectReplicaLeader Handler

handleElectReplicaLeader(
  request: RequestChannel.Request): Unit
Caution
FIXME Describe me again

In summary, handleElectPreferredReplicaLeader requests the ReplicaManager to electPreferredLeaders.

Internally, handleElectPreferredReplicaLeader…​FIXME

Note
handleElectPreferredReplicaLeader is used exclusively when KafkaApis is requested to handle a ElectPreferredLeaders request.

Handling LeaderAndIsr Request — handleLeaderAndIsrRequest Handler

handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit

In summary, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions).

Internally, handleLeaderAndIsrRequest takes the LeaderAndIsrRequest from (the body of) the RequestChannel.Request.

When authorized for cluster action and not isBrokerEpochStale, handleLeaderAndIsrRequest requests the ReplicaManager to become the leader or a follower (of partitions) (with the onLeadershipChange callback that gets the Partitions of which the broker is a new leader or a follower).

handleLeaderAndIsrRequest isBrokerEpochStale and…​FIXME

Note
handleLeaderAndIsrRequest is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.

onLeadershipChange Callback

onLeadershipChange(
  updatedLeaders: Iterable[Partition],
  updatedFollowers: Iterable[Partition]): Unit

onLeadershipChange…​FIXME

Handling AlterReplicaLogDirs Request — handleAlterReplicaLogDirsRequest Handler

handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit

In summary, handleAlterReplicaLogDirsRequest requests the ReplicaManager to alterReplicaLogDirs.

handleAlterReplicaLogDirsRequest…​FIXME

Note
handleAlterReplicaLogDirsRequest is used exclusively when KafkaApis is requested to handle a AlterReplicaLogDirs request.

Handling CreateTopics Request — handleCreateTopicsRequest Handler

handleCreateTopicsRequest(request: RequestChannel.Request): Unit

handleCreateTopicsRequest…​FIXME

handleCreateTopicsRequest checks whether KafkaController is active…​FIXME

handleCreateTopicsRequest authorizes the Create operation for ClusterResource…​FIXME

In the end, handleCreateTopicsRequest requests AdminManager to create the topics.

Note
handleCreateTopicsRequest is used exclusively when KafkaApis is requested to handle a CreateTopics request.

Handling OffsetFetch Request — handleOffsetFetchRequest Handler

handleOffsetFetchRequest(request: RequestChannel.Request): Unit

handleOffsetFetchRequest…​FIXME

Note
handleOffsetFetchRequest is used exclusively when KafkaApis is requested to handle a OffsetFetch request.

Handling Fetch Request — handleFetchRequest Handler

handleFetchRequest(
  request: RequestChannel.Request): Unit

In summary, handleFetchRequest requests the ReplicaManager to fetch messages.

handleFetchRequest…​FIXME

Note
handleFetchRequest is used when KafkaApis is requested to handle a Fetch request.

Handling CreateAcls Request — handleCreateAcls Handler

handleCreateAcls(
  request: RequestChannel.Request): Unit

handleCreateAcls…​FIXME

Note
handleCreateAcls is used when KafkaApis is requested to handle a CreateAcls request.

Handling Metadata Request — handleTopicMetadataRequest Handler

handleTopicMetadataRequest(
  request: RequestChannel.Request): Unit

handleTopicMetadataRequest takes the MetadataRequest from (the body of) the RequestChannel.Request.

handleTopicMetadataRequest requests the MetadataCache for getAllTopics or its subset (per topics attribute of the MetadataRequest).

handleTopicMetadataRequest filters out the topics for which the current principal (user) is not authorized to execute Describe operation.

For every authorized topic, handleTopicMetadataRequest…​FIXME

handleTopicMetadataRequest creates a MetadataResponse.TopicMetadata with TOPIC_AUTHORIZATION_FAILED for every unauthorizedForCreateTopics and unauthorizedForDescribeTopics.

handleTopicMetadataRequest getTopicMetadata if there are authorizedTopics.

handleTopicMetadataRequest prints out the following TRACE message to the logs:

Sending topic metadata [completeTopicMetadata] and brokers [brokers] for correlation id [correlationId] to client [clientId]

In the end, handleTopicMetadataRequest sendResponseMaybeThrottle with a new MetadataResponse.

Note
handleTopicMetadataRequest is used exclusively when KafkaApis is requested to handle a Metadata request.

Authorizing Request for Operation on Resource — authorize Internal Method

authorize(
  request: RequestChannel.Request,
  operation: AclOperation,
  resourceType: ResourceType,
  resourceName: String,
  logIfAllowed: Boolean = true,
  logIfDenied: Boolean = true,
  refCount: Int = 1): Boolean

authorize simply requests the Authorizer (when defined) to authorize the given AclOperation on a broker resource (described by the ResourceType and resourceName).

authorize is positive (true) when the Authorizer returned ALLOWED.

Note
The Authorizer is created in KafkaServer (when the KafkaApis is created). It is configured using authorizer.class.name configuration property which is empty by default and so all operations are authorized.
Request AclOperation Resource Type Resource Name

OffsetCommit

READ

GROUP

groupId

Produce

WRITE

TRANSACTIONAL_ID

transactionalId

Produce

IDEMPOTENT_WRITE

CLUSTER

kafka-cluster

Fetch (from followers)

CLUSTER_ACTION

CLUSTER

kafka-cluster

Metadata (for auto-create topics)

CREATE

CLUSTER

kafka-cluster

Metadata

DESCRIBE

CLUSTER

kafka-cluster

OffsetFetch

DESCRIBE

CLUSTER

kafka-cluster

FindCoordinator

DESCRIBE

GROUP

Coordinator key

FindCoordinator

DESCRIBE

TRANSACTIONAL_ID

Coordinator key

DescribeGroups

DESCRIBE

GROUP

Group ID

ListGroups

DESCRIBE

CLUSTER

kafka-cluster

ListGroups

DESCRIBE

GROUP

Group ID

JoinGroup

READ

GROUP

Group ID

SyncGroup

READ

GROUP

Group ID

DeleteGroups

DELETE

GROUP

Group ID

Heartbeat

READ

GROUP

Group ID

LeaveGroup

READ

GROUP

Group ID

CreateTopics

CREATE

CLUSTER

kafka-cluster

InitProducerId

WRITE

TRANSACTIONAL_ID

Transactional ID

InitProducerId

IDEMPOTENT_WRITE

CLUSTER

kafka-cluster

EndTxn

WRITE

TRANSACTIONAL_ID

Transactional ID

AddPartitionsToTxn

WRITE

TRANSACTIONAL_ID

Transactional ID

AddOffsetsToTxn

WRITE

TRANSACTIONAL_ID

Transactional ID

AddOffsetsToTxn

READ

GROUP

Group ID

TxnOffsetCommit

WRITE

TRANSACTIONAL_ID

Transactional ID

TxnOffsetCommit

READ

GROUP

Group ID

OffsetForLeaderEpoch

CLUSTER_ACTION

CLUSTER

kafka-cluster

AlterConfigs (for brokers)

ALTER_CONFIGS

CLUSTER

kafka-cluster

AlterConfigs (for topics)

ALTER_CONFIGS

TOPIC

Topic name

IncrementalAlterConfigs (for brokers)

ALTER_CONFIGS

CLUSTER

kafka-cluster

IncrementalAlterConfigs (for topics)

ALTER_CONFIGS

TOPIC

Topic name

DescribeConfigs (for brokers)

DESCRIBE_CONFIGS

CLUSTER

kafka-cluster

DescribeConfigs (for topics)

DESCRIBE_CONFIGS

TOPIC

Topic name

AlterReplicaLogDirs

ALTER

CLUSTER

kafka-cluster

DescribeLogDirs

DESCRIBE

CLUSTER

kafka-cluster

DescribeDelegationToken

DESCRIBE

DELEGATION_TOKEN

Token ID

ElectLeaders

ALTER

CLUSTER

kafka-cluster

OffsetDelete

DELETE

GROUP

Group ID

Handling CreatePartitions Request — handleCreatePartitionsRequest Handler

handleCreatePartitionsRequest(request: RequestChannel.Request): Unit

handleCreatePartitionsRequest…​FIXME

Note
handleCreatePartitionsRequest is used when…​FIXME

Handling DeleteTopics Request — handleDeleteTopicsRequest Handler

handleDeleteTopicsRequest(
  request: RequestChannel.Request): Unit

handleDeleteTopicsRequest…​FIXME

Note
handleDeleteTopicsRequest is used when…​FIXME

Handling TxnOffsetCommit Request — handleTxnOffsetCommitRequest Handler

handleTxnOffsetCommitRequest(
  request: RequestChannel.Request): Unit

handleTxnOffsetCommitRequest…​FIXME

Note
handleTxnOffsetCommitRequest is used when…​FIXME

Handling SyncGroup Request — handleSyncGroupRequest Handler

handleSyncGroupRequest(
  request: RequestChannel.Request): Unit

handleSyncGroupRequest…​FIXME

Note
handleSyncGroupRequest is used when…​FIXME

Handling SaslHandshake Request — handleSaslHandshakeRequest Handler

handleSaslHandshakeRequest(
  request: RequestChannel.Request): Unit

handleSaslHandshakeRequest…​FIXME

Note
handleSaslHandshakeRequest is used when…​FIXME

Handling SaslAuthenticate Request — handleSaslAuthenticateRequest Handler

handleSaslAuthenticateRequest(
  request: RequestChannel.Request): Unit

handleSaslAuthenticateRequest…​FIXME

Note
handleSaslAuthenticateRequest is used when…​FIXME

Handling AddPartitionToTxn Request — handleAddPartitionToTxnRequest Handler

handleAddPartitionToTxnRequest(
  request: RequestChannel.Request): Unit

handleAddPartitionToTxnRequest…​FIXME

Note
handleAddPartitionToTxnRequest is used when…​FIXME

Handling ApiVersions Request — handleApiVersionsRequest Handler

handleApiVersionsRequest(
  request: RequestChannel.Request): Unit

handleApiVersionsRequest…​FIXME

Note
handleApiVersionsRequest is used when…​FIXME

Handling CreateToken Request — handleCreateTokenRequest Handler

handleCreateTokenRequest(
  request: RequestChannel.Request): Unit

handleCreateTokenRequest…​FIXME

Note
handleCreateTokenRequest is used when…​FIXME

Handling DeleteAcls Request — handleDeleteAcls Handler

handleDeleteAcls(
  request: RequestChannel.Request): Unit

handleDeleteAcls…​FIXME

Note
handleDeleteAcls is used when…​FIXME

Handling DeleteGroups Request — handleDeleteGroupsRequest Handler

handleDeleteGroupsRequest(
  request: RequestChannel.Request): Unit

handleDeleteGroupsRequest…​FIXME

Note
handleDeleteGroupsRequest is used when…​FIXME

Handling DescribeConfigs Request — handleDescribeConfigsRequest Handler

handleDescribeConfigsRequest(
  request: RequestChannel.Request): Unit

handleDescribeConfigsRequest takes the DescribeConfigsRequest from (the body of) the given RequestChannel.Request.

For every authorized operation, handleDescribeConfigsRequest requests the AdminManager to describeConfigs.

In the end, handleDescribeConfigsRequest sendResponseMaybeThrottle with a new DescribeConfigsResponse.

Note
handleDescribeConfigsRequest is used exclusively when KafkaApis is requested to handle a DescribeConfigs request.

Handling DescribeAcls Request — handleDescribeAcls Handler

handleDescribeAcls(
  request: RequestChannel.Request): Unit

handleDescribeAcls authorizeClusterOperation for the DESCRIBE operation.

handleDescribeAcls…​FIXME

Note
handleDescribeAcls is used when…​FIXME

Handling DescribeTokens Request — handleDescribeTokensRequest Handler

handleDescribeTokensRequest(
  request: RequestChannel.Request): Unit

handleDescribeTokensRequest…​FIXME

Note
handleDescribeTokensRequest is used when…​FIXME

Handling EndTxn Request — handleEndTxnRequest Handler

handleEndTxnRequest(
  request: RequestChannel.Request): Unit

handleEndTxnRequest…​FIXME

Note
handleEndTxnRequest is used when…​FIXME

Handling ExpireToken Request — handleExpireTokenRequest Handler

handleExpireTokenRequest(
  request: RequestChannel.Request): Unit

handleExpireTokenRequest…​FIXME

Note
handleExpireTokenRequest is used when…​FIXME

Handling Heartbeat Request — handleHeartbeatRequest Handler

handleHeartbeatRequest(
  request: RequestChannel.Request): Unit

handleHeartbeatRequest…​FIXME

Note
handleHeartbeatRequest is used when…​FIXME

Handling InitProducerId Request — handleInitProducerIdRequest Handler

handleInitProducerIdRequest(
  request: RequestChannel.Request): Unit

handleInitProducerIdRequest…​FIXME

Note
handleInitProducerIdRequest is used when…​FIXME

Handling IncrementalAlterConfigs Request — handleIncrementalAlterConfigsRequest Handler

handleIncrementalAlterConfigsRequest(
  request: RequestChannel.Request): Unit

handleIncrementalAlterConfigsRequest…​FIXME

Note
handleIncrementalAlterConfigsRequest is used when…​FIXME

Handling ListGroups Request — handleListGroupsRequest Handler

handleListGroupsRequest(
  request: RequestChannel.Request): Unit

handleListGroupsRequest…​FIXME

Note
handleListGroupsRequest is used when…​FIXME

Handling LeaveGroup Request — handleLeaveGroupRequest Handler

handleLeaveGroupRequest(
  request: RequestChannel.Request): Unit

handleLeaveGroupRequest…​FIXME

Note
handleLeaveGroupRequest is used when…​FIXME

Handling RenewToken Request — handleRenewTokenRequest Handler

handleRenewTokenRequest(
  request: RequestChannel.Request): Unit

handleRenewTokenRequest…​FIXME

Note
handleRenewTokenRequest is used when…​FIXME

Handling ControlledShutdown Request — handleControlledShutdownRequest Handler

handleControlledShutdownRequest(request: RequestChannel.Request): Unit

handleControlledShutdownRequest…​FIXME

Note
handleControlledShutdownRequest is used when…​FIXME

fetchOffsetForTimestamp Internal Method

fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset]

fetchOffsetForTimestamp…​FIXME

Note
fetchOffsetForTimestamp is used exclusively when KafkaApis is requested to handleListOffsetRequestV1AndAbove.

handleListOffsetRequestV0 Internal Method

handleListOffsetRequestV0(
  request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData]

handleListOffsetRequestV0…​FIXME

Note
handleListOffsetRequestV0 is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 0).

handleListOffsetRequestV1AndAbove Internal Method

handleListOffsetRequestV1AndAbove(
  request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]

handleListOffsetRequestV1AndAbove…​FIXME

Note
handleListOffsetRequestV1AndAbove is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 1 or above).

Handling DescribeLogDirs Request — handleDescribeLogDirsRequest Handler

handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit

In summary, handleDescribeLogDirsRequest requests the ReplicaManager to describeLogDirs.

Internally, handleDescribeLogDirsRequest takes the DescribeLogDirsRequest from the body (of the RequestChannel.Request).

handleDescribeLogDirsRequest branches off per whether the DescribeLogDirsRequest was for isAllTopicPartitions or not.

Note
handleDescribeLogDirsRequest returns an empty list of log directories when the request is not authorized.

handleDescribeLogDirsRequest then requests the ReplicaManager to describeLogDirs with the requested TopicPartitions.

In the end, handleDescribeLogDirsRequest sendResponseMaybeThrottle with a DescribeLogDirsResponse and the LogDirInfos.

Note
handleDescribeLogDirsRequest is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.

sendResponseMaybeThrottle Internal Method

sendResponseMaybeThrottle(
  request: RequestChannel.Request,
  createResponse: Int => AbstractResponse,
  onComplete: Option[Send => Unit] = None): Unit

sendResponseMaybeThrottle…​FIXME

Note
sendResponseMaybeThrottle is used when…​FIXME

fetchOffsetsBefore Method

fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long]

fetchOffsetsBefore…​FIXME

Note
fetchOffsetsBefore is used exclusively when KafkaApis is requested to fetchOffsets.

fetchOffsets Method

fetchOffsets(
  logManager: LogManager,
  topicPartition: TopicPartition,
  timestamp: Long,
  maxNumOffsets: Int): Seq[Long]

fetchOffsets…​FIXME

Note
fetchOffsets is used exclusively when KafkaApis is requested to handleListOffsetRequestV0.

Handling StopReplica Request — handleStopReplicaRequest Handler

handleStopReplicaRequest(request: RequestChannel.Request): Unit

In summary, handleStopReplicaRequest requests the ReplicaManager to stopReplicas.

handleStopReplicaRequest…​FIXME

Note
handleStopReplicaRequest is used exclusively when KafkaApis is requested to handle a StopReplica request.

Handling UpdateMetadata Request (From Kafka Controller) — handleUpdateMetadataRequest Handler

handleUpdateMetadataRequest(
  request: RequestChannel.Request): Unit

handleUpdateMetadataRequest takes the UpdateMetadataRequest from (the body of) the RequestChannel.Request.

When authorized for cluster action and not isBrokerEpochStale, handleUpdateMetadataRequest requests the following:

handleUpdateMetadataRequest updates quotas…​FIXME

handleUpdateMetadataRequest requests the ReplicaManager to tryCompleteElection for every partition (based on the partitionStates of the UpdateMetadataRequest).

In the end, handleUpdateMetadataRequest sendResponseExemptThrottle (with a no-error UpdateMetadataResponse).

Note
handleUpdateMetadataRequest is used exclusively when KafkaApis is requested to handle a UpdateMetadata request.

Handling OffsetCommitRequest — handleOffsetCommitRequest Handler

handleOffsetCommitRequest(request: RequestChannel.Request): Unit

handleOffsetCommitRequest takes the OffsetCommitRequest from (the body of) the RequestChannel.Request.

If authorized, handleOffsetCommitRequest simply requests the GroupCoordinator to handleCommitOffsets (with the sendResponseCallback).

Note
If authorized, handleOffsetCommitRequest branches off per API version (i.e. 0 to store offsets in Zookeeper and 1 and beyond). The API version 0 is not described here.

If not authorized, handleOffsetCommitRequest…​FIXME

Note
handleOffsetCommitRequest is used exclusively when KafkaApis is requested to handle an OffsetCommit request.

sendResponseCallback Method

sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]): Unit

sendResponseCallback prints out the following DEBUG message to the logs for offsets with errors (i.e. unauthorized topics to read or non-existing topics):

Offset commit request with correlation id [correlationId] from client [clientId] on partition [topicPartition] failed due to [exceptionName]

In the end, sendResponseCallback sendResponseMaybeThrottle a new OffsetCommitResponse.

createInternalTopic Internal Method

createInternalTopic(
  topic: String): MetadataResponse.TopicMetadata

createInternalTopic…​FIXME

Note
createInternalTopic is used when KafkaApis is requested to getOrCreateInternalTopic and getTopicMetadata (for metadata of consumer_offsets and transaction_state internal topics).

sendResponseExemptThrottle Internal Method

sendResponseExemptThrottle(
  request: RequestChannel.Request,
  response: AbstractResponse,
  onComplete: Option[Send => Unit] = None): Unit

sendResponseExemptThrottle…​FIXME

Note
sendResponseExemptThrottle is used when KafkaApis is requested to…​FIXME

getOrCreateInternalTopic Internal Method

getOrCreateInternalTopic(
  topic: String,
  listenerName: ListenerName): MetadataResponse.TopicMetadata

getOrCreateInternalTopic requests the MetadataCache for getTopicMetadata for the input topic (and the ListenerName).

In the end, getOrCreateInternalTopic returns the TopicMetadata if available or createInternalTopic.

Note
getOrCreateInternalTopic is used exclusively when KafkaApis is requested to handle a FindCoordinator request (and requests the metadata of consumer_offsets and transaction_state internal topics).

getTopicMetadata Internal Method

getTopicMetadata(
  allowAutoTopicCreation: Boolean,
  topics: Set[String],
  listenerName: ListenerName,
  errorUnavailableEndpoints: Boolean,
  errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata]

getTopicMetadata…​FIXME

Note
getTopicMetadata is used exclusively when KafkaApis is requested to handle a Metadata request.

Handling DescribeGroups Request — handleDescribeGroupRequest Handler

handleDescribeGroupRequest(request: RequestChannel.Request): Unit

handleDescribeGroupRequest…​FIXME

Note
handleDescribeGroupRequest is used exclusively when KafkaApis is requested to handle a DescribeGroups request.

Handling AlterConfigs Request — handleAlterConfigsRequest Handler

handleAlterConfigsRequest(request: RequestChannel.Request): Unit

handleAlterConfigsRequest…​FIXME

Note
handleAlterConfigsRequest is used exclusively when KafkaApis is requested to handle a AlterConfigs request.

createTopic Internal Method

createTopic(
  topic: String,
  numPartitions: Int,
  replicationFactor: Int,
  properties: Properties = new Properties()): MetadataResponse.TopicMetadata

createTopic…​FIXME

Note
createTopic is used when KafkaApis is requested to createInternalTopic and getTopicMetadata.

Handling FindCoordinatorRequest — handleFindCoordinatorRequest Handler

handleFindCoordinatorRequest(request: RequestChannel.Request): Unit

handleFindCoordinatorRequest takes the FindCoordinatorRequest from the body (of the RequestChannel.Request).

handleFindCoordinatorRequest checks permissions…​FIXME

For an authorized request, handleFindCoordinatorRequest branches off per CoordinatorType, i.e. GROUP or TRANSACTION.

For GROUP coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the GroupCoordinator for partitionFor the coordinator key (of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __consumer_offsets topic

For TRANSACTION coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the TransactionCoordinator for partitionFor (for the coordinatorKey of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __transaction_state topic

In the end, handleFindCoordinatorRequest sendResponseMaybeThrottle with a new FindCoordinatorResponse.

You should see the following TRACE message in the logs:

Sending FindCoordinator response [body] for correlation id [correlationId] to client [clientId].
Note
handleFindCoordinatorRequest is used exclusively when KafkaApis is requested to handle a FindCoordinator request.

Handling JoinGroupRequest — handleJoinGroupRequest Handler

handleJoinGroupRequest(request: RequestChannel.Request): Unit

handleJoinGroupRequest takes the JoinGroupRequest from the body (of the RequestChannel.Request) and simply requests the GroupCoordinator to handleJoinGroup (with sendResponseCallback to handle the response).

Note
handleJoinGroupRequest is used exclusively when KafkaApis is requested to handle a JoinGroup request.

Handling JoinGroup Response — sendResponseCallback Method

sendResponseCallback(joinResult: JoinGroupResult): Unit

sendResponseCallback creates a new JoinGroupResponse for the given JoinGroupResult and prints out the following TRACE message to the logs:

Sending join group response [responseBody] for correlation id [correlationId] to client [clientId].

In the end, sendResponseCallback sendResponseMaybeThrottle with the new JoinGroupResponse.

Handling AddOffsetsToTxn Request — handleAddOffsetsToTxnRequest Handler

handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit

handleAddOffsetsToTxnRequest…​FIXME

Note
handleAddOffsetsToTxnRequest is used exclusively when KafkaApis is requested to handle a AddOffsetsToTxn request.

Handling Produce Request — handleProduceRequest Handler

handleProduceRequest(
  request: RequestChannel.Request): Unit

In summary, handleProduceRequest takes the ProduceRequest from the body (of the RequestChannel.Request) and requests the ReplicaManager to appendRecords (with isFromClient flag enabled).

Note
internalTopicsAllowed flag (when the ReplicaManager is requested to appendRecords) is enabled (true) only when the client ID is __admin_client.

handleProduceRequest…​FIXME

handleProduceRequest filterAuthorized the request for WRITE operation on a TOPIC resource to the topics (from the ProduceRequest).

handleProduceRequest…​FIXME

Note
handleProduceRequest is used when KafkaApis is requested to handle a Produce request.

handleWriteTxnMarkersRequest Handler

handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit

In summary, handleWriteTxnMarkersRequest requests the ReplicaManager to getMagic followed by appendRecords (with isFromClient flag disabled).

handleWriteTxnMarkersRequest…​FIXME

Note
handleWriteTxnMarkersRequest is used exclusively when KafkaApis is requested to handle a WriteTxnMarkers request.

handleDeleteRecordsRequest Handler

handleDeleteRecordsRequest(request: RequestChannel.Request): Unit

In summary, handleDeleteRecordsRequest requests the ReplicaManager to deleteRecords.

handleDeleteRecordsRequest…​FIXME

Note
handleDeleteRecordsRequest is used exclusively when KafkaApis is requested to handle a DeleteRecords request.

handleOffsetForLeaderEpochRequest Handler

handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit

In summary, handleOffsetForLeaderEpochRequest requests the ReplicaManager to lastOffsetForLeaderEpoch.

handleOffsetForLeaderEpochRequest…​FIXME

Note
handleOffsetForLeaderEpochRequest is used exclusively when KafkaApis is requested to handle a OffsetForLeaderEpoch request.

handleListOffsetRequest Handler

handleListOffsetRequest(request: RequestChannel.Request): Unit

In summary, handleListOffsetRequest requests the ReplicaManager to fetchOffsetForTimestamp.

handleListOffsetRequest…​FIXME

Note
handleListOffsetRequest is used exclusively when KafkaApis is requested to handle a ListOffsets request.

handleOffsetDeleteRequest Handler

handleOffsetDeleteRequest(
  request: RequestChannel.Request): Unit

In summary, handleOffsetDeleteRequest…​FIXME

handleOffsetDeleteRequest…​FIXME

Note
handleOffsetDeleteRequest is used when KafkaApis is requested to handle a OffsetDelete request.

handleAlterPartitionReassignmentsRequest Handler

handleAlterPartitionReassignmentsRequest(
  request: RequestChannel.Request): Unit

In summary, handleAlterPartitionReassignmentsRequest…​FIXME

handleAlterPartitionReassignmentsRequest…​FIXME

Note
handleAlterPartitionReassignmentsRequest is used when KafkaApis is requested to handle a AlterPartitionReassignments request.

handleListPartitionReassignmentsRequest Handler

handleListPartitionReassignmentsRequest(
  request: RequestChannel.Request): Unit

In summary, handleListPartitionReassignmentsRequest…​FIXME

handleListPartitionReassignmentsRequest…​FIXME

Note
handleListPartitionReassignmentsRequest is used when KafkaApis is requested to handle a ListPartitionReassignments request.

isAuthorizedClusterAction Internal Method

isAuthorizedClusterAction(request: RequestChannel.Request): Boolean

isAuthorizedClusterAction simply authorize with ClusterAction operation and ClusterResource resource.

Note
isAuthorizedClusterAction is used when…​FIXME

updateRecordConversionStats Internal Method

updateRecordConversionStats(
  request: RequestChannel.Request,
  tp: TopicPartition,
  conversionStats: RecordConversionStats): Unit

updateRecordConversionStats…​FIXME

Note
updateRecordConversionStats is used when…​FIXME

Asserting Permissions for Cluster Action — authorizeClusterAction Method

authorizeClusterAction(request: RequestChannel.Request): Unit

authorizeClusterAction simply asserts that the RequestChannel.Request is authorized to execute ClusterAction on a ClusterResource. If so, authorizeClusterAction does nothing and returns.

If not authorized, authorizeClusterAction throws a ClusterAuthorizationException:

Request [request] is not authorized.

isBrokerEpochStale Internal Method

isBrokerEpochStale(
  brokerEpochInRequest: Long): Boolean

isBrokerEpochStale…​FIXME

Note
isBrokerEpochStale is used when KafkaApis is requested to handleLeaderAndIsrRequest, handleStopReplicaRequest, and handleUpdateMetadataRequest.

filterAuthorized Internal Method

filterAuthorized(
  request: RequestChannel.Request,
  operation: AclOperation,
  resourceType: ResourceType,
  resourceNames: Seq[String],
  logIfAllowed: Boolean = true,
  logIfDenied: Boolean = true): Set[String]

filterAuthorized…​FIXME

Note
filterAuthorized is used when…​FIXME

authorizedOperations Internal Method

authorizedOperations(
  request: RequestChannel.Request,
  resource: Resource): Int

authorizedOperations…​FIXME

Request Resource Type Resource Name

Metadata

CLUSTER

kafka-cluster

Metadata

TOPIC

Topic name

DescribeGroups

GROUP

Group ID

Throwing ClusterAuthorizationException for Unauthorized Cluster Operation — authorizeClusterOperation Internal Method

authorizeClusterOperation(
  request: RequestChannel.Request,
  operation: AclOperation): Unit

authorizeClusterOperation simply throws a ClusterAuthorizationException when the given request is not authorized for the given operation on CLUSTER resource and the name as kafka-cluster.

Request [request] is not authorized.
Request AclOperation

LeaderAndIsr

CLUSTER_ACTION

StopReplica

CLUSTER_ACTION

UpdateMetadata

CLUSTER_ACTION

ControlledShutdown

CLUSTER_ACTION

WriteTxnMarkers

CLUSTER_ACTION

DescribeAcls

DESCRIBE

CreateAcls

ALTER

DeleteAcls

ALTER

AlterPartitionReassignments

ALTER

ListPartitionReassignments

DESCRIBE

results matching ""

    No results matching ""