NetworkClient — Non-Blocking Network KafkaClient
NetworkClient
is a non-blocking KafkaClient that uses a Selectable for network communication (i.e. sending and receiving messages).
Note
|
Selector is the one and only Selectable that uses Java’s selectable channels for stream-oriented connecting sockets (i.e. Java’s java.nio.channels.SocketChannel). |
NetworkClient
does the actual reads and writes (to sockets) every poll.
NetworkClient
uses the MetadataUpdater for the following:
NetworkClient
is created when:
-
KafkaConsumer
is created (withConsumerNetworkClient
) -
KafkaProducer
is created (withSender
) -
KafkaAdminClient
is created (usingcreateInternal
) -
AdminClient
is created (withConsumerNetworkClient
) -
ControllerChannelManager
is requested to addNewBroker (and creates aRequestSendThread
daemon thread and aControllerBrokerStateInfo
) -
TransactionMarkerChannelManager
is created -
KafkaServer
doesdoControlledShutdown
-
ReplicaFetcherBlockingSend
is created
Name | Description |
---|---|
Used when…FIXME |
Tip
|
Enable Add the following line to
Add the following line to
Refer to Logging. |
Establishing Connection to Broker Node — initiateConnect
Internal Method
void initiateConnect(Node node, long now)
initiateConnect
prints out the following DEBUG message to the logs:
Initiating connection to node [node]
initiateConnect
requests the ClusterConnectionStates to enter the connecting state for the connection to the broker node
.
initiateConnect
requests the Selectable to connect to the broker node
(at a given host and port).
In case of an IO failure, initiateConnect
requests the ClusterConnectionStates to enter the disconnected state for the connection to the broker node
.
initiateConnect
requests the MetadataUpdater for update.
You should see the following DEBUG message in the logs:
Error connecting to node [node]
Note
|
|
ready
Method
boolean ready(Node node, long now)
Note
|
ready is a part of KafkaClient Contract.
|
ready
…FIXME
wakeup
Method
void wakeup()
Note
|
wakeup is a part of KafkaClient Contract.
|
wakeup
simply requests the internal Selectable to wakeup
Note
|
wakeup is used when…FIXME
|
Reading and Writing to Socket — poll
Method
List<ClientResponse> poll(
long timeout,
long now)
Note
|
poll is a part of KafkaClient contract.
|
poll
requests the MetadataUpdater for cluster metadata update (if needed and possible).
poll
then requests Selectable to poll.
In the end, poll
handles completed request sends, receives, disconnected connections, records any connections to new brokers, initiates API version requests, expire in-flight requests, and finally triggers their RequestCompletionHandlers
.
In case abortedSends is not empty, poll
creates a collection of ClientResponse
with abortedSends, triggers their RequestCompletionHandlers
and returns them.
handleCompletedReceives
Internal Method
void handleCompletedReceives(
List<ClientResponse> responses,
long now)
handleCompletedReceives
…FIXME
Note
|
handleCompletedReceives is used exclusively when NetworkClient is requested to poll.
|
Creating NetworkClient Instance
NetworkClient
takes the following when created:
-
Client ID that is used to identify the client in requests to a Kafka server (when
NetworkClient
is requested to create a new ClientRequest) -
Size of the TCP send buffer (SO_SNDBUF) for socket connection (in bytes)
Use send.buffer.bytes property to configure it.
Used when
NetworkClient
establishes connection to a broker node. -
Size of the TCP receive buffer (SO_RCVBUF) for socket connection (in bytes)
Use receive.buffer.bytes property to configure it.
Used when
NetworkClient
establishes connection to a broker node -
throttleTimeSensor
Sensor
NetworkClient
initializes the internal registries and counters.
Informing ClientResponse about Response Being Completed — completeResponses
Internal Method
void completeResponses(List<ClientResponse> responses)
completeResponses
informs every ClientResponse
(in the input responses
) that a response has been completed.
In case of any exception, completeResponses
prints out the following ERROR message to the logs:
Uncaught error in request completion: [exception]
Note
|
completeResponses is used when NetworkClient is requested to poll (for both abortedSends and completed actions).
|
Creating ClientRequest — newClientRequest
Method
ClientRequest newClientRequest(
String nodeId,
AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
RequestCompletionHandler callback)
Note
|
newClientRequest is part of the KafkaClient Contract to…FIXME.
|
newClientRequest
simply creates a new ClientRequest
(with the input parameters and the correlation incremented, the clientId and the defaultRequestTimeoutMs).
sendInternalMetadataRequest
Internal Method
void sendInternalMetadataRequest(
MetadataRequest.Builder builder,
String nodeConnectionId,
long now)
sendInternalMetadataRequest
…FIXME
Note
|
sendInternalMetadataRequest is used exclusively when DefaultMetadataUpdater is requested to maybeUpdate.
|
doSend
Internal Method
void doSend(
ClientRequest clientRequest,
boolean isInternalRequest,
long now)
void doSend(
ClientRequest clientRequest,
boolean isInternalRequest,
long now,
AbstractRequest request)
doSend
…FIXME
Note
|
doSend is used when NetworkClient is requested to send, sendInternalMetadataRequest and handleInitiateApiVersionRequests.
|
send
Method
void send(ClientRequest request, long now)
Note
|
send is part of the KafkaClient Contract to…FIXME.
|
send
…FIXME
handleDisconnections
Internal Method
void handleDisconnections(List<ClientResponse> responses, long now)
handleDisconnections
…FIXME
Note
|
handleDisconnections is used exclusively when NetworkClient is requested to poll.
|
handleTimedOutRequests
Internal Method
void handleTimedOutRequests(List<ClientResponse> responses, long now)
handleTimedOutRequests
…FIXME
Note
|
handleTimedOutRequests is used exclusively when NetworkClient is requested to poll.
|
processDisconnection
Internal Method
void processDisconnection(
List<ClientResponse> responses,
String nodeId,
long now,
ChannelState disconnectState)
processDisconnection
…FIXME
Note
|
processDisconnection is used when NetworkClient is requested to handleTimedOutRequests, handleApiVersionsResponse, and handleDisconnections.
|
handleApiVersionsResponse
Internal Method
void handleApiVersionsResponse(
List<ClientResponse> responses,
InFlightRequest req,
long now,
ApiVersionsResponse apiVersionsResponse)
handleApiVersionsResponse
…FIXME
Note
|
handleApiVersionsResponse is used exclusively when NetworkClient is requested to handleCompletedReceives (when requested to poll).
|
leastLoadedNode
Method
Node leastLoadedNode(long now)
Note
|
leastLoadedNode is part of the KafkaClient Contract to…FIXME.
|
leastLoadedNode
…FIXME
close
Method
void close()
Note
|
close is part of Java’s java.io.Closeable to close this stream and releases any system resources associated with it.
|
close
…FIXME
close
Method
void close(String nodeId)
Note
|
close is part of the KafkaClient Contract to…FIXME.
|
close
…FIXME
isReady
Method
boolean isReady(Node node, long now)
Note
|
isReady is part of the KafkaClient Contract to…FIXME.
|
isReady
…FIXME
disconnect
Method
void disconnect(String nodeId)
Note
|
disconnect is part of the KafkaClient Contract to…FIXME.
|
disconnect
…FIXME