LogManager

LogManager is created and immediately started when KafkaServer is requested to start up.

Figure 1. LogManager and KafkaServer

While being created, LogManager is given the log directories that are configured using the log.dirs configuration property, or log.dir when log.dirs is not set (default: /tmp/kafka-logs). The log directories are immediately validated and the logs in them are loaded.
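The way the two properties combine can be sketched as follows. This is a minimal illustration, not Kafka's own code: LogDirsResolution and resolveLogDirs are hypothetical names, and the sketch assumes that log.dirs is a comma-separated list that takes precedence over log.dir.

```scala
import java.util.Properties

object LogDirsResolution {
  // Default of log.dir as stated in the text above.
  val DefaultLogDir = "/tmp/kafka-logs"

  // Hypothetical helper: prefer log.dirs, fall back to log.dir, then to the default.
  def resolveLogDirs(props: Properties): List[String] = {
    val raw = Option(props.getProperty("log.dirs"))
      .getOrElse(props.getProperty("log.dir", DefaultLogDir))
    // Assumption: log.dirs is comma-separated; trim entries and drop empty ones.
    raw.split(",").map(_.trim).filter(_.nonEmpty).toList
  }
}
```

With neither property set, the sketch returns the single default directory.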

LogManager is a KafkaMetricsGroup and registers performance metrics.

Table 1. LogManager’s Performance Metrics
Metric Name Description

OfflineLogDirectoryCount

The number of offline log directories

LogDirectoryOffline

Registered for every log directory to indicate whether it is online or offline

Possible values:

  • 0 when a log directory is online

  • 1 when a log directory is offline

The path of the directory is the tag of the metric.

The performance metrics are registered in kafka.log:type=LogManager group.
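The values of the two metrics can be modelled with plain collections. The following is only a sketch of the semantics described in the table: LogDirMetricsSketch and its methods are hypothetical names, and directories are represented by their paths.

```scala
object LogDirMetricsSketch {
  // Value of OfflineLogDirectoryCount: configured directories that are not live.
  def offlineLogDirectoryCount(allDirs: Seq[String], liveDirs: Set[String]): Int =
    allDirs.count(dir => !liveDirs.contains(dir))

  // Value of LogDirectoryOffline per directory path: 0 when online, 1 when offline.
  def logDirectoryOffline(allDirs: Seq[String], liveDirs: Set[String]): Map[String, Int] =
    allDirs.map(dir => dir -> (if (liveDirs.contains(dir)) 0 else 1)).toMap
}
```

The keys of the resulting map play the role of the metric tag (the path of the directory).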

Figure 2. LogManager in jconsole

LogManager uses the num.recovery.threads.per.data.dir dynamic configuration (default: 1) for the number of threads per log data directory for log recovery at startup and flushing at shutdown.
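The idea of a fixed number of threads per log data directory can be sketched with a standard Java thread pool. This is an illustration under assumptions, not LogManager's implementation: RecoveryPoolSketch and recoverDirectory are hypothetical names, and the "recovery" of a log is reduced to a placeholder job.

```scala
import java.util.concurrent.{Callable, Executors, TimeUnit}

object RecoveryPoolSketch {
  // Runs one job per log of a single directory on a pool of `threadsPerDir` threads.
  def recoverDirectory(logNames: List[String], threadsPerDir: Int): List[String] = {
    val pool = Executors.newFixedThreadPool(threadsPerDir)
    try {
      val futures = logNames.map { name =>
        pool.submit(new Callable[String] {
          // Placeholder for the actual recovery work on a single log.
          override def call(): String = s"recovered $name"
        })
      }
      // Block until every job has finished, preserving submission order.
      futures.map(_.get())
    } finally {
      pool.shutdown()
      pool.awaitTermination(10, TimeUnit.SECONDS)
    }
  }
}
```

A larger threadsPerDir only changes how many jobs of one directory run concurrently; the results are the same.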

LogManager is used to create a ReplicaManager, a DynamicLogConfig, and a TopicConfigHandler.

LogManager uses 30000 ms (30 seconds) as the initial delay (InitialTaskDelayMs) of the tasks it schedules.

Table 2. LogManager’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

_liveLogDirs

Java’s ConcurrentLinkedQueue of live log directories (after createAndValidateLogDirs was executed with the logDirs and the initialOfflineDirs directories).

Used when…​FIXME

cleaner

numRecoveryThreadsPerDataDir

Number of recovery threads per log data directory

Starts as the recoveryThreadsPerDataDir and can then be dynamically changed.

Tip

Enable the INFO or DEBUG logging level for the kafka.log.LogManager logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.LogManager=DEBUG

Refer to Logging.

Starting Up — startup Method

startup(): Unit

startup starts the background threads to flush logs and do log cleanup.

Internally, startup prints out the following INFO message to the logs:

Starting log cleanup with a period of [retentionCheckMs] ms.

startup requests the Scheduler to schedule a task with the name kafka-log-retention that executes cleanupLogs with the InitialTaskDelayMs initial delay and the retentionCheckMs execution period.

startup prints out the following INFO message to the logs:

Starting log flusher with a default period of [flushCheckMs] ms.

startup requests the Scheduler to schedule a task with…​FIXME

Note
startup is used when…​FIXME
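The scheduling of the kafka-log-retention task (an initial delay followed by a fixed period) can be sketched with Java's ScheduledExecutorService standing in for Kafka's Scheduler. RetentionSchedulingSketch and scheduleRetention are hypothetical names, and the use of scheduleAtFixedRate is an assumption about how a periodic task could be run, not a statement about the Scheduler's internals.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

object RetentionSchedulingSketch {
  // Initial task delay as stated in the text (30000 ms).
  val InitialTaskDelayMs: Long = 30000L

  // Schedules `task` to first run after `initialDelayMs` and then every `periodMs`.
  def scheduleRetention(
      executor: ScheduledExecutorService,
      initialDelayMs: Long,
      periodMs: Long)(task: () => Unit): ScheduledFuture[_] =
    executor.scheduleAtFixedRate(
      new Runnable { override def run(): Unit = task() },
      initialDelayMs,
      periodMs,
      TimeUnit.MILLISECONDS)
}
```

In terms of the text above, initialDelayMs would be InitialTaskDelayMs, periodMs would be retentionCheckMs, and task would invoke cleanupLogs.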

cleanupLogs Method

cleanupLogs(): Unit

cleanupLogs…​FIXME

Note
cleanupLogs is used when…​FIXME

Getting All Partition Logs — allLogs Method

allLogs: Iterable[Log]

allLogs…​FIXME

Note
allLogs is used when…​FIXME

addLogToBeDeleted Internal Method

addLogToBeDeleted(log: Log): Unit

addLogToBeDeleted…​FIXME

Note
addLogToBeDeleted is used when…​FIXME

asyncDelete Method

asyncDelete(
  topicPartition: TopicPartition,
  isFuture: Boolean = false): Log

asyncDelete…​FIXME

Note

asyncDelete is used when…​FIXME

getOrCreateLog Method

getOrCreateLog(
  topicPartition: TopicPartition,
  config: LogConfig,
  isNew: Boolean = false,
  isFuture: Boolean = false): Log

getOrCreateLog…​FIXME

Note
getOrCreateLog is used exclusively when Partition is requested to getOrCreateReplica.

loadLog Internal Method

loadLog(
  logDir: File,
  recoveryPoints: Map[TopicPartition, Long],
  logStartOffsets: Map[TopicPartition, Long]): Unit

loadLog…​FIXME

Note
loadLog is used exclusively when LogManager is requested to loadLogs.

Loading Logs — loadLogs Internal Method

loadLogs(): Unit

loadLogs prints out the following INFO message to the logs:

Loading logs.

For every live log directory, loadLogs…​FIXME

Note
loadLogs is used exclusively when LogManager is created.

Creating LogManager Instance

LogManager takes the following when created:

  • Absolute paths to log directories

  • Initial offline directories

  • Topic configurations (Map[String, LogConfig])

  • Initial LogConfig

  • CleanerConfig

  • recoveryThreadsPerDataDir (based on the num.recovery.threads.per.data.dir dynamic configuration property)

  • flushCheckMs

  • flushRecoveryOffsetCheckpointMs

  • flushStartOffsetCheckpointMs

  • retentionCheckMs

  • maxPidExpirationMs

  • Scheduler

  • BrokerState

  • BrokerTopicStats

  • LogDirFailureChannel

  • Time

LogManager initializes the internal registries and counters.

While being created, LogManager loads logs.

Creating LogManager — apply Factory Method

apply(
  config: KafkaConfig,
  initialOfflineDirs: Seq[String],
  zkClient: KafkaZkClient,
  brokerState: BrokerState,
  kafkaScheduler: KafkaScheduler,
  time: Time,
  brokerTopicStats: BrokerTopicStats,
  logDirFailureChannel: LogDirFailureChannel): LogManager

apply…​FIXME

Note
apply is used exclusively when KafkaServer is requested to start up.

liveLogDirs Method

liveLogDirs: Seq[File]

liveLogDirs…​FIXME

Note
liveLogDirs is used when…​FIXME

deleteLogs Internal Method

deleteLogs(): Unit

deleteLogs…​FIXME

Note
deleteLogs is used when…​FIXME

flushDirtyLogs Internal Method

flushDirtyLogs(): Unit

flushDirtyLogs…​FIXME

Note
flushDirtyLogs is used when…​FIXME

checkpointLogRecoveryOffsets Method

checkpointLogRecoveryOffsets(): Unit

checkpointLogRecoveryOffsets…​FIXME

Note
checkpointLogRecoveryOffsets is used when…​FIXME

checkpointLogStartOffsets Method

checkpointLogStartOffsets(): Unit

checkpointLogStartOffsets…​FIXME

Note
checkpointLogStartOffsets is used when…​FIXME

isLogDirOnline Method

isLogDirOnline(logDir: String): Boolean

isLogDirOnline…​FIXME

Note
isLogDirOnline is used when…​FIXME

Validating Data Log Directories — createAndValidateLogDirs Internal Method

createAndValidateLogDirs(
  dirs: Seq[File],
  initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File]

For every directory in the given dirs, createAndValidateLogDirs creates the directory if it does not exist and makes sure that it is available (i.e. it is a readable directory).

createAndValidateLogDirs prints out the following INFO message to the logs when a data directory does not exist:

Log directory [dir] not found, creating it.

Note
createAndValidateLogDirs is given the logDirs and the initialOfflineDirs that LogManager is created with.

createAndValidateLogDirs throws…​FIXME

Note
createAndValidateLogDirs is used exclusively when LogManager is created.
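The validation described in this section can be sketched as follows. This is a simplified illustration, not the method itself: LogDirValidationSketch and createAndValidate are hypothetical names, println stands in for the logger, and two behaviours are assumptions (directories listed in initialOfflineDirs are never treated as live, and a failing directory is skipped instead of aborting the whole validation).

```scala
import java.io.{File, IOException}
import java.util.concurrent.ConcurrentLinkedQueue

object LogDirValidationSketch {
  def createAndValidate(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
    val liveLogDirs = new ConcurrentLinkedQueue[File]()
    for (dir <- dirs) {
      try {
        // Assumption: directories reported as initially offline are never live.
        if (initialOfflineDirs.contains(dir))
          throw new IOException(s"Directory ${dir.getAbsolutePath} is marked offline")
        if (!dir.exists) {
          println(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
          if (!dir.mkdirs()) throw new IOException(s"Failed to create ${dir.getAbsolutePath}")
        }
        // A live directory has to be a readable directory.
        if (!dir.isDirectory || !dir.canRead)
          throw new IOException(s"${dir.getAbsolutePath} is not a readable directory")
        liveLogDirs.add(dir)
      } catch {
        // Assumption: a failing directory is skipped, the others are still validated.
        case e: IOException => println(s"Skipping log directory: ${e.getMessage}")
      }
    }
    liveLogDirs
  }
}
```

The returned queue corresponds to the _liveLogDirs internal registry.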

truncateTo Method

truncateTo(
  partitionOffsets: Map[TopicPartition, Long],
  isFuture: Boolean): Unit

truncateTo…​FIXME

Note
truncateTo is used exclusively when Partition is requested to truncateTo.

truncateFullyAndStartAt Method

truncateFullyAndStartAt(
  topicPartition: TopicPartition,
  newOffset: Long,
  isFuture: Boolean): Unit

truncateFullyAndStartAt…​FIXME

Note
truncateFullyAndStartAt is used exclusively when Partition is requested to truncateFullyAndStartAt.

resizeRecoveryThreadPool Method

resizeRecoveryThreadPool(newSize: Int): Unit

resizeRecoveryThreadPool prints out the following INFO message to the logs and reconfigures the numRecoveryThreadsPerDataDir internal registry to be the given newSize.

Resizing recovery thread pool size for each data dir from [numRecoveryThreadsPerDataDir] to [newSize]

Note
resizeRecoveryThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (with a new value of KafkaConfig.numRecoveryThreadsPerDataDir).
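The resize amounts to logging the change and updating a mutable field, which the following sketch illustrates. RecoveryThreadsSketch and currentSize are hypothetical names, println stands in for the logger, and marking the field @volatile (so that other threads see the new value) is an assumption.

```scala
class RecoveryThreadsSketch(recoveryThreadsPerDataDir: Int) {
  // Starts as the value given at creation time and can be changed later.
  // Assumption: @volatile makes the update visible to other threads.
  @volatile private var numRecoveryThreadsPerDataDir: Int = recoveryThreadsPerDataDir

  // Hypothetical accessor, added only to observe the current value.
  def currentSize: Int = numRecoveryThreadsPerDataDir

  def resizeRecoveryThreadPool(newSize: Int): Unit = {
    // Log first so that the message shows both the old and the new value.
    println(s"Resizing recovery thread pool size for each data dir from $numRecoveryThreadsPerDataDir to $newSize")
    numRecoveryThreadsPerDataDir = newSize
  }
}
```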

Shutting Down — shutdown Method

shutdown(): Unit

shutdown prints out the following INFO message to the logs:

Shutting down.

shutdown then…​FIXME

Note
shutdown is used exclusively when KafkaServer is requested to shut down.

replaceCurrentWithFutureLog Method

replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit

replaceCurrentWithFutureLog…​FIXME

Note
replaceCurrentWithFutureLog is used exclusively when Partition is requested to maybeReplaceCurrentWithFutureReplica.

handleLogDirFailure Method

handleLogDirFailure(dir: String): Unit

handleLogDirFailure…​FIXME

Note
handleLogDirFailure is used exclusively when ReplicaManager is requested to handleLogDirFailure.
