KafkaApis — API Request Handler

KafkaApis is responsible for handling API requests (by means of handlers) to a KafkaServer and orchestrating interactions between the services.

Table 1. KafkaApis’s API Keys and Handlers

API Key               Handler
AddOffsetsToTxn       handleAddOffsetsToTxnRequest
AlterConfigs          handleAlterConfigsRequest
AlterReplicaLogDirs   handleAlterReplicaLogDirsRequest
ControlledShutdown    handleControlledShutdownRequest
CreatePartitions      handleCreatePartitionsRequest
CreateTopics          handleCreateTopicsRequest
DeleteTopics          handleDeleteTopicsRequest
DescribeGroups        handleDescribeGroupRequest
DescribeLogDirs       handleDescribeLogDirsRequest
DeleteRecords         handleDeleteRecordsRequest
Fetch                 handleFetchRequest
FindCoordinator       handleFindCoordinatorRequest
JoinGroup             handleJoinGroupRequest
LeaderAndIsr          handleLeaderAndIsrRequest
ListOffsets           handleListOffsetRequest
Metadata              handleTopicMetadataRequest
OffsetCommit          handleOffsetCommitRequest
OffsetFetch           handleOffsetFetchRequest
OffsetForLeaderEpoch  handleOffsetForLeaderEpochRequest
Produce               handleProduceRequest
StopReplica           handleStopReplicaRequest
UpdateMetadata        handleUpdateMetadataRequest
WriteTxnMarkers       handleWriteTxnMarkersRequest

KafkaApis is created exclusively when KafkaServer starts up (and is handed over to the KafkaRequestHandlerPool that the server creates).

Figure 1. KafkaApis is Created for KafkaRequestHandlerPool when KafkaServer Starts Up

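KafkaApis uses the ReplicaManager to handle the following API requests: LeaderAndIsr, AlterReplicaLogDirs, Fetch, DescribeLogDirs, StopReplica, UpdateMetadata, Produce, WriteTxnMarkers, DeleteRecords, OffsetForLeaderEpoch and ListOffsets.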

KafkaApis uses an AdminZkClient for…FIXME

KafkaApis uses the MetadataCache for the following:

Tip

Enable INFO, DEBUG or TRACE logging levels for kafka.server.KafkaApis logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.server.KafkaApis=TRACE

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.server.KafkaApis logger in config/log4j.properties:

log4j.appender.requestAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.requestAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.requestAppender.File=${kafka.logs.dir}/kafka-request.log
log4j.appender.requestAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.requestAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.server.KafkaApis=TRACE, requestAppender
log4j.additivity.kafka.server.KafkaApis=false

That means that the logs of KafkaApis go to logs/kafka-request.log file at TRACE logging level and are not added to the main logs (per log4j.additivity being off).

Routing API Requests to Respective Handlers — handle Method

handle(request: RequestChannel.Request): Unit

handle first prints out the following TRACE message to the logs:

Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]

handle then relays the input RequestChannel.Request to the corresponding handler per the apiKey (from the header of the input request).
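The routing step can be pictured as a pattern match on the API key. The following is only a minimal sketch with hypothetical stand-in types (ApiKey, Request) and three of the handlers; unlike the real handlers, which return Unit, the ones here return their own name so the routing is observable.

```scala
// Hypothetical, simplified stand-ins; these are not Kafka's real classes.
object HandleDispatchSketch {
  sealed trait ApiKey
  case object Produce extends ApiKey
  case object Fetch extends ApiKey
  case object Metadata extends ApiKey

  final case class Request(apiKey: ApiKey, connectionId: String)

  // Each handler returns its own name so that the routing can be checked.
  def handleProduceRequest(request: Request): String = "handleProduceRequest"
  def handleFetchRequest(request: Request): String = "handleFetchRequest"
  def handleTopicMetadataRequest(request: Request): String = "handleTopicMetadataRequest"

  // Relay the request to the handler that corresponds to its API key.
  def handle(request: Request): String = request.apiKey match {
    case Produce  => handleProduceRequest(request)
    case Fetch    => handleFetchRequest(request)
    case Metadata => handleTopicMetadataRequest(request)
  }
}
```

Because ApiKey is sealed in this sketch, the compiler can warn when a new key is added without a matching case.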

Note
handle is used exclusively when a KafkaRequestHandler thread is requested to run.

Handling LeaderAndIsr Request — handleLeaderAndIsrRequest Handler

handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit

In summary, handleLeaderAndIsrRequest requests the ReplicaManager to becomeLeaderOrFollower.

handleLeaderAndIsrRequest takes the LeaderAndIsrRequest from the body (of the RequestChannel.Request).

When the request is authorized for the cluster action, handleLeaderAndIsrRequest requests the ReplicaManager to becomeLeaderOrFollower (with the onLeadershipChange callback).

When unauthorized, handleLeaderAndIsrRequest…​FIXME
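The authorized path can be sketched as "check, then delegate with a callback". The types below (Partition, ReplicaManager, the String result) are hypothetical simplifications, and the unauthorized branch is deliberately left as None since it is not described here.

```scala
// A minimal sketch with hypothetical stand-ins for the Kafka classes.
object LeaderAndIsrSketch {
  final case class Partition(topic: String, id: Int)

  // Stand-in for ReplicaManager: applies the change and notifies the callback.
  class ReplicaManager {
    def becomeLeaderOrFollower(
        leaders: Seq[Partition],
        followers: Seq[Partition],
        onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): String = {
      onLeadershipChange(leaders, followers)
      "OK"
    }
  }

  def handleLeaderAndIsrRequest(
      authorized: Boolean,
      leaders: Seq[Partition],
      followers: Seq[Partition],
      replicaManager: ReplicaManager,
      onLeadershipChange: (Iterable[Partition], Iterable[Partition]) => Unit): Option[String] =
    if (authorized)
      Some(replicaManager.becomeLeaderOrFollower(leaders, followers, onLeadershipChange))
    else
      None // the unauthorized branch is intentionally left unspecified in this sketch
}
```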

Note
handleLeaderAndIsrRequest is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.

onLeadershipChange Callback

onLeadershipChange(
  updatedLeaders: Iterable[Partition],
  updatedFollowers: Iterable[Partition]): Unit

onLeadershipChange…​FIXME

Handling AlterReplicaLogDirs Request — handleAlterReplicaLogDirsRequest Handler

handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit

In summary, handleAlterReplicaLogDirsRequest requests the ReplicaManager to alterReplicaLogDirs.

handleAlterReplicaLogDirsRequest…​FIXME

Note
handleAlterReplicaLogDirsRequest is used exclusively when KafkaApis is requested to handle an AlterReplicaLogDirs request.

Handling CreateTopics Request — handleCreateTopicsRequest Handler

handleCreateTopicsRequest(request: RequestChannel.Request): Unit

handleCreateTopicsRequest…​FIXME

handleCreateTopicsRequest checks whether KafkaController is active…​FIXME

handleCreateTopicsRequest authorizes the Create operation for ClusterResource…​FIXME

In the end, handleCreateTopicsRequest requests AdminManager to create the topics.

Note
handleCreateTopicsRequest is used exclusively when KafkaApis is requested to handle a CreateTopics request.

Handling OffsetFetch Request — handleOffsetFetchRequest Handler

handleOffsetFetchRequest(request: RequestChannel.Request): Unit

handleOffsetFetchRequest…​FIXME

Note
handleOffsetFetchRequest is used exclusively when KafkaApis is requested to handle an OffsetFetch request.

Handling Fetch Request — handleFetchRequest Handler

handleFetchRequest(request: RequestChannel.Request): Unit

In summary, handleFetchRequest requests the ReplicaManager to fetchMessages.

handleFetchRequest…​FIXME

Note
handleFetchRequest is used exclusively when KafkaApis is requested to handle a Fetch request.

Handling Metadata Request — handleTopicMetadataRequest Handler

handleTopicMetadataRequest(request: RequestChannel.Request): Unit

handleTopicMetadataRequest takes the MetadataRequest from the body (of the RequestChannel.Request).

handleTopicMetadataRequest requests the MetadataCache for getAllTopics or its subset (per topics attribute of the MetadataRequest).

handleTopicMetadataRequest filters out the topics for which the current principal (user) is not authorized to execute the Describe operation.

For every authorized topic, handleTopicMetadataRequest…​FIXME

handleTopicMetadataRequest creates a MetadataResponse.TopicMetadata with TOPIC_AUTHORIZATION_FAILED for every topic in unauthorizedForCreateTopics and unauthorizedForDescribeTopics.

handleTopicMetadataRequest calls getTopicMetadata if there are any authorizedTopics.

handleTopicMetadataRequest prints out the following TRACE message to the logs:

Sending topic metadata [completeTopicMetadata] and brokers [brokers] for correlation id [correlationId] to client [clientId]

In the end, handleTopicMetadataRequest calls sendResponseMaybeThrottle with a new MetadataResponse.
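The authorization split described above can be sketched as a partition of the requested topics. This is a simplified illustration only: TopicMetadata is a hypothetical two-field stand-in, None models a request for all topics, the "NONE" error stands in for whatever getTopicMetadata returns, and the create-topic authorization is left out.

```scala
// A minimal sketch, not the real handler.
object TopicMetadataSketch {
  final case class TopicMetadata(topic: String, error: String)

  def topicMetadataFor(
      requestedTopics: Option[Set[String]], // None models "all topics"
      allTopics: Set[String],
      canDescribe: String => Boolean): Seq[TopicMetadata] = {
    val topics = requestedTopics.getOrElse(allTopics)
    // Split the topics by whether the principal may Describe them.
    val (authorized, unauthorized) = topics.partition(canDescribe)
    val ok = authorized.toSeq.sorted.map(TopicMetadata(_, "NONE"))
    val failed = unauthorized.toSeq.sorted.map(TopicMetadata(_, "TOPIC_AUTHORIZATION_FAILED"))
    ok ++ failed
  }
}
```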

Note
handleTopicMetadataRequest is used exclusively when KafkaApis is requested to handle a Metadata request.

Authorizing Operation on Resource — authorize Internal Method

authorize(
  session: RequestChannel.Session,
  operation: Operation,
  resource: Resource): Boolean

authorize simply requests the Authorizer to authorize the given Operation on the given Resource in the RequestChannel.Session.
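As an illustration, the delegation could look as follows. Session, Operation, Resource and Authorizer are hypothetical stand-ins here, and treating a missing authorizer as "allow everything" is an assumption of this sketch.

```scala
// A minimal sketch with hypothetical stand-in types.
object AuthorizeSketch {
  final case class Session(principal: String)
  final case class Resource(resourceType: String, name: String)
  sealed trait Operation
  case object Describe extends Operation
  case object Create extends Operation

  trait Authorizer {
    def authorize(session: Session, operation: Operation, resource: Resource): Boolean
  }

  // Delegates to the authorizer; with no authorizer configured,
  // every operation is allowed (an assumption of this sketch).
  def authorize(
      authorizer: Option[Authorizer],
      session: Session,
      operation: Operation,
      resource: Resource): Boolean =
    authorizer.forall(_.authorize(session, operation, resource))
}
```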

Note
authorize is used when…​FIXME

Handling CreatePartitions Request — handleCreatePartitionsRequest Handler

handleCreatePartitionsRequest(request: RequestChannel.Request): Unit

handleCreatePartitionsRequest…​FIXME

Note
handleCreatePartitionsRequest is used when…​FIXME

Handling DeleteTopics Request — handleDeleteTopicsRequest Handler

handleDeleteTopicsRequest(request: RequestChannel.Request): Unit

handleDeleteTopicsRequest…​FIXME

Note
handleDeleteTopicsRequest is used when…​FIXME

Handling ControlledShutdown Request — handleControlledShutdownRequest Handler

handleControlledShutdownRequest(request: RequestChannel.Request): Unit

handleControlledShutdownRequest…​FIXME

Note
handleControlledShutdownRequest is used when…​FIXME

Creating KafkaApis Instance

KafkaApis takes the following when created:

KafkaApis initializes the internal registries and counters.

fetchOffsetForTimestamp Internal Method

fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset]

fetchOffsetForTimestamp…​FIXME

Note
fetchOffsetForTimestamp is used exclusively when KafkaApis is requested to handleListOffsetRequestV1AndAbove.

handleListOffsetRequestV0 Internal Method

handleListOffsetRequestV0(
  request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]

handleListOffsetRequestV0…​FIXME

Note
handleListOffsetRequestV0 is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 0).

handleListOffsetRequestV1AndAbove Internal Method

handleListOffsetRequestV1AndAbove(
  request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]

handleListOffsetRequestV1AndAbove…​FIXME

Note
handleListOffsetRequestV1AndAbove is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 1 or above).

Handling DescribeLogDirs Request — handleDescribeLogDirsRequest Handler

handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit

In summary, handleDescribeLogDirsRequest requests the ReplicaManager to describeLogDirs.

Internally, handleDescribeLogDirsRequest takes the DescribeLogDirsRequest from the body (of the RequestChannel.Request).

handleDescribeLogDirsRequest branches on whether the DescribeLogDirsRequest is for all topic partitions (isAllTopicPartitions) or only for the requested ones.

Note
handleDescribeLogDirsRequest returns an empty list of log directories when the request is not authorized.

handleDescribeLogDirsRequest then requests the ReplicaManager to describeLogDirs with the requested TopicPartitions.

In the end, handleDescribeLogDirsRequest calls sendResponseMaybeThrottle with a DescribeLogDirsResponse that carries the LogDirInfos.
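The flow above can be sketched as follows. All types are hypothetical simplifications (LogDirInfo holds just a directory and its partitions, None models isAllTopicPartitions), and the response and throttling step is omitted.

```scala
// A minimal sketch, not the real handler.
object DescribeLogDirsSketch {
  final case class TopicPartition(topic: String, partition: Int)
  final case class LogDirInfo(dir: String, partitions: Set[TopicPartition])

  // Stand-in for ReplicaManager with a fixed log directory layout.
  class ReplicaManager(layout: Map[String, Set[TopicPartition]]) {
    def allPartitions: Set[TopicPartition] = layout.values.flatten.toSet
    def describeLogDirs(partitions: Set[TopicPartition]): Seq[LogDirInfo] =
      layout.toSeq.sortBy(_._1).map { case (dir, tps) =>
        LogDirInfo(dir, tps.intersect(partitions))
      }
  }

  def handleDescribeLogDirsRequest(
      authorized: Boolean,
      requested: Option[Set[TopicPartition]], // None models isAllTopicPartitions
      replicaManager: ReplicaManager): Seq[LogDirInfo] =
    if (!authorized) Seq.empty // unauthorized requests get an empty list
    else replicaManager.describeLogDirs(requested.getOrElse(replicaManager.allPartitions))
}
```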

Note
handleDescribeLogDirsRequest is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.

sendResponseMaybeThrottle Internal Method

sendResponseMaybeThrottle(
  request: RequestChannel.Request,
  createResponse: Int => AbstractResponse,
  onComplete: Option[Send => Unit] = None): Unit

sendResponseMaybeThrottle…​FIXME

Note
sendResponseMaybeThrottle is used when…​FIXME

fetchOffsetsBefore Method

fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long]

fetchOffsetsBefore…​FIXME

Note
fetchOffsetsBefore is used exclusively when KafkaApis is requested to fetchOffsets.

fetchOffsets Method

fetchOffsets(
  logManager: LogManager,
  topicPartition: TopicPartition,
  timestamp: Long,
  maxNumOffsets: Int): Seq[Long]

fetchOffsets…​FIXME

Note
fetchOffsets is used exclusively when KafkaApis is requested to handleListOffsetRequestV0.

Handling StopReplica Request — handleStopReplicaRequest Handler

handleStopReplicaRequest(request: RequestChannel.Request): Unit

In summary, handleStopReplicaRequest requests the ReplicaManager to stopReplicas.

handleStopReplicaRequest…​FIXME

Note
handleStopReplicaRequest is used exclusively when KafkaApis is requested to handle a StopReplica request.

Handling UpdateMetadata Request — handleUpdateMetadataRequest Handler

handleUpdateMetadataRequest(request: RequestChannel.Request): Unit

In summary, handleUpdateMetadataRequest requests the ReplicaManager to maybeUpdateMetadataCache.

handleUpdateMetadataRequest…​FIXME

Note
handleUpdateMetadataRequest is used exclusively when KafkaApis is requested to handle an UpdateMetadata request.

Handling OffsetCommitRequest — handleOffsetCommitRequest Handler

handleOffsetCommitRequest(request: RequestChannel.Request): Unit

handleOffsetCommitRequest takes the OffsetCommitRequest from the body (of the RequestChannel.Request).

If authorized, handleOffsetCommitRequest simply requests the GroupCoordinator to handleCommitOffsets (with the sendResponseCallback).

Note
If authorized, handleOffsetCommitRequest branches on the API version: version 0 stores offsets in ZooKeeper, whereas versions 1 and above go through the GroupCoordinator as described above. The API version 0 is not described here.

If not authorized, handleOffsetCommitRequest…​FIXME

Note
handleOffsetCommitRequest is used exclusively when KafkaApis is requested to handle an OffsetCommit request.

sendResponseCallback Method

sendResponseCallback(commitStatus: immutable.Map[TopicPartition, Errors]): Unit

sendResponseCallback prints out the following DEBUG message to the logs for offsets with errors (i.e. unauthorized topics to read or non-existing topics):

Offset commit request with correlation id [correlationId] from client [clientId] on partition [topicPartition] failed due to [exceptionName]

In the end, sendResponseCallback calls sendResponseMaybeThrottle with a new OffsetCommitResponse.
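The logging part of the callback can be sketched as a filter over the commit status. In this simplified illustration the errors are plain strings rather than Errors values, "NONE" is assumed to mean success, and the messages are returned instead of being written to a logger.

```scala
// A minimal sketch with hypothetical stand-in types.
object OffsetCommitCallbackSketch {
  final case class TopicPartition(topic: String, partition: Int) {
    override def toString: String = s"$topic-$partition"
  }

  // Builds one DEBUG line per partition whose commit failed.
  def debugMessages(
      correlationId: Int,
      clientId: String,
      commitStatus: Map[TopicPartition, String]): Seq[String] =
    commitStatus.toSeq
      .filter { case (_, error) => error != "NONE" }
      .sortBy { case (tp, _) => (tp.topic, tp.partition) }
      .map { case (tp, error) =>
        s"Offset commit request with correlation id $correlationId from client $clientId " +
          s"on partition $tp failed due to $error"
      }
}
```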

createInternalTopic Internal Method

createInternalTopic(topic: String): MetadataResponse.TopicMetadata

createInternalTopic…​FIXME

Note
createInternalTopic is used when KafkaApis is requested to getOrCreateInternalTopic and getTopicMetadata.

getOrCreateInternalTopic Internal Method

getOrCreateInternalTopic(
  topic: String,
  listenerName: ListenerName): MetadataResponse.TopicMetadata

getOrCreateInternalTopic requests the MetadataCache for getTopicMetadata for the input topic (and the ListenerName).

In the end, getOrCreateInternalTopic returns the TopicMetadata if available, or otherwise the result of createInternalTopic.
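The lookup-then-create behaviour maps naturally onto Option.getOrElse. The sketch below uses hypothetical stand-ins (a map-backed MetadataCache, a two-field TopicMetadata) and drops the ListenerName parameter for brevity.

```scala
// A minimal sketch, not the real method.
object InternalTopicSketch {
  final case class TopicMetadata(topic: String, isInternal: Boolean)

  // Stand-in for MetadataCache backed by a mutable map.
  class MetadataCache {
    private val topics = scala.collection.mutable.Map.empty[String, TopicMetadata]
    def getTopicMetadata(topic: String): Option[TopicMetadata] = topics.get(topic)
    def put(metadata: TopicMetadata): Unit = topics.update(metadata.topic, metadata)
  }

  class Apis(cache: MetadataCache) {
    var created: Int = 0 // counts how often a topic had to be created

    def createInternalTopic(topic: String): TopicMetadata = {
      created += 1
      val metadata = TopicMetadata(topic, isInternal = true)
      cache.put(metadata)
      metadata
    }

    // getOrElse takes its argument by name, so the topic is only
    // created when the cache has no metadata for it.
    def getOrCreateInternalTopic(topic: String): TopicMetadata =
      cache.getTopicMetadata(topic).getOrElse(createInternalTopic(topic))
  }
}
```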

Note
getOrCreateInternalTopic is used exclusively when KafkaApis is requested to handle a FindCoordinatorRequest.

getTopicMetadata Internal Method

getTopicMetadata(
  allowAutoTopicCreation: Boolean,
  topics: Set[String],
  listenerName: ListenerName,
  errorUnavailableEndpoints: Boolean,
  errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata]

getTopicMetadata…​FIXME

Note
getTopicMetadata is used exclusively when KafkaApis is requested to handle a Metadata request.

Handling DescribeGroups Request — handleDescribeGroupRequest Handler

handleDescribeGroupRequest(request: RequestChannel.Request): Unit

handleDescribeGroupRequest…​FIXME

Note
handleDescribeGroupRequest is used exclusively when KafkaApis is requested to handle a DescribeGroups request.

Handling AlterConfigs Request — handleAlterConfigsRequest Handler

handleAlterConfigsRequest(request: RequestChannel.Request): Unit

handleAlterConfigsRequest…​FIXME

Note
handleAlterConfigsRequest is used exclusively when KafkaApis is requested to handle an AlterConfigs request.

createTopic Internal Method

createTopic(
  topic: String,
  numPartitions: Int,
  replicationFactor: Int,
  properties: Properties = new Properties()): MetadataResponse.TopicMetadata

createTopic…​FIXME

Note
createTopic is used when KafkaApis is requested to createInternalTopic and getTopicMetadata.

Handling FindCoordinatorRequest — handleFindCoordinatorRequest Handler

handleFindCoordinatorRequest(request: RequestChannel.Request): Unit

handleFindCoordinatorRequest takes the FindCoordinatorRequest from the body (of the RequestChannel.Request).

handleFindCoordinatorRequest checks permissions…​FIXME

For an authorized request, handleFindCoordinatorRequest branches off per CoordinatorType, i.e. GROUP or TRANSACTION.

For GROUP coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the GroupCoordinator for partitionFor the coordinator key (of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __consumer_offsets topic

For TRANSACTION coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the TransactionCoordinator for partitionFor (for the coordinatorKey of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __transaction_state topic

In the end, handleFindCoordinatorRequest calls sendResponseMaybeThrottle with a new FindCoordinatorResponse.

You should see the following TRACE message in the logs:

Sending FindCoordinator response [body] for correlation id [correlationId] to client [clientId].

Note
handleFindCoordinatorRequest is used exclusively when KafkaApis is requested to handle a FindCoordinator request.
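The branching per coordinator type can be sketched as below. The hashing in partitionFor (non-negative hash of the key modulo the partition count) and the explicit partition-count parameters are assumptions of this sketch; in the flow described above the coordinators themselves compute the partition.

```scala
// A minimal sketch with hypothetical stand-in types.
object FindCoordinatorSketch {
  sealed trait CoordinatorType
  case object Group extends CoordinatorType
  case object Transaction extends CoordinatorType

  // Assumed scheme: clear the sign bit of the hash, then take the modulo.
  def partitionFor(key: String, numPartitions: Int): Int =
    (key.hashCode & 0x7fffffff) % numPartitions

  // Returns the internal topic and the partition that identify the coordinator.
  def findCoordinator(
      coordinatorType: CoordinatorType,
      key: String,
      offsetsPartitions: Int,
      txnPartitions: Int): (String, Int) = coordinatorType match {
    case Group       => ("__consumer_offsets", partitionFor(key, offsetsPartitions))
    case Transaction => ("__transaction_state", partitionFor(key, txnPartitions))
  }
}
```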

Handling JoinGroupRequest — handleJoinGroupRequest Handler

handleJoinGroupRequest(request: RequestChannel.Request): Unit

handleJoinGroupRequest takes the JoinGroupRequest from the body (of the RequestChannel.Request) and simply requests the GroupCoordinator to handleJoinGroup (with sendResponseCallback to handle the response).

Note
handleJoinGroupRequest is used exclusively when KafkaApis is requested to handle a JoinGroup request.

Handling JoinGroup Response — sendResponseCallback Method

sendResponseCallback(joinResult: JoinGroupResult): Unit

sendResponseCallback creates a new JoinGroupResponse for the given JoinGroupResult and prints out the following TRACE message to the logs:

Sending join group response [responseBody] for correlation id [correlationId] to client [clientId].

In the end, sendResponseCallback calls sendResponseMaybeThrottle with the new JoinGroupResponse.
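The callback hand-off between the handler and the coordinator can be sketched as follows. GroupCoordinator, JoinGroupResult and JoinGroupResponse are hypothetical simplifications, the generated member id is made up for illustration, and the send function stands in for sendResponseMaybeThrottle.

```scala
// A minimal sketch, not the real handler.
object JoinGroupSketch {
  final case class JoinGroupResult(memberId: String, generationId: Int, error: String)
  final case class JoinGroupResponse(memberId: String, generationId: Int, error: String)

  // Stand-in for GroupCoordinator: it replies through the given callback.
  class GroupCoordinator {
    def handleJoinGroup(
        groupId: String,
        memberId: String,
        responseCallback: JoinGroupResult => Unit): Unit = {
      val assigned = if (memberId.isEmpty) s"$groupId-member-1" else memberId
      responseCallback(JoinGroupResult(assigned, 1, "NONE"))
    }
  }

  def handleJoinGroupRequest(
      groupId: String,
      memberId: String,
      coordinator: GroupCoordinator,
      send: JoinGroupResponse => Unit): Unit = {
    // Turns the coordinator's result into a response and sends it.
    def sendResponseCallback(joinResult: JoinGroupResult): Unit =
      send(JoinGroupResponse(joinResult.memberId, joinResult.generationId, joinResult.error))

    coordinator.handleJoinGroup(groupId, memberId, sendResponseCallback)
  }
}
```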

Handling AddOffsetsToTxn Request — handleAddOffsetsToTxnRequest Handler

handleAddOffsetsToTxnRequest(request: RequestChannel.Request): Unit

handleAddOffsetsToTxnRequest…​FIXME

Note
handleAddOffsetsToTxnRequest is used exclusively when KafkaApis is requested to handle an AddOffsetsToTxn request.

handleProduceRequest Handler

handleProduceRequest(request: RequestChannel.Request): Unit

In summary, handleProduceRequest requests the ReplicaManager to appendRecords (with isFromClient flag enabled).

handleProduceRequest…​FIXME

Note
handleProduceRequest is used exclusively when KafkaApis is requested to handle a Produce request.

handleWriteTxnMarkersRequest Handler

handleWriteTxnMarkersRequest(request: RequestChannel.Request): Unit

In summary, handleWriteTxnMarkersRequest requests the ReplicaManager to getMagic followed by appendRecords (with isFromClient flag disabled).

handleWriteTxnMarkersRequest…​FIXME

Note
handleWriteTxnMarkersRequest is used exclusively when KafkaApis is requested to handle a WriteTxnMarkers request.

handleDeleteRecordsRequest Handler

handleDeleteRecordsRequest(request: RequestChannel.Request): Unit

In summary, handleDeleteRecordsRequest requests the ReplicaManager to deleteRecords.

handleDeleteRecordsRequest…​FIXME

Note
handleDeleteRecordsRequest is used exclusively when KafkaApis is requested to handle a DeleteRecords request.

handleOffsetForLeaderEpochRequest Handler

handleOffsetForLeaderEpochRequest(request: RequestChannel.Request): Unit

In summary, handleOffsetForLeaderEpochRequest requests the ReplicaManager to lastOffsetForLeaderEpoch.

handleOffsetForLeaderEpochRequest…​FIXME

Note
handleOffsetForLeaderEpochRequest is used exclusively when KafkaApis is requested to handle an OffsetForLeaderEpoch request.

handleListOffsetRequest Handler

handleListOffsetRequest(request: RequestChannel.Request): Unit

In summary, handleListOffsetRequest requests the ReplicaManager to fetchOffsetForTimestamp.

handleListOffsetRequest…​FIXME

Note
handleListOffsetRequest is used exclusively when KafkaApis is requested to handle a ListOffsets request.

isAuthorizedClusterAction Internal Method

isAuthorizedClusterAction(request: RequestChannel.Request): Boolean

isAuthorizedClusterAction simply calls authorize with the ClusterAction operation and the ClusterResource resource.

Note
isAuthorizedClusterAction is used when…​FIXME
