SocketServer

SocketServer is a NIO socket server.

SocketServer is created exclusively when KafkaServer is started.

Figure 1. SocketServer’s Startup

SocketServer uses the queued.max.requests configuration property as the queue size of the RequestChannel.
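A bounded request queue of this kind can be sketched as follows. This is a minimal illustration, assuming (as Table 2 states) that queued.max.requests is the queue size of the RequestChannel. Request and newRequestQueue are hypothetical names used for the example, not Kafka's API.

```scala
import java.util.concurrent.ArrayBlockingQueue

object RequestQueueSketch {
  // Hypothetical stand-in for a queued network request.
  final case class Request(id: Int)

  // Hypothetical helper: a queue whose capacity is the value of queued.max.requests.
  def newRequestQueue(queuedMaxRequests: Int): ArrayBlockingQueue[Request] =
    new ArrayBlockingQueue[Request](queuedMaxRequests)
}
```

With a capacity of 2, the first two calls to offer succeed and a third returns false until a request is taken off the queue. A blocking put would instead make the calling thread wait, which is how a full queue applies back-pressure to whoever produces requests.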

Table 1. SocketServer’s Metrics (in kafka.network group)
Name Description

NetworkProcessorAvgIdlePercent

MemoryPoolAvailable

MemoryPoolUsed

Table 2. SocketServer’s Internal Properties (e.g. Registries, Counters and Flags)
Name
  Description

acceptors
  Acceptor threads per EndPoint

connectionQuotas
  ConnectionQuotas

endpoints
  EndPoints (aka listeners) per name (as configured using listeners Kafka property)

maxConnectionsPerIp

maxConnectionsPerIpOverrides

memoryPool
  MemoryPool

numProcessorThreads
  The number of processors per endpoint (as configured using num.network.threads Kafka property)

processors
  Network processor threads per ID (initially totalProcessorThreads)

  New processor threads are added in addProcessors

  Used in stopProcessingRequests (to shut down the network processor threads)

requestChannel
  A RequestChannel (with queued.max.requests queue size)

  Used when:

totalProcessorThreads
  Total number of processors, i.e. numProcessorThreads for every endpoint
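The relationship between numProcessorThreads, totalProcessorThreads and the processors registry can be sketched as follows. This is an illustration only: EndPoint and Processor are simplified hypothetical stand-ins, and assigning processor IDs sequentially across endpoints is an assumption of the sketch, not something this page states.

```scala
import java.util.concurrent.ConcurrentHashMap

object ProcessorRegistrySketch {
  // Simplified, hypothetical stand-ins for the real classes.
  final case class EndPoint(listenerName: String, port: Int)
  final case class Processor(id: Int, listenerName: String)

  // numProcessorThreads for every endpoint.
  def totalProcessorThreads(numProcessorThreads: Int, endpoints: Seq[EndPoint]): Int =
    numProcessorThreads * endpoints.size

  // Registers processors per ID (sequential IDs are an assumption of this sketch).
  def register(numProcessorThreads: Int, endpoints: Seq[EndPoint]): ConcurrentHashMap[Int, Processor] = {
    val processors = new ConcurrentHashMap[Int, Processor]()
    var nextId = 0
    for (endpoint <- endpoints; _ <- 0 until numProcessorThreads) {
      processors.put(nextId, Processor(nextId, endpoint.listenerName))
      nextId += 1
    }
    processors
  }
}
```

For two endpoints and numProcessorThreads of 3, totalProcessorThreads is 6 and the registry holds IDs 0 to 5.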

Creating Network Processor Thread — newProcessor Internal Method

newProcessor(
  id: Int,
  connectionQuotas: ConnectionQuotas,
  listenerName: ListenerName,
  securityProtocol: SecurityProtocol,
  memoryPool: MemoryPool): Processor

newProcessor simply creates a new Processor with the RequestChannel and the following configuration properties:

Note
newProcessor is used exclusively when SocketServer is requested to addProcessors.

Starting Up (and Auxiliary Services) — startup Method

startup(startupProcessors: Boolean = true): Unit

Internally, startup creates the ConnectionQuotas (with maxConnectionsPerIp and maxConnectionsPerIpOverrides).
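How a default limit and per-IP overrides might combine can be sketched as follows. SimpleConnectionQuotas is a hypothetical class written for this example (it is not Kafka's ConnectionQuotas), and the semantics shown, an override replacing the default for a given address, are an assumption based on the property names.

```scala
import scala.collection.mutable

object ConnectionQuotasSketch {
  // Hypothetical, simplified quota tracker keyed by IP address string.
  final class SimpleConnectionQuotas(defaultMax: Int, overrides: Map[String, Int]) {
    private val counts = mutable.Map.empty[String, Int].withDefaultValue(0)

    // An override, when present, replaces the default limit for that address.
    def maxFor(ip: String): Int = overrides.getOrElse(ip, defaultMax)

    // Counts the connection and returns true when under the limit, false otherwise.
    def tryInc(ip: String): Boolean = synchronized {
      val current = counts(ip)
      if (current >= maxFor(ip)) false
      else { counts(ip) = current + 1; true }
    }

    // Releases one connection for the address, never going below zero.
    def dec(ip: String): Unit = synchronized {
      val current = counts(ip)
      if (current > 0) counts(ip) = current - 1
    }
  }
}
```

With a default of 2 and an override of 1 for 10.0.0.1, a second connection from 10.0.0.1 is refused while other addresses may still open two.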

For every endpoint (in the endpoints registry), startup does the following:

  1. Creates up to numProcessorThreads Processors (with the ConnectionQuotas and MemoryPool)

  2. Creates an Acceptor for the endpoint and the processors

  3. Records the Acceptor in the acceptors internal registry

  4. Starts a non-daemon thread for the Acceptor, named kafka-socket-acceptor-[listenerName]-[securityProtocol]-[port] (e.g. kafka-socket-acceptor-ListenerName(PLAINTEXT)-PLAINTEXT-9092), and waits until it has fully started

startup then registers metrics.

In the end, startup prints out the following INFO message to the logs:

INFO [SocketServer brokerId=[brokerID]] Started [number] acceptor threads

Note
startup is used exclusively when KafkaServer is requested to start up.
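The per-endpoint steps of startup can be sketched as follows. This is a simplified illustration, not Kafka's code: ListenerName, EndPoint and Acceptor are hypothetical stand-ins, processors are reduced to their IDs, and using a CountDownLatch to wait for the acceptor thread is an assumption of the sketch.

```scala
import java.util.concurrent.CountDownLatch
import scala.collection.mutable

object StartupSketch {
  // Hypothetical stand-ins; a case class prints as e.g. ListenerName(PLAINTEXT).
  final case class ListenerName(value: String)
  final case class EndPoint(listenerName: ListenerName, securityProtocol: String, port: Int)

  final class Acceptor(val endPoint: EndPoint, val processorIds: Seq[Int]) extends Runnable {
    private val started = new CountDownLatch(1)
    // A real acceptor would go on to accept connections; the sketch only signals startup.
    def run(): Unit = started.countDown()
    def awaitStartup(): Unit = started.await()
  }

  def acceptorThreadName(e: EndPoint): String =
    s"kafka-socket-acceptor-${e.listenerName}-${e.securityProtocol}-${e.port}"

  def startup(endpoints: Seq[EndPoint], numProcessorThreads: Int): Map[EndPoint, Acceptor] = {
    val acceptors = mutable.LinkedHashMap.empty[EndPoint, Acceptor]
    var nextProcessorId = 0
    endpoints.foreach { endpoint =>
      // Step 1: create the processors (represented here by their IDs only).
      val ids = nextProcessorId until (nextProcessorId + numProcessorThreads)
      nextProcessorId += numProcessorThreads
      // Step 2: create the acceptor for the endpoint and processors.
      val acceptor = new Acceptor(endpoint, ids)
      // Step 3: record the acceptor in the registry.
      acceptors.put(endpoint, acceptor)
      // Step 4: start a named non-daemon thread and wait until it has started.
      val thread = new Thread(acceptor, acceptorThreadName(endpoint))
      thread.setDaemon(false)
      thread.start()
      acceptor.awaitStartup()
    }
    // Simplified form of the INFO message.
    println(s"Started ${acceptors.size} acceptor threads")
    acceptors.toMap
  }
}
```

Waiting for each acceptor before moving to the next endpoint means that, once startup returns, every acceptor thread has started.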

Creating SocketServer Instance

SocketServer takes the following when created:

SocketServer initializes the internal registries and counters.

addProcessors Internal Method

addProcessors(
  acceptor: Acceptor,
  endpoint: EndPoint,
  newProcessorsPerListener: Int): Unit

addProcessors…​FIXME

Note
addProcessors is used when SocketServer is requested to createAcceptorAndProcessors and resizeThreadPool.

createAcceptorAndProcessors Internal Method

createAcceptorAndProcessors(
  processorsPerListener: Int,
  endpoints: Seq[EndPoint]): Unit

createAcceptorAndProcessors…​FIXME

Note
createAcceptorAndProcessors is used when SocketServer is requested to startup and addListeners.

resizeThreadPool Method

resizeThreadPool(
  oldNumNetworkThreads: Int,
  newNumNetworkThreads: Int): Unit

resizeThreadPool…​FIXME

Note
resizeThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (the number of network threads).
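The body of resizeThreadPool is not described here. One plausible reading of the signature, given that addProcessors takes a newProcessorsPerListener count, is that the difference between the old and new thread counts decides how many processors to add or remove per listener. The sketch below only illustrates that arithmetic; ResizeAction and plan are hypothetical names, and the behaviour is an assumption, not a description of Kafka's implementation.

```scala
object ResizeSketch {
  // Hypothetical description of what a resize would have to do per listener.
  sealed trait ResizeAction
  final case class AddProcessors(perListener: Int) extends ResizeAction
  final case class RemoveProcessors(perListener: Int) extends ResizeAction
  case object NoChange extends ResizeAction

  // Assumed logic: grow or shrink each listener's pool by the difference.
  def plan(oldNumNetworkThreads: Int, newNumNetworkThreads: Int): ResizeAction =
    if (newNumNetworkThreads > oldNumNetworkThreads)
      AddProcessors(newNumNetworkThreads - oldNumNetworkThreads)
    else if (newNumNetworkThreads < oldNumNetworkThreads)
      RemoveProcessors(oldNumNetworkThreads - newNumNetworkThreads)
    else NoChange
}
```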

addListeners Method

addListeners(listenersAdded: Seq[EndPoint]): Unit

addListeners…​FIXME

Note
addListeners is used exclusively when DynamicListenerConfig is requested to reconfigure.

Stopping Request Processors — stopProcessingRequests Method

stopProcessingRequests(): Unit

stopProcessingRequests…​FIXME

Note
stopProcessingRequests is used when:

Shutting Down — shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

updateMaxConnectionsPerIpOverride Method

updateMaxConnectionsPerIpOverride(
  maxConnectionsPerIpOverrides: Map[String, Int]): Unit

updateMaxConnectionsPerIpOverride…​FIXME

Note
updateMaxConnectionsPerIpOverride is used when…​FIXME

updateMaxConnectionsPerIp Method

updateMaxConnectionsPerIp(maxConnectionsPerIp: Int): Unit

updateMaxConnectionsPerIp…​FIXME

Note
updateMaxConnectionsPerIp is used when…​FIXME

removeListeners Method

removeListeners(listenersRemoved: Seq[EndPoint]): Unit

removeListeners…​FIXME

Note
removeListeners is used when…​FIXME
