LogManager

LogManager is created and immediately started when KafkaServer is requested to start up.

Figure 1. LogManager and KafkaServer

While being created, LogManager is given the log directories that are configured using the log.dirs or log.dir configuration properties (default: /tmp/kafka-logs). The log directories are immediately validated and then loaded (on a fixed thread pool with numRecoveryThreadsPerDataDir recovery threads per log directory).

LogManager uses the num.recovery.threads.per.data.dir dynamic configuration (default: 1) for the number of threads per log data directory for log recovery at startup and flushing at shutdown.
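The following is a minimal sketch of how the two settings above could be resolved from broker properties. The LogDirsSketch object and its methods are hypothetical (they are not part of Kafka), the properties are modelled as a plain Map, and the sketch assumes that log.dirs takes precedence over log.dir and that the value is a comma-separated list.

```scala
// Hypothetical helper for illustration only (not Kafka's API).
object LogDirsSketch {
  val DefaultLogDir = "/tmp/kafka-logs"
  val DefaultRecoveryThreads = 1

  // Assumption: log.dirs wins over log.dir; the default applies when neither is set.
  def resolveLogDirs(props: Map[String, String]): Seq[String] = {
    val raw = props.get("log.dirs")
      .orElse(props.get("log.dir"))
      .getOrElse(DefaultLogDir)
    raw.split(",").map(_.trim).filter(_.nonEmpty).toList
  }

  // Number of recovery threads per log data directory (default: 1).
  def recoveryThreadsPerDataDir(props: Map[String, String]): Int =
    props.get("num.recovery.threads.per.data.dir")
      .map(_.trim.toInt)
      .getOrElse(DefaultRecoveryThreads)
}
```

With two log directories and num.recovery.threads.per.data.dir set to 4, up to 8 recovery threads would be in use at startup, since the thread count applies per directory.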

LogManager is used to create a ReplicaManager, a DynamicLogConfig, and a TopicConfigHandler.

LogManager uses 30000 ms as the initial delay (InitialTaskDelayMs) of the background tasks that are scheduled at startup.

Tip

Enable ALL logging level for kafka.log.LogManager logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.LogManager=ALL

Refer to Logging.

Performance Metrics

LogManager is a KafkaMetricsGroup with the following performance metrics.

Table 1. LogManager’s Performance Metrics
Metric Name Description

OfflineLogDirectoryCount

The number of offline log directories

LogDirectoryOffline

Registered for every log directory to indicate whether it is online or offline

Possible values:

  • 0 when a log directory is online

  • 1 when a log directory is offline

The path of the directory is the tag of the metric.

The performance metrics are registered in kafka.log:type=LogManager group.
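The values of the two metrics can be illustrated with a short sketch. LogDirMetricsSketch is a hypothetical class (it does not use KafkaMetricsGroup or any metrics library), and the isLive function is an assumed stand-in for however LogManager tracks live log directories.

```scala
import java.io.File

// Hypothetical stand-in for the two gauges; no metrics library is involved.
final class LogDirMetricsSketch(allDirs: Seq[File], isLive: File => Boolean) {

  // OfflineLogDirectoryCount: the number of configured directories that are not live.
  def offlineLogDirectoryCount: Int = allDirs.count(dir => !isLive(dir))

  // LogDirectoryOffline: one value per directory, keyed by the directory path
  // (the tag of the metric): 0 when online, 1 when offline.
  def logDirectoryOffline: Map[String, Int] =
    allDirs.map(dir => dir.getAbsolutePath -> (if (isLive(dir)) 0 else 1)).toMap
}
```

The sum of all the LogDirectoryOffline values equals OfflineLogDirectoryCount in this sketch, which is a handy sanity check when reading the metrics side by side.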

Figure 2. LogManager in jconsole

Creating LogManager — apply Factory Method

apply(
  config: KafkaConfig,
  initialOfflineDirs: Seq[String],
  zkClient: KafkaZkClient,
  brokerState: BrokerState,
  kafkaScheduler: KafkaScheduler,
  time: Time,
  brokerTopicStats: BrokerTopicStats,
  logDirFailureChannel: LogDirFailureChannel): LogManager

apply…​FIXME

Note
apply is used exclusively when KafkaServer is requested to start up.

Creating LogManager Instance

LogManager takes the following to be created:

  • Absolute paths to log directories (Seq[File])

  • Initial offline directories (Seq[File])

  • Topic configurations - LogConfigs per topic name (Map[String, LogConfig])

  • Initial LogConfig

  • CleanerConfig

  • recoveryThreadsPerDataDir (based on the num.recovery.threads.per.data.dir dynamic configuration property)

  • flushCheckMs

  • flushRecoveryOffsetCheckpointMs

  • flushStartOffsetCheckpointMs

  • retentionCheckMs

  • maxPidExpirationMs

  • Scheduler

  • BrokerState

  • BrokerTopicStats

  • LogDirFailureChannel

  • Time

LogManager initializes the internal properties.

While being created, LogManager loads logs.

Recovering And Loading Logs In Log Data Directories — loadLogs Internal Method

loadLogs(): Unit

loadLogs prints out the following INFO message to the logs:

Loading logs.

For every live log directory, loadLogs first creates a fixed thread pool (with numRecoveryThreadsPerDataDir threads).

loadLogs then checks whether the .kafka_cleanshutdown file exists in the log directory. If so, loadLogs prints out the following DEBUG message to the logs:

Found clean shutdown file. Skipping recovery for all logs in data directory: [dir]

loadLogs uses the recoveryPointCheckpoints to look up the OffsetCheckpointFile for the log directory (recovery-point-offset-checkpoint file) and then loads it.

loadLogs uses the logStartOffsetCheckpoints to look up the OffsetCheckpointFile for the log directory (log-start-offset-checkpoint file) and then loads it.

For every directory in the log directory (a partition log directory), loadLogs creates a job (a Runnable) that loads the partition log directory with the recovery points and log start offsets (that have just been loaded).

loadLogs submits the jobs for execution on the fixed thread pool of the log directory.

loadLogs then…​FIXME (finish me)

In the end, after having loaded the log directories successfully, loadLogs prints out the following INFO message to the logs:

Logs loading complete in [duration] ms.

In case .kafka_cleanshutdown file does not exist, loadLogs transitions the BrokerState to RecoveringFromUncleanShutdown.

In case of an exception while loading the OffsetCheckpointFile of a log directory (recovery-point-offset-checkpoint file), loadLogs simply prints out the following WARN messages to the logs:

Error occurred while reading recovery-point-offset-checkpoint file of directory [dir]
Resetting the recovery checkpoint to 0

In case of an exception while loading the OffsetCheckpointFile of a log directory (log-start-offset-checkpoint file), loadLogs simply prints out the following WARN message to the logs:

Error occurred while reading log-start-offset-checkpoint file of directory [dir]
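The "warn and carry on" handling of a broken checkpoint file can be sketched as follows. CheckpointReadSketch is hypothetical: the read function stands in for loading an OffsetCheckpointFile, partitions are modelled as plain strings, and falling back to an empty map (so that later lookups default to 0) is an assumption of this sketch.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical helper; `read` stands in for loading an OffsetCheckpointFile.
object CheckpointReadSketch {

  def readOrEmpty(fileName: String, dir: String)(read: () => Map[String, Long]): Map[String, Long] =
    Try(read()) match {
      case Success(offsets) => offsets
      case Failure(e) =>
        // Reading failed: warn and continue with no checkpointed offsets.
        println(s"WARN Error occurred while reading $fileName file of directory $dir ($e)")
        Map.empty
    }

  // A partition without an entry falls back to offset 0.
  def offsetFor(partition: String, offsets: Map[String, Long]): Long =
    offsets.getOrElse(partition, 0L)
}
```

The point of the design is that an unreadable checkpoint file costs a longer recovery (starting from offset 0) rather than failing the whole log directory.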

In case of an exception while loading the log directory or executing any other task, loadLogs adds the log directory to the offlineDirs internal registry with the exception and prints out the following ERROR message to the logs:

Error while loading log dir [dir]

Note
loadLogs is used exclusively when LogManager is created.
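The overall shape of loadLogs (one fixed thread pool per log directory, one job per partition directory, then waiting for all jobs) can be sketched with plain java.util.concurrent. LoadLogsSketch, LoadResult and the loadLog function parameter are hypothetical names. The sketch leaves out the checkpoint files and the BrokerState transition, and it assumes that a single failed job marks the whole log directory as offline.

```scala
import java.io.File
import java.util.concurrent.{Callable, Executors}
import scala.util.Try

object LoadLogsSketch {
  val CleanShutdownFile = ".kafka_cleanshutdown"

  // Hypothetical result type: loaded partition directory names and offline log directories.
  final case class LoadResult(loaded: Set[String], offlineDirs: Set[String])

  def loadLogs(liveLogDirs: Seq[File], threadsPerDir: Int)(loadLog: File => Unit): LoadResult = {
    println("Loading logs.")
    val startMs = System.currentTimeMillis()

    // Phase 1: a fixed thread pool per log directory, a job per partition directory.
    val submitted = liveLogDirs.map { dir =>
      val pool = Executors.newFixedThreadPool(threadsPerDir)
      if (new File(dir, CleanShutdownFile).exists())
        println(s"Found clean shutdown file. Skipping recovery for all logs in data directory: $dir")
      val partitionDirs = Option(dir.listFiles()).map(_.toList).getOrElse(Nil).filter(_.isDirectory)
      val jobs = partitionDirs.map { partitionDir =>
        pool.submit(new Callable[String] {
          override def call(): String = { loadLog(partitionDir); partitionDir.getName }
        })
      }
      (dir, pool, jobs)
    }

    // Phase 2: wait for the jobs; a failure takes the log directory offline.
    val results = submitted.map { case (dir, pool, jobs) =>
      val outcome = Try(jobs.map(_.get()))
      pool.shutdown()
      outcome.failed.foreach(e => println(s"Error while loading log dir $dir ($e)"))
      dir -> outcome
    }

    println(s"Logs loading complete in ${System.currentTimeMillis() - startMs} ms.")
    LoadResult(
      loaded = results.flatMap(_._2.getOrElse(Nil)).toSet,
      offlineDirs = results.collect { case (dir, outcome) if outcome.isFailure => dir.getAbsolutePath }.toSet)
  }
}
```

Submitting the jobs for all directories before waiting on any of them lets the directories (typically separate disks) recover in parallel, while the pool size bounds the parallelism within one directory.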

Loading Partition Log Directory — loadLog Internal Method

loadLog(
  logDir: File,
  recoveryPoints: Map[TopicPartition, Long],
  logStartOffsets: Map[TopicPartition, Long]): Unit

loadLog first prints out the following DEBUG message to the logs:

Loading log '[logDir]'

loadLog gets the LogConfig for the topic (from the LogConfigs per topic) or defaults to the currentDefaultConfig.

loadLog gets logRecoveryPoint for the partition (from the given recoveryPoints) or defaults to 0.

loadLog gets logStartOffset for the partition (from the given logStartOffsets) or defaults to 0.

loadLog creates a Log.

In case the name of the given logDir ends with the -delete suffix, loadLog calls addLogToBeDeleted (with the Log).

Otherwise, loadLog adds the Log to the futureLogs internal registry when it is a future log (isFuture) or to the currentLogs internal registry when it is not.

In case a Log was already registered for the partition (in the futureLogs or currentLogs internal registry), loadLog throws an IllegalStateException:

FIXME

Note
loadLog is used exclusively when LogManager is requested to recover and load the logs in log data directories.
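The registration step at the end of loadLog can be sketched as follows. SketchLog and LogRegistrySketch are hypothetical and heavily simplified (a partition is a plain string, and the Log carries no data). The exception message is made up for the sketch, and putIfAbsent is an assumption about how a duplicate could be detected.

```scala
import java.util.concurrent.LinkedBlockingQueue
import scala.collection.concurrent.TrieMap

// Hypothetical, simplified stand-in for kafka.log.Log.
final case class SketchLog(dirName: String, partition: String, isFuture: Boolean)

final class LogRegistrySketch {
  val currentLogs = TrieMap.empty[String, SketchLog]
  val futureLogs = TrieMap.empty[String, SketchLog]
  val logsToBeDeleted = new LinkedBlockingQueue[SketchLog]()

  def register(log: SketchLog): Unit = {
    if (log.dirName.endsWith("-delete")) {
      // Directories marked for deletion are queued, not registered.
      logsToBeDeleted.add(log)
    } else {
      val registry = if (log.isFuture) futureLogs else currentLogs
      // putIfAbsent returns the previously registered log, if any.
      registry.putIfAbsent(log.partition, log).foreach { previous =>
        // Message text is invented for this sketch.
        throw new IllegalStateException(
          s"Duplicate log directories for ${log.partition}: ${previous.dirName} and ${log.dirName}")
      }
    }
  }
}
```

Keeping future and current logs in separate registries lets a partition have both at the same time, which is why only a second log in the same registry is treated as an error.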

Starting Up — startup Method

startup(): Unit

startup starts the background threads to flush logs and do log cleanup.

Internally, startup prints out the following INFO message to the logs:

Starting log cleanup with a period of [retentionCheckMs] ms.

startup requests the Scheduler to schedule a task with the name kafka-log-retention that runs cleanupLogs with the InitialTaskDelayMs delay and the retentionCheckMs execution period.

startup prints out the following INFO message to the logs:

Starting log flusher with a default period of [flushCheckMs] ms.

startup requests the Scheduler to schedule a task with the name kafka-log-flusher that runs flushDirtyLogs with the InitialTaskDelayMs delay and the flushCheckMs execution period.

startup requests the Scheduler to schedule a task with the name kafka-recovery-point-checkpoint that runs checkpointLogRecoveryOffsets with the InitialTaskDelayMs delay and the flushRecoveryOffsetCheckpointMs execution period.

startup requests the Scheduler to schedule a task with the name kafka-log-start-offset-checkpoint that runs checkpointLogStartOffsets with the InitialTaskDelayMs delay and the flushStartOffsetCheckpointMs execution period.

startup requests the Scheduler to schedule a task with the name kafka-delete-logs that runs deleteLogs with the InitialTaskDelayMs delay.

(only when the CleanerConfig has the enableCleaner flag enabled) startup requests the LogCleaner to start up.

Note
startup is used exclusively when KafkaServer is requested to start up.
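The scheduling pattern used by startup (a named task, an initial delay and an execution period) can be sketched with a ScheduledExecutorService. StartupSketch is hypothetical and is not Kafka's Scheduler. It covers only the first two tasks, takes the task bodies as function parameters, and assumes fixed-rate scheduling.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}
import scala.collection.mutable.ListBuffer

// Hypothetical stand-in for LogManager.startup; only java.util.concurrent is used.
final class StartupSketch(
    scheduler: ScheduledExecutorService,
    initialTaskDelayMs: Long,
    retentionCheckMs: Long,
    flushCheckMs: Long) {

  private val names = ListBuffer.empty[String]

  // Names of the tasks scheduled so far, in scheduling order.
  def scheduledTaskNames: List[String] = names.toList

  private def schedule(name: String, delayMs: Long, periodMs: Long)(task: () => Unit): Unit = {
    val runnable = new Runnable { override def run(): Unit = task() }
    scheduler.scheduleAtFixedRate(runnable, delayMs, periodMs, TimeUnit.MILLISECONDS)
    names += name
  }

  def startup(cleanupLogs: () => Unit, flushDirtyLogs: () => Unit): Unit = {
    println(s"Starting log cleanup with a period of $retentionCheckMs ms.")
    schedule("kafka-log-retention", initialTaskDelayMs, retentionCheckMs)(cleanupLogs)

    println(s"Starting log flusher with a default period of $flushCheckMs ms.")
    schedule("kafka-log-flusher", initialTaskDelayMs, flushCheckMs)(flushDirtyLogs)
  }
}
```

The remaining tasks (kafka-recovery-point-checkpoint, kafka-log-start-offset-checkpoint and kafka-delete-logs) would follow the same pattern with their own periods.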

BrokerTopicStats

When created, LogManager is given a BrokerTopicStats that is used exclusively to create Logs when recovering and loading logs in log data directories and looking up or creating a Log.

cleanupLogs Method

cleanupLogs(): Unit

cleanupLogs…​FIXME

Note
cleanupLogs is used when…​FIXME

Getting All Partition Logs — allLogs Method

allLogs: Iterable[Log]

allLogs…​FIXME

Note
allLogs is used when…​FIXME

addLogToBeDeleted Internal Method

addLogToBeDeleted(log: Log): Unit

addLogToBeDeleted…​FIXME

Note
addLogToBeDeleted is used when…​FIXME

asyncDelete Method

asyncDelete(
  topicPartition: TopicPartition,
  isFuture: Boolean = false): Log

asyncDelete…​FIXME

Note

asyncDelete is used when:

Looking Up Or Creating Log — getOrCreateLog Method

getOrCreateLog(
  topicPartition: TopicPartition,
  config: LogConfig,
  isNew: Boolean = false,
  isFuture: Boolean = false): Log

getOrCreateLog looks up the partition log and returns it if found.

Otherwise, getOrCreateLog…​FIXME

Note
getOrCreateLog is used exclusively when Partition is requested to getOrCreateReplica.

getLog Method

getLog(
  topicPartition: TopicPartition,
  isFuture: Boolean = false): Option[Log]

getLog…​FIXME

Note
getLog is used when…​FIXME

liveLogDirs Method

liveLogDirs: Seq[File]

liveLogDirs…​FIXME

Note
liveLogDirs is used when…​FIXME

deleteLogs Internal Method

deleteLogs(): Unit

deleteLogs…​FIXME

Note
deleteLogs is used when…​FIXME

flushDirtyLogs Internal Method

flushDirtyLogs(): Unit

flushDirtyLogs…​FIXME

Note
flushDirtyLogs is used when…​FIXME

checkpointLogRecoveryOffsets Method

checkpointLogRecoveryOffsets(): Unit

checkpointLogRecoveryOffsets…​FIXME

Note
checkpointLogRecoveryOffsets is used when…​FIXME

checkpointLogStartOffsets Method

checkpointLogStartOffsets(): Unit

checkpointLogStartOffsets…​FIXME

Note
checkpointLogStartOffsets is used when…​FIXME

isLogDirOnline Method

isLogDirOnline(logDir: String): Boolean

isLogDirOnline…​FIXME

Note
isLogDirOnline is used when…​FIXME

Validating Data Log Directories — createAndValidateLogDirs Internal Method

createAndValidateLogDirs(
  dirs: Seq[File],
  initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File]

For every directory in the given dirs, createAndValidateLogDirs makes sure that the data directory is available (i.e. it is a readable directory) or creates it.

createAndValidateLogDirs prints out the following INFO message to the logs when a data directory does not exist:

Log directory [dir] not found, creating it.

Note
createAndValidateLogDirs is given the logDirs and the initialOfflineDirs that LogManager is created with.

createAndValidateLogDirs throws…​FIXME

Note
createAndValidateLogDirs is used exclusively when LogManager is created.
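The create-or-validate step can be sketched as follows. ValidateLogDirsSketch is hypothetical. It assumes that directories listed as initially offline are skipped, and that a directory which cannot be created or read is simply left out of the result; the error handling of the real method is not covered here.

```scala
import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue

object ValidateLogDirsSketch {

  def createAndValidate(dirs: Seq[File], initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
    val liveLogDirs = new ConcurrentLinkedQueue[File]()
    // Assumption: initially offline directories are never considered live.
    for (dir <- dirs if !initialOfflineDirs.contains(dir)) {
      if (!dir.exists()) {
        println(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
        dir.mkdirs()
      }
      // Only a readable directory counts as available in this sketch.
      if (dir.isDirectory && dir.canRead) {
        liveLogDirs.add(dir)
      }
    }
    liveLogDirs
  }
}
```

The returned queue plays the role of the _liveLogDirs internal property in this sketch.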

truncateTo Method

truncateTo(
  partitionOffsets: Map[TopicPartition, Long],
  isFuture: Boolean): Unit

truncateTo…​FIXME

Note
truncateTo is used exclusively when Partition is requested to truncateTo.

truncateFullyAndStartAt Method

truncateFullyAndStartAt(
  topicPartition: TopicPartition,
  newOffset: Long,
  isFuture: Boolean): Unit

truncateFullyAndStartAt…​FIXME

Note
truncateFullyAndStartAt is used exclusively when Partition is requested to truncateFullyAndStartAt.

resizeRecoveryThreadPool Method

resizeRecoveryThreadPool(newSize: Int): Unit

resizeRecoveryThreadPool prints out the following INFO message to the logs and reconfigures the numRecoveryThreadsPerDataDir internal registry to be the given newSize.

Resizing recovery thread pool size for each data dir from [numRecoveryThreadsPerDataDir] to [newSize]

Note
resizeRecoveryThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (with a new value of KafkaConfig.numRecoveryThreadsPerDataDir).
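A minimal sketch of the resize follows. RecoveryThreadsSketch and its currentSize accessor are hypothetical, and marking the field as volatile is an assumption made so that the new value is visible to other threads.

```scala
// Hypothetical holder of the numRecoveryThreadsPerDataDir internal registry.
final class RecoveryThreadsSketch(recoveryThreadsPerDataDir: Int) {

  // Starts as recoveryThreadsPerDataDir and can be changed at runtime.
  @volatile private var numRecoveryThreadsPerDataDir: Int = recoveryThreadsPerDataDir

  def currentSize: Int = numRecoveryThreadsPerDataDir

  def resizeRecoveryThreadPool(newSize: Int): Unit = {
    println(s"Resizing recovery thread pool size for each data dir from $numRecoveryThreadsPerDataDir to $newSize")
    numRecoveryThreadsPerDataDir = newSize
  }
}
```

In this sketch only the stored number changes; a thread pool created afterwards would pick up the new size.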

Shutting Down — shutdown Method

shutdown(): Unit

shutdown prints out the following INFO message to the logs:

Shutting down.

shutdown then…​FIXME

Note
shutdown is used exclusively when KafkaServer is requested to shut down.

replaceCurrentWithFutureLog Method

replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit

replaceCurrentWithFutureLog…​FIXME

Note
replaceCurrentWithFutureLog is used exclusively when Partition is requested to maybeReplaceCurrentWithFutureReplica.

handleLogDirFailure Method

handleLogDirFailure(dir: String): Unit

handleLogDirFailure…​FIXME

Note
handleLogDirFailure is used exclusively when ReplicaManager is requested to handleLogDirFailure.

Internal Properties

Name Description

_liveLogDirs

Java’s ConcurrentLinkedQueue of live log directories (after createAndValidateLogDirs was executed with the logDirs and the initialOfflineDirs directories).

Used when…​FIXME

cleaner

currentDefaultConfig

Default LogConfig

Used when a custom LogConfig is not available in the topicConfigs

currentLogs

Pool of Logs per TopicPartition (Pool[TopicPartition, Log])

futureLogs

Pool of Logs per TopicPartition (Pool[TopicPartition, Log])

logStartOffsetCheckpoints

numRecoveryThreadsPerDataDir

Number of recovery threads per log data directory

Starts as the recoveryThreadsPerDataDir and can then be dynamically changed.

recoveryPointCheckpoints
