SocketServer
SocketServer is a NIO socket server for a KafkaServer.
SocketServer is created and then started up exclusively when KafkaServer is requested to start up.
SocketServer uses queued.max.requests configuration property for…FIXME
| Name | Description |
|---|---|
SocketServer uses an empty metric prefix for…FIXME
SocketServer uses ControlPlane metric prefix for…FIXME
Creating SocketServer Instance
SocketServer takes the following to be created:
SocketServer initializes the internal properties.
Control-Plane Request Handler for Controller-Brokers Communication — controlPlaneRequestChannelOpt Internal Value
SocketServer creates a dedicated RequestChannel for communication between a controller and brokers when control.plane.listener.name configuration property is defined.
The RequestChannel is created for KafkaServer (when requested to start up). It is the time when KafkaServer creates dedicated control-plane request processor and control-plane request handler pool.
The RequestChannel is created with the queue size of 20 and the ControlPlane metric name prefix.
SocketServer manages the RequestChannel (when requested to stopProcessingRequests and shutdown).
The RequestChannel is assumed to be available when SocketServer is requested to createControlPlaneAcceptorAndProcessor (when requested to start up) based on control.plane.listener.name configuration property.
Creating Network Processor Thread — newProcessor Internal Method
newProcessor(
id: Int,
connectionQuotas: ConnectionQuotas,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
memoryPool: MemoryPool): Processor
newProcessor simply creates a new Processor with the RequestChannel and the following configuration properties:
|
Note
|
newProcessor is used exclusively when SocketServer is requested to createControlPlaneAcceptorAndProcessor and addDataPlaneProcessors.
|
Starting Up — startup Method
startup(
startupProcessors: Boolean = true): Unit
Internally, startup creates the ConnectionQuotas (with maxConnectionsPerIp and maxConnectionsPerIpOverrides).
For every endpoint (in endpoints registry) startup does the following:
-
Creates up to numProcessorThreads number of Processors (for ConnectionQuotas and MemoryPool)
-
Creates a
Acceptorfor the endpoint and processors -
Records the
Acceptorin acceptors internal registry -
Starts a non-daemon thread for the
Acceptorwith the name askafka-socket-acceptor-[listenerName]-[securityProtocol]-[port](e.g.kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092) and waits until it has started fully
startup then registers metrics.
In the end, startup prints out the following INFO message to the logs:
Started [dataPlaneAcceptors] acceptor threads for data-plane
|
Note
|
startup is used exclusively when KafkaServer is requested to start up.
|
addProcessors Internal Method
addProcessors(
acceptor: Acceptor,
endpoint: EndPoint,
newProcessorsPerListener: Int): Unit
addProcessors…FIXME
|
Note
|
addProcessors is used when SocketServer is requested to createAcceptorAndProcessors and resizeThreadPool.
|
createAcceptorAndProcessors Internal Method
createAcceptorAndProcessors(
processorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit
createAcceptorAndProcessors…FIXME
|
Note
|
createAcceptorAndProcessors is used when SocketServer is requested to startup and addListeners.
|
resizeThreadPool Method
resizeThreadPool(
oldNumNetworkThreads: Int,
newNumNetworkThreads: Int): Unit
resizeThreadPool…FIXME
|
Note
|
resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (the number of network threads).
|
addListeners Method
addListeners(listenersAdded: Seq[EndPoint]): Unit
addListeners…FIXME
|
Note
|
addListeners is used exclusively when DynamicListenerConfig is requested to reconfigure.
|
Stopping Request Processors — stopProcessingRequests Method
stopProcessingRequests(): Unit
stopProcessingRequests…FIXME
updateMaxConnectionsPerIpOverride Method
updateMaxConnectionsPerIpOverride(
maxConnectionsPerIpOverrides: Map[String, Int]): Unit
updateMaxConnectionsPerIpOverride…FIXME
|
Note
|
updateMaxConnectionsPerIpOverride is used when…FIXME
|
updateMaxConnectionsPerIp Method
updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit
updateMaxConnectionsPerIp…FIXME
|
Note
|
updateMaxConnectionsPerIp is used when…FIXME
|
removeListeners Method
removeListeners(listenersRemoved: Seq[EndPoint]): Unit
removeListeners…FIXME
|
Note
|
removeListeners is used when…FIXME
|
addDataPlaneProcessors Internal Method
addDataPlaneProcessors(
acceptor: Acceptor,
endpoint: EndPoint,
newProcessorsPerListener: Int): Unit
addDataPlaneProcessors…FIXME
|
Note
|
addDataPlaneProcessors is used when SocketServer is requested to createDataPlaneAcceptorsAndProcessors and resizeThreadPool.
|
createDataPlaneAcceptorsAndProcessors Internal Method
createDataPlaneAcceptorsAndProcessors(
dataProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit
createDataPlaneAcceptorsAndProcessors…FIXME
|
Note
|
createDataPlaneAcceptorsAndProcessors is used when SocketServer is requested to start up and addListeners.
|
createControlPlaneAcceptorAndProcessor Internal Method
createControlPlaneAcceptorAndProcessor(
endpointOpt: Option[EndPoint]): Unit
createControlPlaneAcceptorAndProcessor…FIXME
|
Note
|
createControlPlaneAcceptorAndProcessor is used when SocketServer is requested to start up.
|
startDataPlaneProcessors Internal Method
startDataPlaneProcessors(
authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit
startDataPlaneProcessors…FIXME
|
Note
|
startDataPlaneProcessors is used when…FIXME
|
createAcceptor Internal Method
createAcceptor(
endPoint: EndPoint,
metricPrefix: String) : Acceptor
createAcceptor…FIXME
|
Note
|
createAcceptor is used when…FIXME
|
Internal Properties
| Name | Description |
|---|---|
|
|
|
|
|
RequestChannel (with the queue size of maxQueuedRequests and the DataPlaneMetricPrefix metric name prefix) Initialized when Used to create the dataPlaneRequestProcessor and dataPlaneRequestHandlerPool for |
|
|
|
|
|
|
|
|
|
The number of processors per endpoint (as configured using num.network.threads Kafka property) |
|
Network processor threads per ID (initially totalProcessorThreads) New processor threads are added in addProcessors Used in stopProcessingRequests (to shut down the network processor threads) |
|
A RequestChannel (with queued.max.requests queue size) Used when:
|
|
Total number of processors, i.e. numProcessorThreads for every endpoint |