NetworkClient — Non-Blocking KafkaClient

NetworkClient is a non-blocking KafkaClient that uses Selectable for network communication (i.e. sending and receiving messages).

Selector is the one and only Selectable that uses Java’s selectable channels for stream-oriented connecting sockets (i.e. Java’s java.nio.channels.SocketChannel).

NetworkClient does the actual reads and writes (to sockets) on every poll.
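To give a feel for the non-blocking style described above, the following sketch uses plain java.nio (not Kafka's Selector) to connect a SocketChannel without blocking and to wait for the connection through a java.nio.channels.Selector. The class and method names are made up for this example, and the loopback server exists only so that there is something to connect to.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;

// Hypothetical example class; not part of Kafka.
class NonBlockingConnectSketch {

    /** Connects to a throwaway loopback server without blocking on connect(). */
    static boolean connectNonBlocking() {
        try (ServerSocketChannel server = ServerSocketChannel.open();
             Selector selector = Selector.open();
             SocketChannel client = SocketChannel.open()) {

            // Port 0 lets the operating system pick a free port.
            server.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), 0));

            client.configureBlocking(false);
            // In non-blocking mode connect() may return before the connection is established.
            boolean connected = client.connect(server.getLocalAddress());

            if (!connected) {
                client.register(selector, SelectionKey.OP_CONNECT);
                long deadline = System.currentTimeMillis() + 5000;
                while (!connected && System.currentTimeMillis() < deadline) {
                    selector.select(500); // waits for readiness, up to 500 ms
                    for (SelectionKey key : selector.selectedKeys()) {
                        if (key.isConnectable()) {
                            connected = client.finishConnect();
                        }
                    }
                    selector.selectedKeys().clear();
                }
            }
            return connected;
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println("connected: " + connectNonBlocking());
    }
}
```

The calling thread never blocks inside connect itself. It only waits in select, which is the place where a single thread could also watch many other channels.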

Figure 1. NetworkClient

NetworkClient is created when:

  • KafkaConsumer is created (with ConsumerNetworkClient)

  • KafkaProducer is created (with Sender)

  • KafkaAdminClient is created (using createInternal)

  • AdminClient is created (with ConsumerNetworkClient)

  • ControllerChannelManager is requested to addNewBroker (and creates a RequestSendThread daemon thread and a ControllerBrokerStateInfo)

  • TransactionMarkerChannelManager is created

  • KafkaServer does doControlledShutdown

  • ReplicaFetcherBlockingSend is created

Table 1. NetworkClient’s Internal Properties (e.g. Registries and Counters)
Name Description



Used when…​FIXME


Enable DEBUG logging level for org.apache.kafka.clients.NetworkClient logger to see what happens inside.

Add the following line to config/ (for Kafka tools):

Add the following line to config/

Refer to Logging.

Establishing Connection to Broker Node — initiateConnect Internal Method

void initiateConnect(Node node, long now)

initiateConnect prints out the following DEBUG message to the logs:

Initiating connection to node [node]

initiateConnect requests ClusterConnectionStates to enter the connecting state for the connection to the broker node.

initiateConnect requests Selectable to connect to the broker node (at a given host and port).

initiateConnect passes the sizes of send and receive buffers for the socket connection.

In case of an IO failure, initiateConnect requests ClusterConnectionStates to enter the disconnected state for the connection to the broker node, requests MetadataUpdater for an update, and prints out the following DEBUG message to the logs:

Error connecting to node [node]
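The flow above can be sketched with simplified stand-ins. This is not Kafka's code: the State enum, the Connector interface, the metadataUpdateRequested flag and the string-based node identifiers are all hypothetical, and System.out stands in for the DEBUG logger.

```java
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;

// Hypothetical example class; every name below is an illustration only.
class InitiateConnectSketch {
    enum State { CONNECTING, DISCONNECTED }

    /** Stand-in for the connect operation of a Selectable. */
    interface Connector {
        void connect(String nodeId, String host, int port,
                     int sendBufferBytes, int receiveBufferBytes) throws IOException;
    }

    // Stand-in for ClusterConnectionStates: node id -> connection state.
    final Map<String, State> connectionStates = new HashMap<>();
    // Stand-in for "MetadataUpdater was requested for an update".
    boolean metadataUpdateRequested = false;

    private final Connector connector;
    private final int sendBufferBytes;
    private final int receiveBufferBytes;

    InitiateConnectSketch(Connector connector, int sendBufferBytes, int receiveBufferBytes) {
        this.connector = connector;
        this.sendBufferBytes = sendBufferBytes;
        this.receiveBufferBytes = receiveBufferBytes;
    }

    void initiateConnect(String nodeId, String host, int port) {
        try {
            System.out.println("Initiating connection to node " + nodeId);
            connectionStates.put(nodeId, State.CONNECTING);
            connector.connect(nodeId, host, port, sendBufferBytes, receiveBufferBytes);
        } catch (IOException e) {
            // The attempt failed: mark the node disconnected and ask for fresh metadata.
            connectionStates.put(nodeId, State.DISCONNECTED);
            metadataUpdateRequested = true;
            System.out.println("Error connecting to node " + nodeId);
        }
    }
}
```

With a Connector that throws IOException, the node ends up in the DISCONNECTED state and the update flag is set. With one that succeeds, the node stays in CONNECTING until something else (not shown here) observes that the connection is established.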

initiateConnect is used when:

ready Method

boolean ready(Node node, long now)

ready is part of the KafkaClient Contract.


ready is used when…​FIXME

wakeup Method

void wakeup()

wakeup is part of the KafkaClient Contract.

wakeup simply requests the internal Selectable to wake up.
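As an illustration of what waking up a selector means, the sketch below uses java.nio.channels.Selector directly. It assumes that the wakeup of a Selectable behaves like the wakeup of a java.nio selector, which the text above does not state. The class and method names are hypothetical.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.Selector;

// Hypothetical example class; not part of Kafka.
class SelectorWakeupSketch {

    /**
     * Calls wakeup() and then select(timeoutMs). Per the java.nio contract, a wakeup
     * issued while no selection is in progress makes the next select return immediately.
     * Returns the number of ready keys reported by that select.
     */
    static int selectAfterWakeup(long timeoutMs) {
        try (Selector selector = Selector.open()) {
            selector.wakeup();
            return selector.select(timeoutMs);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        long start = System.nanoTime();
        int ready = selectAfterWakeup(10_000);
        long elapsedMs = (System.nanoTime() - start) / 1_000_000;
        System.out.println("ready keys: " + ready + ", waited about " + elapsedMs + " ms");
    }
}
```

No channel is registered, so no key is ready, yet select returns long before the timeout. That is the effect another thread relies on when it wants a polling thread to stop waiting.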

wakeup is used when…​FIXME

Reading and Writing to Socket — poll Method

List<ClientResponse> poll(long timeout, long now)

poll is part of the KafkaClient Contract.

poll requests Selectable to poll.

In the end, poll handles completed sends, completed receives and disconnected connections, records any connections to new brokers, initiates API version requests, expires in-flight requests, and finally triggers the RequestCompletionHandlers of the collected responses.

In case abortedSends is not empty, poll creates a collection of ClientResponse with abortedSends, triggers their RequestCompletionHandlers and returns them.
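The ordering of the steps can be sketched as follows. This is a toy model under stated assumptions: responses are plain strings, each step only records its name in a trace, and the abortedSends check is placed before the network poll, which the text above does not spell out. All names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical example class; it models only the order of the steps, not real network I/O.
class PollOrderSketch {
    final List<String> abortedSends = new ArrayList<>();
    final List<String> trace = new ArrayList<>();

    List<String> poll() {
        // Assumed short-circuit: aborted sends are completed without touching the network.
        if (!abortedSends.isEmpty()) {
            List<String> responses = new ArrayList<>(abortedSends);
            abortedSends.clear();
            completeResponses(responses);
            return responses;
        }

        trace.add("selector.poll");

        List<String> responses = new ArrayList<>();
        trace.add("completed sends");
        trace.add("completed receives");
        responses.add("response-1"); // pretend that one response was received
        trace.add("disconnections");
        trace.add("new connections");
        trace.add("api version requests");
        trace.add("timed-out requests");

        completeResponses(responses);
        return responses;
    }

    // Stand-in for triggering the completion handlers of all collected responses.
    private void completeResponses(List<String> responses) {
        trace.add("completion handlers x" + responses.size());
    }
}
```

Collecting the responses first and completing them in one final step keeps the handler callbacks out of the middle of the bookkeeping steps.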

handleCompletedReceives Method

void handleCompletedReceives(List<ClientResponse> responses, long now)


handleCompletedReceives is used exclusively when NetworkClient polls.

Creating NetworkClient Instance

NetworkClient takes the following when created:

NetworkClient initializes the internal registries and counters.

Informing ClientResponse about Response Being Completed — completeResponses Internal Method

void completeResponses(List<ClientResponse> responses)

completeResponses informs every ClientResponse (in the input responses) that a response has been completed.

In case of any exception, completeResponses prints out the following ERROR message to the logs:

Uncaught error in request completion: [exception]

completeResponses is used when NetworkClient polls (for both abortedSends and completed actions).
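A minimal sketch of that behaviour follows, assuming that a failure in one response must not prevent the remaining responses from being completed. The Response interface and the returned failure count are hypothetical additions for the example, and System.err stands in for the ERROR logger.

```java
import java.util.List;

// Hypothetical example class; not part of Kafka.
class CompleteResponsesSketch {

    /** Stand-in for a ClientResponse that can be told it has completed. */
    interface Response {
        void onComplete();
    }

    /** Completes every response and returns how many completions threw an exception. */
    static int completeResponses(List<Response> responses) {
        int failures = 0;
        for (Response response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                // Report the failure and carry on with the next response.
                System.err.println("Uncaught error in request completion: " + e);
                failures++;
            }
        }
        return failures;
    }
}
```

Catching the exception inside the loop rather than around it is what keeps one misbehaving callback from starving the others.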

Creating ClientRequest — newClientRequest Method

ClientRequest newClientRequest(
  String nodeId,
  AbstractRequest.Builder<?> requestBuilder,
  long createdTimeMs,
  boolean expectResponse,
  int requestTimeoutMs,
  RequestCompletionHandler callback)

newClientRequest is part of the KafkaClient Contract to…​FIXME.

newClientRequest simply creates a new ClientRequest (with the input parameters, an incremented correlation ID, the clientId and the defaultRequestTimeoutMs).
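The role of the incremented correlation can be sketched with a simplified model. The Request class, its fields and the starting value of 0 are assumptions made for this example, not Kafka's ClientRequest.

```java
// Hypothetical example class; a reduced model of handing out correlation IDs.
class ClientRequestSketch {

    /** Stand-in for ClientRequest that carries only a few illustrative fields. */
    static final class Request {
        final String nodeId;
        final int correlationId;
        final String clientId;
        final int requestTimeoutMs;

        Request(String nodeId, int correlationId, String clientId, int requestTimeoutMs) {
            this.nodeId = nodeId;
            this.correlationId = correlationId;
            this.clientId = clientId;
            this.requestTimeoutMs = requestTimeoutMs;
        }
    }

    private final String clientId;
    private final int defaultRequestTimeoutMs;
    private int correlation = 0; // assumed starting value

    ClientRequestSketch(String clientId, int defaultRequestTimeoutMs) {
        this.clientId = clientId;
        this.defaultRequestTimeoutMs = defaultRequestTimeoutMs;
    }

    /** Every new request gets the current correlation value, which is then incremented. */
    Request newClientRequest(String nodeId) {
        return new Request(nodeId, correlation++, clientId, defaultRequestTimeoutMs);
    }
}
```

Because each request carries its own number, a response that echoes the number back can be matched to the request it answers.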

sendInternalMetadataRequest Internal Method

void sendInternalMetadataRequest(
  MetadataRequest.Builder builder,
  String nodeConnectionId,
  long now)


sendInternalMetadataRequest is used exclusively when DefaultMetadataUpdater is requested to maybeUpdate.

doSend Internal Method

void doSend(
  ClientRequest clientRequest,
  boolean isInternalRequest,
  long now)

void doSend(
  ClientRequest clientRequest,
  boolean isInternalRequest,
  long now,
  AbstractRequest request)


doSend is used when NetworkClient is requested to send, sendInternalMetadataRequest and handleInitiateApiVersionRequests.

send Method

void send(ClientRequest request, long now)

send is part of the KafkaClient Contract to…​FIXME.

