SocketServer
SocketServer
is a NIO socket server for a KafkaServer.
SocketServer
is created and then started up exclusively when KafkaServer
is requested to start up.
SocketServer
uses queued.max.requests configuration property for…FIXME
Name | Description |
---|---|
SocketServer
uses an empty metric prefix for…FIXME
SocketServer
uses ControlPlane metric prefix for…FIXME
Creating SocketServer Instance
SocketServer
takes the following to be created:
SocketServer
initializes the internal properties.
Control-Plane Request Handler for Controller-Brokers Communication — controlPlaneRequestChannelOpt
Internal Value
SocketServer
creates a dedicated RequestChannel for communication between a controller and brokers when control.plane.listener.name configuration property is defined.
The RequestChannel
is created for KafkaServer
(when requested to start up). It is the time when KafkaServer
creates dedicated control-plane request processor and control-plane request handler pool.
The RequestChannel
is created with the queue size of 20
and the ControlPlane metric name prefix.
SocketServer
manages the RequestChannel
(when requested to stopProcessingRequests and shutdown).
The RequestChannel
is assumed to be available when SocketServer
is requested to createControlPlaneAcceptorAndProcessor (when requested to start up) based on control.plane.listener.name configuration property.
Creating Network Processor Thread — newProcessor
Internal Method
newProcessor(
id: Int,
connectionQuotas: ConnectionQuotas,
listenerName: ListenerName,
securityProtocol: SecurityProtocol,
memoryPool: MemoryPool): Processor
newProcessor
simply creates a new Processor with the RequestChannel and the following configuration properties:
Note
|
newProcessor is used exclusively when SocketServer is requested to createControlPlaneAcceptorAndProcessor and addDataPlaneProcessors.
|
Starting Up — startup
Method
startup(
startupProcessors: Boolean = true): Unit
Internally, startup
creates the ConnectionQuotas (with maxConnectionsPerIp and maxConnectionsPerIpOverrides).
For every endpoint (in endpoints registry) startup
does the following:
-
Creates up to numProcessorThreads number of Processors (for ConnectionQuotas and MemoryPool)
-
Creates a
Acceptor
for the endpoint and processors -
Records the
Acceptor
in acceptors internal registry -
Starts a non-daemon thread for the
Acceptor
with the name askafka-socket-acceptor-[listenerName]-[securityProtocol]-[port]
(e.g.kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092
) and waits until it has started fully
startup
then registers metrics.
In the end, startup
prints out the following INFO message to the logs:
Started [dataPlaneAcceptors] acceptor threads for data-plane
Note
|
startup is used exclusively when KafkaServer is requested to start up.
|
addProcessors
Internal Method
addProcessors(
acceptor: Acceptor,
endpoint: EndPoint,
newProcessorsPerListener: Int): Unit
addProcessors
…FIXME
Note
|
addProcessors is used when SocketServer is requested to createAcceptorAndProcessors and resizeThreadPool.
|
createAcceptorAndProcessors
Internal Method
createAcceptorAndProcessors(
processorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit
createAcceptorAndProcessors
…FIXME
Note
|
createAcceptorAndProcessors is used when SocketServer is requested to startup and addListeners.
|
resizeThreadPool
Method
resizeThreadPool(
oldNumNetworkThreads: Int,
newNumNetworkThreads: Int): Unit
resizeThreadPool
…FIXME
Note
|
resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (the number of network threads).
|
addListeners
Method
addListeners(listenersAdded: Seq[EndPoint]): Unit
addListeners
…FIXME
Note
|
addListeners is used exclusively when DynamicListenerConfig is requested to reconfigure.
|
Stopping Request Processors — stopProcessingRequests
Method
stopProcessingRequests(): Unit
stopProcessingRequests
…FIXME
updateMaxConnectionsPerIpOverride
Method
updateMaxConnectionsPerIpOverride(
maxConnectionsPerIpOverrides: Map[String, Int]): Unit
updateMaxConnectionsPerIpOverride
…FIXME
Note
|
updateMaxConnectionsPerIpOverride is used when…FIXME
|
updateMaxConnectionsPerIp
Method
updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit
updateMaxConnectionsPerIp
…FIXME
Note
|
updateMaxConnectionsPerIp is used when…FIXME
|
removeListeners
Method
removeListeners(listenersRemoved: Seq[EndPoint]): Unit
removeListeners
…FIXME
Note
|
removeListeners is used when…FIXME
|
addDataPlaneProcessors
Internal Method
addDataPlaneProcessors(
acceptor: Acceptor,
endpoint: EndPoint,
newProcessorsPerListener: Int): Unit
addDataPlaneProcessors
…FIXME
Note
|
addDataPlaneProcessors is used when SocketServer is requested to createDataPlaneAcceptorsAndProcessors and resizeThreadPool.
|
createDataPlaneAcceptorsAndProcessors
Internal Method
createDataPlaneAcceptorsAndProcessors(
dataProcessorsPerListener: Int,
endpoints: Seq[EndPoint]): Unit
createDataPlaneAcceptorsAndProcessors
…FIXME
Note
|
createDataPlaneAcceptorsAndProcessors is used when SocketServer is requested to start up and addListeners.
|
createControlPlaneAcceptorAndProcessor
Internal Method
createControlPlaneAcceptorAndProcessor(
endpointOpt: Option[EndPoint]): Unit
createControlPlaneAcceptorAndProcessor
…FIXME
Note
|
createControlPlaneAcceptorAndProcessor is used when SocketServer is requested to start up.
|
startDataPlaneProcessors
Internal Method
startDataPlaneProcessors(
authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit
startDataPlaneProcessors
…FIXME
Note
|
startDataPlaneProcessors is used when…FIXME
|
createAcceptor
Internal Method
createAcceptor(
endPoint: EndPoint,
metricPrefix: String) : Acceptor
createAcceptor
…FIXME
Note
|
createAcceptor is used when…FIXME
|
Internal Properties
Name | Description |
---|---|
|
|
|
|
|
RequestChannel (with the queue size of maxQueuedRequests and the DataPlaneMetricPrefix metric name prefix) Initialized when Used to create the dataPlaneRequestProcessor and dataPlaneRequestHandlerPool for |
|
|
|
|
|
|
|
|
|
The number of processors per endpoint (as configured using num.network.threads Kafka property) |
|
Network processor threads per ID (initially totalProcessorThreads) New processor threads are added in addProcessors Used in stopProcessingRequests (to shut down the network processor threads) |
|
A RequestChannel (with queued.max.requests queue size) Used when:
|
|
Total number of processors, i.e. numProcessorThreads for every endpoint |