KafkaApis — API Request Handler

KafkaApis handles API requests (by means of handlers).

KafkaApis is created exclusively when KafkaServer is started (and creates KafkaRequestHandlerPool).

KafkaApis.png
Figure 1. KafkaApis is Created for KafkaRequestHandlerPool when KafkaServer Starts Up

KafkaApis uses a AdminZkClient for…​FIXME

Table 1. KafkaApis’s API Keys and Handlers (in alphabetical order)
API Key Handler

AlterConfigs

handleAlterConfigsRequest

AlterReplicaLogDirs

handleLeaderAndIsrRequest

ControlledShutdown

handleControlledShutdownRequest

CreatePartitions

handleCreatePartitionsRequest

CreateTopics

handleCreateTopicsRequest

DeleteTopics

handleDeleteTopicsRequest

DescribeGroups

handleDescribeGroupRequest

DescribeLogDirs

handleDescribeLogDirsRequest

Fetch

handleFetchRequest

FindCoordinator

handleFindCoordinatorRequest

LeaderAndIsr

handleAlterReplicaLogDirsRequest

Metadata

handleTopicMetadataRequest

OffsetCommit

handleOffsetCommitRequest

OffsetFetch

handleOffsetFetchRequest

StopReplica

handleStopReplicaRequest

UpdateMetadata

handleUpdateMetadataRequest

KafkaApis uses the ReplicaManager in the following handlers:

Tip

Enable INFO, DEBUG or TRACE logging levels for kafka.server.KafkaApis logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.server.KafkaApis=TRACE

Refer to Logging.

Routing API Requests — handle Method

handle(request: RequestChannel.Request): Unit

handle first prints out the following TRACE message to the logs:

Handling request:[request] from connection [id];securityProtocol:[protocol],principal:[principal]

handle then relays the input request to the corresponding handler per the apiKey (from the header of the input request).

Note
handle is used exclusively when KafkaRequestHandler thread is started.

Handling LeaderAndIsr Request — handleLeaderAndIsrRequest Handler

handleLeaderAndIsrRequest(request: RequestChannel.Request): Unit

handleLeaderAndIsrRequest…​FIXME

Note
handleLeaderAndIsrRequest is used exclusively when KafkaApis is requested to handle a LeaderAndIsr request.

Handling AlterReplicaLogDirs Request — handleAlterReplicaLogDirsRequest Handler

handleAlterReplicaLogDirsRequest(request: RequestChannel.Request): Unit

handleAlterReplicaLogDirsRequest…​FIXME

Note
handleAlterReplicaLogDirsRequest is used exclusively when KafkaApis is requested to handle a AlterReplicaLogDirs request.

Handling CreateTopics Request — handleCreateTopicsRequest Handler

handleCreateTopicsRequest(request: RequestChannel.Request): Unit

handleCreateTopicsRequest…​FIXME

handleCreateTopicsRequest checks whether KafkaController is active…​FIXME

handleCreateTopicsRequest authorizes the Create operation for ClusterResource…​FIXME

In the end, handleCreateTopicsRequest requests AdminManager to create the topics.

Note
handleCreateTopicsRequest is used exclusively when KafkaApis is requested to handle a CreateTopics request.

Handling OffsetFetch Request — handleOffsetFetchRequest Handler

handleOffsetFetchRequest(request: RequestChannel.Request): Unit

handleOffsetFetchRequest…​FIXME

Note
handleOffsetFetchRequest is used exclusively when KafkaApis is requested to handle a OffsetFetch request.

Handling Fetch Request — handleFetchRequest Handler

handleFetchRequest(request: RequestChannel.Request): Unit

handleFetchRequest…​FIXME

Note
handleFetchRequest is used exclusively when KafkaApis is requested to handle a Fetch request.

Handling Metadata Request — handleTopicMetadataRequest Handler

handleTopicMetadataRequest(request: RequestChannel.Request): Unit

handleTopicMetadataRequest takes the MetadataRequest from the body (from the input request).

handleTopicMetadataRequest requests the MetadataCache for getAllTopics or its subset (per topics attribute of the MetadataRequest).

handleTopicMetadataRequest filters out the topics for which the current principal (user) is not authorized to execute Describe operation.

For every authorized topic, handleTopicMetadataRequest…​FIXME

handleTopicMetadataRequest creates a MetadataResponse.TopicMetadata with TOPIC_AUTHORIZATION_FAILED for every unauthorizedForCreateTopics and unauthorizedForDescribeTopics.

handleTopicMetadataRequest getTopicMetadata if there are authorizedTopics.

handleTopicMetadataRequest prints out the following TRACE message to the logs:

Sending topic metadata [completeTopicMetadata] and brokers [brokers] for correlation id [correlationId] to client [clientId]

In the end, handleTopicMetadataRequest sendResponseMaybeThrottle with a new MetadataResponse.

Note
handleTopicMetadataRequest is used exclusively when KafkaApis is requested to handle a Metadata request.

authorize Internal Method

authorize(session: RequestChannel.Session, operation: Operation, resource: Resource): Boolean

authorize…​FIXME

Note
authorize is used when…​FIXME

Handling CreatePartitions Request — handleCreatePartitionsRequest Handler

handleCreatePartitionsRequest(request: RequestChannel.Request): Unit

handleCreatePartitionsRequest…​FIXME

Note
handleCreatePartitionsRequest is used when…​FIXME

Handling DeleteTopics Request — handleDeleteTopicsRequest Handler

handleDeleteTopicsRequest(request: RequestChannel.Request): Unit

handleDeleteTopicsRequest…​FIXME

Note
handleDeleteTopicsRequest is used when…​FIXME

Handling ControlledShutdown Request — handleControlledShutdownRequest Handler

handleControlledShutdownRequest(request: RequestChannel.Request): Unit

handleControlledShutdownRequest…​FIXME

Note
handleControlledShutdownRequest is used when…​FIXME

Creating KafkaApis Instance

KafkaApis takes the following when created:

KafkaApis initializes the internal registries and counters.

fetchOffsetForTimestamp Internal Method

fetchOffsetForTimestamp(topicPartition: TopicPartition, timestamp: Long): Option[TimestampOffset]

fetchOffsetForTimestamp…​FIXME

Note
fetchOffsetForTimestamp is used exclusively when KafkaApis is requested to handleListOffsetRequestV1AndAbove.

handleListOffsetRequestV0 Internal Method

handleListOffsetRequestV0(
  request : RequestChannel.Request) : Map[TopicPartition, ListOffsetResponse.PartitionData]

handleListOffsetRequestV0…​FIXME

Note
handleListOffsetRequestV0 is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 0).

handleListOffsetRequestV1AndAbove Internal Method

handleListOffsetRequestV1AndAbove(
  request: RequestChannel.Request): Map[TopicPartition, ListOffsetResponse.PartitionData]

handleListOffsetRequestV1AndAbove…​FIXME

Note
handleListOffsetRequestV1AndAbove is used exclusively when KafkaApis is requested to handleListOffsetRequest (for the API version 1 or above).

Handling DescribeLogDirs Request — handleDescribeLogDirsRequest Handler

handleDescribeLogDirsRequest(request: RequestChannel.Request): Unit

handleDescribeLogDirsRequest takes the DescribeLogDirsRequest (from the body of the input RequestChannel.Request).

handleDescribeLogDirsRequest branches off per whether the DescribeLogDirsRequest was for isAllTopicPartitions or not.

Note
handleDescribeLogDirsRequest returns an empty list of log directories when the request is not authorized.

handleDescribeLogDirsRequest then requests the ReplicaManager to describeLogDirs with the requested TopicPartitions.

In the end, handleDescribeLogDirsRequest sendResponseMaybeThrottle with a DescribeLogDirsResponse and the LogDirInfos.

Note
handleDescribeLogDirsRequest is used exclusively when KafkaApis is requested to handle a DescribeLogDirs request.

sendResponseMaybeThrottle Internal Method

sendResponseMaybeThrottle(
  request: RequestChannel.Request,
  createResponse: Int => AbstractResponse,
  onComplete: Option[Send => Unit] = None): Unit

sendResponseMaybeThrottle…​FIXME

Note
sendResponseMaybeThrottle is used when…​FIXME

fetchOffsetsBefore Method

fetchOffsetsBefore(log: Log, timestamp: Long, maxNumOffsets: Int): Seq[Long]

fetchOffsetsBefore…​FIXME

Note
fetchOffsetsBefore is used exclusively when KafkaApis is requested to fetchOffsets.

fetchOffsets Method

fetchOffsets(
  logManager: LogManager,
  topicPartition: TopicPartition,
  timestamp: Long,
  maxNumOffsets: Int): Seq[Long]

fetchOffsets…​FIXME

Note
fetchOffsets is used exclusively when KafkaApis is requested to handleListOffsetRequestV0.

Handling StopReplica Request — handleStopReplicaRequest Handler

handleStopReplicaRequest(request: RequestChannel.Request): Unit

handleStopReplicaRequest…​FIXME

Note
handleStopReplicaRequest is used exclusively when KafkaApis is requested to handle a StopReplica request.

Handling UpdateMetadata Request — handleUpdateMetadataRequest Handler

handleUpdateMetadataRequest(request: RequestChannel.Request): Unit

handleUpdateMetadataRequest…​FIXME

Note
handleUpdateMetadataRequest is used exclusively when KafkaApis is requested to handle a UpdateMetadata request.

Handling OffsetCommit Request — handleOffsetCommitRequest Handler

handleOffsetCommitRequest(request: RequestChannel.Request): Unit

handleOffsetCommitRequest…​FIXME

Note
handleOffsetCommitRequest is used exclusively when KafkaApis is requested to handle a OffsetCommit request.

createInternalTopic Internal Method

createInternalTopic(topic: String): MetadataResponse.TopicMetadata

createInternalTopic…​FIXME

Note
createInternalTopic is used when KafkaApis is requested to getOrCreateInternalTopic and getTopicMetadata.

getOrCreateInternalTopic Internal Method

getOrCreateInternalTopic(
  topic: String,
  listenerName: ListenerName): MetadataResponse.TopicMetadata

getOrCreateInternalTopic requests the MetadataCache for getTopicMetadata for the input topic (and the ListenerName).

In the end, getOrCreateInternalTopic returns the TopicMetadata if available or createInternalTopic.

Note
getOrCreateInternalTopic is used exclusively when KafkaApis is requested to handle a FindCoordinator request.

getTopicMetadata Internal Method

getTopicMetadata(
  allowAutoTopicCreation: Boolean,
  topics: Set[String],
  listenerName: ListenerName,
  errorUnavailableEndpoints: Boolean,
  errorUnavailableListeners: Boolean): Seq[MetadataResponse.TopicMetadata]

getTopicMetadata…​FIXME

Note
getTopicMetadata is used exclusively when KafkaApis is requested to handle Metadata request.

Handling DescribeGroups Request — handleDescribeGroupRequest Handler

handleDescribeGroupRequest(request: RequestChannel.Request): Unit

handleDescribeGroupRequest…​FIXME

Note
handleDescribeGroupRequest is used exclusively when KafkaApis is requested to handle a DescribeGroups request.

Handling AlterConfigs Request — handleAlterConfigsRequest Handler

handleAlterConfigsRequest(request: RequestChannel.Request): Unit

handleAlterConfigsRequest…​FIXME

Note
handleAlterConfigsRequest is used exclusively when KafkaApis is requested to handle a AlterConfigs request.

createTopic Internal Method

createTopic(
  topic: String,
  numPartitions: Int,
  replicationFactor: Int,
  properties: Properties = new Properties()): MetadataResponse.TopicMetadata

createTopic…​FIXME

Note
createTopic is used when KafkaApis is requested to createInternalTopic and getTopicMetadata.

Handling FindCoordinator Request — handleFindCoordinatorRequest Handler

handleFindCoordinatorRequest(request: RequestChannel.Request): Unit

handleFindCoordinatorRequest takes the FindCoordinatorRequest from the body (from the input request).

handleFindCoordinatorRequest checks permissions…​FIXME

For an authorized request, handleFindCoordinatorRequest branches off per CoordinatorType, i.e. GROUP or TRANSACTION.

For GROUP coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the GroupCoordinator for partitionFor (for the coordinatorKey of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __consumer_offsets topic

For TRANSACTION coordinator type, handleFindCoordinatorRequest does the following:

  1. Requests the TransactionCoordinator for partitionFor (for the coordinatorKey of the FindCoordinatorRequest)

  2. getOrCreateInternalTopic for __transaction_state topic

In the end, handleFindCoordinatorRequest sendResponseMaybeThrottle with a new FindCoordinatorResponse.

You should see the following TRACE message in the logs:

Sending FindCoordinator response [body] for correlation id [correlationId] to client [clientId].
Note
handleFindCoordinatorRequest is used exclusively when KafkaApis is requested to handle a FindCoordinator request.

results matching ""

    No results matching ""