SocketServer

SocketServer is a NIO socket server for a KafkaServer.

SocketServer is created and then started up exclusively when KafkaServer is requested to start up.

Figure 1. SocketServer’s Startup

SocketServer uses queued.max.requests configuration property for the queue size of the RequestChannel.

Table 1. SocketServer’s Metrics (in kafka.network group)
Name Description

NetworkProcessorAvgIdlePercent

MemoryPoolAvailable

MemoryPoolUsed

SocketServer uses an empty metric prefix for…​FIXME

SocketServer uses ControlPlane metric prefix for…​FIXME

Creating SocketServer Instance

SocketServer takes the following to be created:

SocketServer initializes the internal properties.

Control-Plane Request Handler for Controller-Brokers Communication — controlPlaneRequestChannelOpt Internal Value

SocketServer creates a dedicated RequestChannel for communication between a controller and brokers when control.plane.listener.name configuration property is defined.

The RequestChannel is created for KafkaServer (when requested to start up). That is when KafkaServer creates the dedicated control-plane request processor and the control-plane request handler pool.

The RequestChannel is created with the queue size of 20 and the ControlPlane metric name prefix.

SocketServer manages the RequestChannel (when requested to stopProcessingRequests and shutdown).

The RequestChannel is assumed to be available when SocketServer is requested to createControlPlaneAcceptorAndProcessor (when requested to start up) based on control.plane.listener.name configuration property.
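
The optional nature of the control-plane RequestChannel can be sketched as follows. This is a minimal illustration, not Kafka's actual code: the Config and RequestChannel types below are hypothetical stand-ins, and the only values taken from the text above are the queue size of 20 and the ControlPlane metric name prefix.

```scala
object ControlPlaneChannelSketch {
  // Hypothetical stand-ins; these are NOT Kafka's real classes.
  final case class RequestChannel(queueSize: Int, metricNamePrefix: String)
  final case class Config(controlPlaneListenerName: Option[String])

  // Values quoted in the text above.
  val ControlPlaneQueueSize = 20
  val ControlPlaneMetricPrefix = "ControlPlane"

  // The channel exists only when control.plane.listener.name is defined.
  def controlPlaneRequestChannelOpt(config: Config): Option[RequestChannel] =
    config.controlPlaneListenerName.map { _ =>
      RequestChannel(ControlPlaneQueueSize, ControlPlaneMetricPrefix)
    }

  def main(args: Array[String]): Unit = {
    // "CONTROLLER" is an arbitrary listener name chosen for illustration.
    println(controlPlaneRequestChannelOpt(Config(Some("CONTROLLER"))))
    println(controlPlaneRequestChannelOpt(Config(None)))
  }
}
```

Modelling the value as an Option lets callers such as createControlPlaneAcceptorAndProcessor handle the "no control-plane listener" case explicitly instead of checking for null.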

Creating Network Processor Thread — newProcessor Internal Method

newProcessor(
  id: Int,
  connectionQuotas: ConnectionQuotas,
  listenerName: ListenerName,
  securityProtocol: SecurityProtocol,
  memoryPool: MemoryPool): Processor

newProcessor simply creates a new Processor with the RequestChannel and the following configuration properties:

Note
newProcessor is used exclusively when SocketServer is requested to createControlPlaneAcceptorAndProcessor and addDataPlaneProcessors.

Starting Up — startup Method

startup(
  startupProcessors: Boolean = true): Unit

Internally, startup creates the ConnectionQuotas (with maxConnectionsPerIp and maxConnectionsPerIpOverrides).

For every endpoint (in endpoints registry) startup does the following:

  1. Creates up to numProcessorThreads Processors (for ConnectionQuotas and MemoryPool)

  2. Creates an Acceptor for the endpoint and processors

  3. Records the Acceptor in acceptors internal registry

  4. Starts a non-daemon thread for the Acceptor named kafka-socket-acceptor-[listenerName]-[securityProtocol]-[port] (e.g. kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092) and waits until it has fully started
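
The four steps above can be sketched roughly as follows. All types here (ListenerName, EndPoint, Processor, Acceptor) are simplified, hypothetical stand-ins rather than Kafka's real classes, and the sketch always creates exactly numProcessorThreads processors, whereas the text says "up to". Only the thread name pattern is taken from the text.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable

object StartupSketch {
  // Hypothetical stand-ins, reduced to what the steps need.
  final case class ListenerName(value: String)
  final case class EndPoint(listenerName: ListenerName, securityProtocol: String, port: Int)
  final case class Processor(id: Int)

  final class Acceptor(val endPoint: EndPoint, val processors: Seq[Processor]) extends Runnable {
    val started = new CountDownLatch(1)
    // A real acceptor would loop accepting connections; this one only signals startup.
    def run(): Unit = started.countDown()
  }

  // Thread name pattern quoted in step 4.
  def acceptorThreadName(e: EndPoint): String =
    s"kafka-socket-acceptor-${e.listenerName}-${e.securityProtocol}-${e.port}"

  def startup(endpoints: Seq[EndPoint], numProcessorThreads: Int): Map[EndPoint, Acceptor] = {
    val acceptors = mutable.LinkedHashMap.empty[EndPoint, Acceptor]
    var nextProcessorId = 0
    endpoints.foreach { endpoint =>
      // 1. Create the processors for this endpoint.
      val processors = Seq.fill(numProcessorThreads) {
        val p = Processor(nextProcessorId); nextProcessorId += 1; p
      }
      // 2. Create an acceptor for the endpoint and its processors.
      val acceptor = new Acceptor(endpoint, processors)
      // 3. Record the acceptor in the registry.
      acceptors(endpoint) = acceptor
      // 4. Start a non-daemon thread and wait until the acceptor reports it has started.
      val thread = new Thread(acceptor, acceptorThreadName(endpoint))
      thread.setDaemon(false)
      thread.start()
      acceptor.started.await()
    }
    acceptors.toMap
  }

  def main(args: Array[String]): Unit = {
    val plaintext = EndPoint(ListenerName("PLAINTEXT"), "PLAINTEXT", 9092)
    println(acceptorThreadName(plaintext)) // kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092
    println(startup(Seq(plaintext), numProcessorThreads = 3).size)
  }
}
```

The CountDownLatch is one way to express "waits until it has fully started"; it is an assumption of this sketch, not a statement about how Acceptor is implemented.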

startup then registers metrics.

In the end, startup prints out the following INFO message to the logs:

Started [dataPlaneAcceptors] acceptor threads for data-plane

Note
startup is used exclusively when KafkaServer is requested to start up.

addProcessors Internal Method

addProcessors(
  acceptor: Acceptor,
  endpoint: EndPoint,
  newProcessorsPerListener: Int): Unit

addProcessors…​FIXME

Note
addProcessors is used when SocketServer is requested to createAcceptorAndProcessors and resizeThreadPool.

createAcceptorAndProcessors Internal Method

createAcceptorAndProcessors(
  processorsPerListener: Int,
  endpoints: Seq[EndPoint]): Unit

createAcceptorAndProcessors…​FIXME

Note
createAcceptorAndProcessors is used when SocketServer is requested to startup and addListeners.

resizeThreadPool Method

resizeThreadPool(
  oldNumNetworkThreads: Int,
  newNumNetworkThreads: Int): Unit

resizeThreadPool…​FIXME

Note
resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (the number of network threads).

addListeners Method

addListeners(listenersAdded: Seq[EndPoint]): Unit

addListeners…​FIXME

Note
addListeners is used exclusively when DynamicListenerConfig is requested to reconfigure.

Stopping Request Processors — stopProcessingRequests Method

stopProcessingRequests(): Unit

stopProcessingRequests…​FIXME

Note

stopProcessingRequests is used when:

Shutting Down — shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

updateMaxConnectionsPerIpOverride Method

updateMaxConnectionsPerIpOverride(
  maxConnectionsPerIpOverrides: Map[String, Int]): Unit

updateMaxConnectionsPerIpOverride…​FIXME

Note
updateMaxConnectionsPerIpOverride is used when…​FIXME

updateMaxConnectionsPerIp Method

updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit

updateMaxConnectionsPerIp…​FIXME

Note
updateMaxConnectionsPerIp is used when…​FIXME

removeListeners Method

removeListeners(listenersRemoved: Seq[EndPoint]): Unit

removeListeners…​FIXME

Note
removeListeners is used when…​FIXME

addDataPlaneProcessors Internal Method

addDataPlaneProcessors(
  acceptor: Acceptor,
  endpoint: EndPoint,
  newProcessorsPerListener: Int): Unit

addDataPlaneProcessors…​FIXME

Note
addDataPlaneProcessors is used when SocketServer is requested to createDataPlaneAcceptorsAndProcessors and resizeThreadPool.

createDataPlaneAcceptorsAndProcessors Internal Method

createDataPlaneAcceptorsAndProcessors(
  dataProcessorsPerListener: Int,
  endpoints: Seq[EndPoint]): Unit

createDataPlaneAcceptorsAndProcessors…​FIXME

Note
createDataPlaneAcceptorsAndProcessors is used when SocketServer is requested to start up and addListeners.

createControlPlaneAcceptorAndProcessor Internal Method

createControlPlaneAcceptorAndProcessor(
  endpointOpt: Option[EndPoint]): Unit

createControlPlaneAcceptorAndProcessor…​FIXME

Note
createControlPlaneAcceptorAndProcessor is used when SocketServer is requested to start up.

startDataPlaneProcessors Internal Method

startDataPlaneProcessors(
  authorizerFutures: Map[Endpoint, CompletableFuture[Void]] = Map.empty): Unit

startDataPlaneProcessors…​FIXME

Note
startDataPlaneProcessors is used when…​FIXME

createAcceptor Internal Method

createAcceptor(
  endPoint: EndPoint,
  metricPrefix: String): Acceptor

createAcceptor…​FIXME

Note
createAcceptor is used when…​FIXME

Internal Properties

Name Description

acceptors

Acceptor threads per EndPoint

connectionQuotas

ConnectionQuotas

dataPlaneRequestChannel

RequestChannel (with the queue size of maxQueuedRequests and the DataPlaneMetricPrefix metric name prefix)

Initialized when SocketServer is requested to addDataPlaneProcessors

Used to create the dataPlaneRequestProcessor and dataPlaneRequestHandlerPool for KafkaServer

endpoints

EndPoints (aka listeners) per name (as configured using listeners Kafka property)

maxConnectionsPerIp

maxConnectionsPerIpOverrides

memoryPool

MemoryPool

numProcessorThreads

The number of processors per endpoint (as configured using num.network.threads Kafka property)

processors

Network processor threads per ID (initially totalProcessorThreads)

New processor threads are added in addProcessors

Used in stopProcessingRequests (to shut down the network processor threads)

requestChannel

A RequestChannel (with queued.max.requests queue size)

Used when:

totalProcessorThreads

Total number of processors, i.e. numProcessorThreads for every endpoint
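
As a quick worked example of that relationship, the following sketch computes the total for a broker with two listeners. The helper name and the listener count are illustrative assumptions; 3 is used for num.network.threads only as a sample value.

```scala
object ProcessorCountSketch {
  // Hypothetical helper: numProcessorThreads processors for every endpoint.
  def totalProcessorThreads(numProcessorThreads: Int, endpointCount: Int): Int =
    numProcessorThreads * endpointCount

  def main(args: Array[String]): Unit = {
    // num.network.threads = 3 with two listeners
    println(totalProcessorThreads(numProcessorThreads = 3, endpointCount = 2)) // 6
  }
}
```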
