NetworkClient — Non-Blocking Network KafkaClient
NetworkClient is a non-blocking KafkaClient that uses a Selectable for network communication (i.e. sending and receiving messages).
Note: Selector is the one and only Selectable that uses Java’s selectable channels for stream-oriented connecting sockets (i.e. Java’s java.nio.channels.SocketChannel).
NetworkClient does the actual reads and writes (to sockets) every poll.
NetworkClient uses the MetadataUpdater for the following:
NetworkClient is created when:
- KafkaConsumer is created (with ConsumerNetworkClient)
- KafkaProducer is created (with Sender)
- KafkaAdminClient is created (using createInternal)
- AdminClient is created (with ConsumerNetworkClient)
- ControllerChannelManager is requested to addNewBroker (and creates a RequestSendThread daemon thread and a ControllerBrokerStateInfo)
- TransactionMarkerChannelManager is created
- KafkaServer does doControlledShutdown
- ReplicaFetcherBlockingSend is created
| Name | Description |
|---|---|
| | Used when…FIXME |
Tip: Enable DEBUG logging for NetworkClient to see what happens inside. Refer to Logging.
Establishing Connection to Broker Node — initiateConnect Internal Method
void initiateConnect(Node node, long now)
initiateConnect prints out the following DEBUG message to the logs:
Initiating connection to node [node]
initiateConnect requests the ClusterConnectionStates to enter the connecting state for the connection to the broker node.
initiateConnect requests the Selectable to connect to the broker node (at a given host and port).
In case of an IO failure, initiateConnect requests the ClusterConnectionStates to enter the disconnected state for the connection to the broker node, requests the MetadataUpdater for update, and prints out the following DEBUG message to the logs:
Error connecting to node [node]
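The flow of initiateConnect can be sketched roughly as follows. This is a minimal illustration only: the Connector interface and the events list are hypothetical stand-ins for the Selectable, the ClusterConnectionStates and the MetadataUpdater, not the real Kafka types.

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-ins; not the real Kafka classes.
class InitiateConnectSketch {
    interface Connector {                        // plays the role of the Selectable
        void connect(String nodeId, String host, int port) throws IOException;
    }

    final List<String> events = new ArrayList<>();   // what happened, in order
    private final Connector connector;

    InitiateConnectSketch(Connector connector) {
        this.connector = connector;
    }

    void initiateConnect(String nodeId, String host, int port) {
        events.add("DEBUG Initiating connection to node " + nodeId);
        events.add("state=CONNECTING");               // ClusterConnectionStates: connecting
        try {
            connector.connect(nodeId, host, port);
        } catch (IOException e) {
            events.add("state=DISCONNECTED");         // ClusterConnectionStates: disconnected
            events.add("metadata update requested");  // MetadataUpdater: request update
            events.add("DEBUG Error connecting to node " + nodeId);
        }
    }

    public static void main(String[] args) {
        InitiateConnectSketch failing = new InitiateConnectSketch((id, host, port) -> {
            throw new IOException("connection refused");
        });
        failing.initiateConnect("1", "localhost", 9092);
        failing.events.forEach(System.out::println);
    }
}
```

The point of the sketch is that the connecting state is entered before the connection attempt, and the disconnected state, the metadata update request and the error message all belong to the failure path.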
ready Method
boolean ready(Node node, long now)
Note: ready is a part of KafkaClient Contract.
ready…FIXME
wakeup Method
void wakeup()
Note: wakeup is a part of KafkaClient Contract.
wakeup simply requests the internal Selectable to wakeup.
Note: wakeup is used when…FIXME
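To see what a wakeup does at the Java NIO level, here is a small sketch that uses java.nio.channels.Selector directly. It assumes that the Selectable ultimately delegates to a Java NIO selector; the class and method names are made up for the illustration.

```java
import java.io.IOException;
import java.nio.channels.Selector;

class WakeupSketch {
    // Returns true if a thread blocked in select() returned after wakeup().
    static boolean blockedSelectReturnsAfterWakeup() throws IOException, InterruptedException {
        try (Selector selector = Selector.open()) {
            Thread poller = new Thread(() -> {
                try {
                    selector.select();   // would block forever: no channels are registered
                } catch (IOException e) {
                    throw new RuntimeException(e);
                }
            });
            poller.start();
            selector.wakeup();           // unblocks the current (or the next) select()
            poller.join(5_000);          // wait up to 5 seconds for the poller to finish
            return !poller.isAlive();
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("select() returned after wakeup: " + blockedSelectReturnsAfterWakeup());
    }
}
```

A wakeup is what lets another thread interrupt a poll that is blocked waiting for network activity.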
Reading and Writing to Socket — poll Method
List<ClientResponse> poll(
long timeout,
long now)
Note: poll is a part of KafkaClient contract.
poll requests the MetadataUpdater for cluster metadata update (if needed and possible).
poll then requests Selectable to poll.
In the end, poll handles completed request sends, completed receives and disconnected connections, records any connections to new brokers, initiates API version requests, expires in-flight requests that timed out, and finally triggers the RequestCompletionHandlers of the collected responses.
In case abortedSends is not empty, poll creates a collection of ClientResponse with abortedSends, triggers their RequestCompletionHandlers and returns them.
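The order of the steps in poll can be traced with a small model like the one below. It does no networking: every step is just recorded as a string, and the ClientResponses are represented by plain strings. That aborted sends are checked before any network I/O is an assumption of this sketch, as are all the names in it.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical trace-only model of poll; no real networking happens here.
class PollOrderSketch {
    final List<String> steps = new ArrayList<>();        // what poll did, in order
    final List<String> abortedSends = new ArrayList<>(); // stand-ins for ClientResponses

    List<String> poll() {
        List<String> responses = new ArrayList<>();
        if (!abortedSends.isEmpty()) {
            // Assumption: aborted sends short-circuit the rest of poll.
            responses.addAll(abortedSends);
            abortedSends.clear();
            steps.add("complete responses");
            return responses;
        }
        steps.add("metadata update (if needed and possible)");
        steps.add("selector poll");
        steps.add("completed sends");
        steps.add("completed receives");
        steps.add("disconnections");
        steps.add("connections to new brokers");
        steps.add("API version requests");
        steps.add("timed-out requests");
        steps.add("complete responses");
        return responses;
    }

    public static void main(String[] args) {
        PollOrderSketch client = new PollOrderSketch();
        client.poll();
        client.steps.forEach(System.out::println);
    }
}
```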
handleCompletedReceives Internal Method
void handleCompletedReceives(
List<ClientResponse> responses,
long now)
handleCompletedReceives…FIXME
Note: handleCompletedReceives is used exclusively when NetworkClient is requested to poll.
Creating NetworkClient Instance
NetworkClient takes the following when created:
- Client ID that is used to identify the client in requests to a Kafka server (when NetworkClient is requested to create a new ClientRequest)
- Size of the TCP send buffer (SO_SNDBUF) for socket connection (in bytes). Use send.buffer.bytes property to configure it. Used when NetworkClient establishes connection to a broker node.
- Size of the TCP receive buffer (SO_RCVBUF) for socket connection (in bytes). Use receive.buffer.bytes property to configure it. Used when NetworkClient establishes connection to a broker node.
- throttleTimeSensor Sensor
NetworkClient initializes the internal registries and counters.
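As an illustration of the two buffer properties mentioned above, the following sketch builds a java.util.Properties object with send.buffer.bytes and receive.buffer.bytes set. The helper name and the buffer sizes are arbitrary examples, not recommended values, and how the properties reach a client is left out.

```java
import java.util.Properties;

class SocketBufferConfigSketch {
    // Builds client properties with the TCP buffer sizes (in bytes).
    static Properties socketBufferProps(int sendBytes, int receiveBytes) {
        Properties props = new Properties();
        props.setProperty("send.buffer.bytes", Integer.toString(sendBytes));       // SO_SNDBUF
        props.setProperty("receive.buffer.bytes", Integer.toString(receiveBytes)); // SO_RCVBUF
        return props;
    }

    public static void main(String[] args) {
        // 128 KiB and 64 KiB are illustrative sizes only.
        Properties props = socketBufferProps(128 * 1024, 64 * 1024);
        props.forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```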
Informing ClientResponse about Response Being Completed — completeResponses Internal Method
void completeResponses(List<ClientResponse> responses)
completeResponses informs every ClientResponse (in the input responses) that a response has been completed.
In case of any exception, completeResponses prints out the following ERROR message to the logs:
Uncaught error in request completion: [exception]
Note: completeResponses is used when NetworkClient is requested to poll (for both abortedSends and completed actions).
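The behaviour of completeResponses can be sketched as a loop that isolates each callback, so that one failing handler does not prevent the remaining responses from being completed. The Response interface and the log list below are hypothetical stand-ins for ClientResponse and the logger.

```java
import java.util.ArrayList;
import java.util.List;

class CompleteResponsesSketch {
    interface Response {          // hypothetical stand-in for ClientResponse
        void onComplete();
    }

    final List<String> log = new ArrayList<>();   // collects ERROR messages

    void completeResponses(List<Response> responses) {
        for (Response response : responses) {
            try {
                response.onComplete();
            } catch (Exception e) {
                // One failing callback must not stop the others.
                log.add("ERROR Uncaught error in request completion: " + e);
            }
        }
    }

    public static void main(String[] args) {
        CompleteResponsesSketch client = new CompleteResponsesSketch();
        List<Response> responses = new ArrayList<>();
        responses.add(() -> System.out.println("first completed"));
        responses.add(() -> { throw new IllegalStateException("boom"); });
        responses.add(() -> System.out.println("third completed"));
        client.completeResponses(responses);
        client.log.forEach(System.out::println);
    }
}
```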
Creating ClientRequest — newClientRequest Method
ClientRequest newClientRequest(
String nodeId,
AbstractRequest.Builder<?> requestBuilder,
long createdTimeMs,
boolean expectResponse,
int requestTimeoutMs,
RequestCompletionHandler callback)
Note: newClientRequest is part of the KafkaClient Contract to…FIXME.
newClientRequest simply creates a new ClientRequest with the input parameters, the next correlation ID (the internal correlation counter is incremented with every request), the clientId and the defaultRequestTimeoutMs.
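The correlation counter can be pictured with the sketch below. The Request class is a hypothetical, trimmed-down stand-in for ClientRequest, and starting the counter at 0 is an assumption of the example.

```java
class NewClientRequestSketch {
    // Hypothetical, trimmed-down request; the real ClientRequest carries more fields.
    static final class Request {
        final String nodeId;
        final int correlationId;
        final String clientId;

        Request(String nodeId, int correlationId, String clientId) {
            this.nodeId = nodeId;
            this.correlationId = correlationId;
            this.clientId = clientId;
        }
    }

    private final String clientId;
    private int correlation = 0;   // assumed starting value; incremented per request

    NewClientRequestSketch(String clientId) {
        this.clientId = clientId;
    }

    Request newClientRequest(String nodeId) {
        // Every request gets the current counter value, then the counter moves on.
        return new Request(nodeId, correlation++, clientId);
    }

    public static void main(String[] args) {
        NewClientRequestSketch client = new NewClientRequestSketch("demo-client");
        Request first = client.newClientRequest("1");
        Request second = client.newClientRequest("1");
        System.out.println(first.correlationId + ", " + second.correlationId);
    }
}
```

Distinct correlation IDs are what allow a response to be matched with the request that caused it.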
sendInternalMetadataRequest Internal Method
void sendInternalMetadataRequest(
MetadataRequest.Builder builder,
String nodeConnectionId,
long now)
sendInternalMetadataRequest…FIXME
Note: sendInternalMetadataRequest is used exclusively when DefaultMetadataUpdater is requested to maybeUpdate.
doSend Internal Method
void doSend(
ClientRequest clientRequest,
boolean isInternalRequest,
long now)
void doSend(
ClientRequest clientRequest,
boolean isInternalRequest,
long now,
AbstractRequest request)
doSend…FIXME
Note: doSend is used when NetworkClient is requested to send, sendInternalMetadataRequest and handleInitiateApiVersionRequests.
send Method
void send(ClientRequest request, long now)
Note: send is part of the KafkaClient Contract to…FIXME.
send…FIXME
handleDisconnections Internal Method
void handleDisconnections(List<ClientResponse> responses, long now)
handleDisconnections…FIXME
Note: handleDisconnections is used exclusively when NetworkClient is requested to poll.
handleTimedOutRequests Internal Method
void handleTimedOutRequests(List<ClientResponse> responses, long now)
handleTimedOutRequests…FIXME
Note: handleTimedOutRequests is used exclusively when NetworkClient is requested to poll.
processDisconnection Internal Method
void processDisconnection(
List<ClientResponse> responses,
String nodeId,
long now,
ChannelState disconnectState)
processDisconnection…FIXME
Note: processDisconnection is used when NetworkClient is requested to handleTimedOutRequests, handleApiVersionsResponse, and handleDisconnections.
handleApiVersionsResponse Internal Method
void handleApiVersionsResponse(
List<ClientResponse> responses,
InFlightRequest req,
long now,
ApiVersionsResponse apiVersionsResponse)
handleApiVersionsResponse…FIXME
Note: handleApiVersionsResponse is used exclusively when NetworkClient is requested to handleCompletedReceives (when requested to poll).
leastLoadedNode Method
Node leastLoadedNode(long now)
Note: leastLoadedNode is part of the KafkaClient Contract to…FIXME.
leastLoadedNode…FIXME
close Method
void close()
Note: close is part of Java’s java.io.Closeable to close this stream and release any system resources associated with it.
close…FIXME
close Method
void close(String nodeId)
Note: close is part of the KafkaClient Contract to…FIXME.
close…FIXME
isReady Method
boolean isReady(Node node, long now)
Note: isReady is part of the KafkaClient Contract to…FIXME.
isReady…FIXME
disconnect Method
void disconnect(String nodeId)
Note: disconnect is part of the KafkaClient Contract to…FIXME.
disconnect…FIXME