NetworkClient — Non-Blocking KafkaClient

NetworkClient is a non-blocking KafkaClient that uses Selectable for network communication (i.e. sending and receiving messages).

Note
Selector is the one and only Selectable that uses Java’s selectable channels for stream-oriented connecting sockets (i.e. Java’s java.nio.channels.SocketChannel).

NetworkClient does the actual reads and writes (to sockets) every poll.

NetworkClient.png
Figure 1. NetworkClient

NetworkClient is created when:

  • KafkaConsumer is created (with ConsumerNetworkClient)

  • KafkaProducer is created (with Sender)

  • KafkaAdminClient is created (using createInternal)

  • AdminClient is created (with ConsumerNetworkClient)

  • ControllerChannelManager is requested to addNewBroker (and creates a RequestSendThread daemon thread and a ControllerBrokerStateInfo)

  • TransactionMarkerChannelManager is created

  • KafkaServer does doControlledShutdown

  • ReplicaFetcherBlockingSend is created

Table 1. NetworkClient’s Internal Properties (e.g. Registries and Counters)
Name Description

connectionStates

ClusterConnectionStates

Used when…​FIXME

Tip

Enable DEBUG logging level for org.apache.kafka.clients.NetworkClient logger to see what happens inside.

Add the following line to config/tools-log4j.properties (for Kafka tools):

log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG

Add the following line to config/log4j.properties:

log4j.logger.org.apache.kafka.clients.NetworkClient=DEBUG

Refer to Logging.

Establishing Connection to Broker Node — initiateConnect Internal Method

void initiateConnect(Node node, long now)

initiateConnect prints out the following DEBUG message to the logs:

Initiating connection to node [node]

initiateConnect requests ClusterConnectionStates to enter the connecting state for the connection to the broker node.

initiateConnect requests Selectable to connect to the broker node (at a given host and port).

Note
initiateConnect passes the sizes of send and receive buffers for the socket connection.

In case of an IO failure, initiateConnect requests ClusterConnectionStates to enter the disconnected state for the connection to the broker node.

initiateConnect requests MetadataUpdater for update.

You should see the following DEBUG message in the logs:

Error connecting to node [node]
Note

initiateConnect is used when:

ready Method

boolean ready(Node node, long now)
Note
ready is a part of KafkaClient Contract.

ready…​FIXME

Note
ready is used when…​FIXME

wakeup Method

void wakeup()
Note
wakeup is a part of KafkaClient Contract.

wakeup simply requests the internal Selectable to wakeup

Note
wakeup is used when…​FIXME

Reading and Writing to Socket — poll Method

List<ClientResponse> poll(long timeout, long now)
Note
poll is a part of KafkaClient Contract.

poll then requests Selectable to poll.

In the end, poll handles completed request sends, receives, disconnected connections, records any connections to new brokers, initiates API version requests, expire in-flight requests, and finally triggers their RequestCompletionHandlers.

In case abortedSends is not empty, poll creates a collection of ClientResponse with abortedSends, triggers their RequestCompletionHandlers and returns them.

handleCompletedReceives Method

void handleCompletedReceives(List<ClientResponse> responses, long now)

handleCompletedReceives…​FIXME

Note
handleCompletedReceives is used exclusively when NetworkClient polls.

Creating NetworkClient Instance

NetworkClient takes the following when created:

NetworkClient initializes the internal registries and counters.

Informing ClientResponse about Response Being Completed — completeResponses Internal Method

void completeResponses(List<ClientResponse> responses)

completeResponses informs every ClientResponse (in the input responses) that a response has been completed.

In case of any exception, completeResponses prints out the following ERROR message to the logs:

Uncaught error in request completion: [exception]
Note
completeResponses is used when NetworkClient poll (for both abortedSends and completed actions).

Creating ClientRequest — newClientRequest Method

ClientRequest newClientRequest(
  String nodeId,
  AbstractRequest.Builder<?> requestBuilder,
  long createdTimeMs,
  boolean expectResponse,
  int requestTimeoutMs,
  RequestCompletionHandler callback)
Note
newClientRequest is part of the KafkaClient Contract to…​FIXME.

newClientRequest simply creates a new ClientRequest (with the input parameters and the correlation incremented, the clientId and the defaultRequestTimeoutMs).

sendInternalMetadataRequest Internal Method

void sendInternalMetadataRequest(
  MetadataRequest.Builder builder,
  String nodeConnectionId,
  long now)

sendInternalMetadataRequest…​FIXME

Note
sendInternalMetadataRequest is used exclusively when DefaultMetadataUpdater is requested to maybeUpdate.

doSend Internal Method

void doSend(
  ClientRequest clientRequest,
  boolean isInternalRequest,
  long now)
void doSend(
  ClientRequest clientRequest,
  boolean isInternalRequest,
  long now,
  AbstractRequest request)

doSend…​FIXME

Note
doSend is used when NetworkClient is requested to send, sendInternalMetadataRequest and handleInitiateApiVersionRequests.

send Method

void send(ClientRequest request, long now)
Note
send is part of the KafkaClient Contract to…​FIXME.

send…​FIXME

results matching ""

    No results matching ""