LogManager

LogManager is created (via apply factory utility) and immediately started for KafkaServer when requested to start up.

LogManager.png
Figure 1. LogManager and KafkaServer

LogManager runs until (and is requested to shut down when) KafkaServer is requested to shut down.

LogManager uses the following configuration properties.

Table 1. LogManager’s Configuration Properties
Name Description

file.delete.delay.ms

log.cleaner.backoff.ms

log.cleaner.dedupe.buffer.size

log.cleaner.enable

Used to enable a LogCleaner

log.cleaner.io.buffer.load.factor

log.cleaner.io.buffer.size

log.cleaner.io.max.bytes.per.second

log.cleaner.threads

log.dirs or log.dir

Used for the log directories that are immediately validated and loaded (on a fixed thread pool with numRecoveryThreadsPerDataDir recovery threads)

log.flush.offset.checkpoint.interval.ms

log.flush.scheduler.interval.ms

log.flush.start.offset.checkpoint.interval.ms

log.retention.check.interval.ms

message.max.bytes

num.recovery.threads.per.data.dir

transactional.id.expiration.ms

LogManager is used to create a ReplicaManager, a DynamicLogConfig, a TopicConfigHandler, and a Partition.

LogManager defaults to 30000ms for the initial task delay.

Tip

Enable ALL logging level for kafka.log.LogManager logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.LogManager=ALL

Refer to Logging.

LogManager and LogCleaner

LogManager creates a LogCleaner only when the log.cleaner.enable flag (of the given CleanerConfig) is enabled (default: true).

LogManager uses LogCleaner for the following:

Performance Metrics

LogManager is a KafkaMetricsGroup with the following performance metrics.

Table 2. LogManager’s Performance Metrics
Metric Name Description

OfflineLogDirectoryCount

The number of offline log directories

LogDirectoryOffline

Registered for every log directory to indicate whether it is online or offline

Possible values:

  • 0 when a log directory is online

  • 1 when a log directory is offline

The path of the directory is the tag of the metric.

The performance metrics are registered in kafka.log:type=LogManager group.

LogManager jconsole.png
Figure 2. LogManager in jconsole

Creating LogManager — apply Utility

apply(
  config: KafkaConfig,
  initialOfflineDirs: Seq[String],
  zkClient: KafkaZkClient,
  brokerState: BrokerState,
  kafkaScheduler: KafkaScheduler,
  time: Time,
  brokerTopicStats: BrokerTopicStats,
  logDirFailureChannel: LogDirFailureChannel): LogManager

apply…​FIXME

Note
apply is used exclusively when KafkaServer is requested to start up.

Creating LogManager Instance

LogManager takes the following to be created:

LogManager initializes the internal properties.

While being created, LogManager loads logs.

abortAndPauseCleaning Method

abortAndPauseCleaning(
  topicPartition: TopicPartition): Unit

abortAndPauseCleaning…​FIXME

Note
abortAndPauseCleaning is used when ReplicaManager is requested to alterReplicaLogDirs and becomeLeaderOrFollower.

Recovering And Loading Logs In Log Data Directories — loadLogs Internal Method

loadLogs(): Unit

loadLogs prints out the following INFO message to the logs:

Loading logs.

For every live log directory, loadLogs first creates a fixed thread pool (with numRecoveryThreadsPerDataDir threads).

loadLogs then checks whether .kafka_cleanshutdown file exists in the log directory. If so, loadLogs prints out the following DEBUG message to the logs:

Found clean shutdown file. Skipping recovery for all logs in data directory: [dir]

loadLogs uses the recoveryPointCheckpoints to look up the OffsetCheckpointFile for the log directory (recovery-point-offset-checkpoint file) and then loads it.

loadLogs uses the logStartOffsetCheckpoints to look up the OffsetCheckpointFile for the log directory (recovery-point-offset-checkpoint file) and then loads it.

For every directory in the log directory, loadLogs creates a new thread to load the log directory with the recovery points and log start offsets (that have just been loaded).

loadLogs submits the new threads to load the log directory for execution on the fixed thread pool.

loadLogs then…​FIXME (finish me)

In the end, after having loaded the log directories successfully, loadLogs prints out the following INFO message to the logs:

Logs loading complete in [duration] ms.

In case .kafka_cleanshutdown file does not exist, loadLogs transitions the BrokerState to RecoveringFromUncleanShutdown.

In case of an exception while loading the OffsetCheckpointFile of a log directory (recovery-point-offset-checkpoint file), loadLogs simply prints out the following WARN messages to the logs:

Error occurred while reading recovery-point-offset-checkpoint file of directory [dir]
Resetting the recovery checkpoint to 0

In case of an exception while loading the OffsetCheckpointFile of a log directory (log-start-offset-checkpoint file), loadLogs simply prints out the following WARN messages to the logs:

Error occurred while reading log-start-offset-checkpoint file of directory [dir]

In case of an exception while load the log directory or any other task, loadLogs adds the log directory to a offlineDirs internal registry with the exception and prints out the following ERROR message to the logs:

Error while loading log dir [dir]
Note
loadLogs is used exclusively when LogManager is created.

Loading Partition Log Directory — loadLog Internal Method

loadLog(
  logDir: File,
  recoveryPoints: Map[TopicPartition, Long],
  logStartOffsets: Map[TopicPartition, Long]): Unit

loadLog first prints out the following DEBUG message to the logs:

Loading log '[logDir]'

loadLog gets the LogConfig for the topic (from the LogConfigs per topic) or defaults to the currentDefaultConfig.

loadLog gets logRecoveryPoint for the partition (from the given recoveryPoints) or defaults to 0.

loadLog gets logStartOffset for the partition (from the given logStartOffsets) or defaults to 0.

loadLog creates a Log.

In case the name of the given logDir ends with -delete suffix, loadLog addLogToBeDeleted.

Otherwise, loadLog adds the Log to the futureLogs or currentLogs internal registry whether it is isFuture or not, respectively.

In case there was Log already registered (the futureLogs or currentLogs internal registry), loadLog throws an IllegalStateException:

FIXME
Note
loadLog is used exclusively when LogManager is requested to recover and load the logs in log data directories.

Starting Up — startup Method

startup(): Unit

startup starts the background threads to flush logs and do log cleanup.

Internally, startup prints out the following INFO message to the logs:

Starting log cleanup with a period of [retentionCheckMs] ms.

startup requests the Scheduler to schedule a task with the name kafka-log-retention that cleanupLogs with the InitialTaskDelayMs delay and the retentionCheckMs execution period.

startup prints out the following INFO message to the logs:

Starting log flusher with a default period of [flushCheckMs] ms.

startup requests the Scheduler to schedule a task with the name kafka-log-flusher that flushDirtyLogs with the InitialTaskDelayMs delay and the flushCheckMs execution period.

startup requests the Scheduler to schedule a task with the name kafka-recovery-point-checkpoint that checkpointLogRecoveryOffsets with the InitialTaskDelayMs delay and the flushRecoveryOffsetCheckpointMs execution period.

startup requests the Scheduler to schedule a task with the name kafka-log-start-offset-checkpoint that checkpointLogStartOffsets with the InitialTaskDelayMs delay and the flushStartOffsetCheckpointMs execution period.

startup requests the Scheduler to schedule a task with the name kafka-delete-logs that deleteLogs with the InitialTaskDelayMs delay.

(only when the CleanerConfig has the enableCleaner flag enabled) startup requests the LogCleaner to start up.

Note
startup is used exclusively when KafkaServer is requested to start up.

BrokerTopicStats

When created, LogManager is given a BrokerTopicStats that is used exclusively to create Logs when recovering and loading logs in log data directories and looking up or creating a Log.

cleanupLogs Method

cleanupLogs(): Unit

cleanupLogs prints out the following DEBUG message to the logs:

Beginning log cleanup...

cleanupLogs finds so-called deletable (non-compacted) logs by requesting the LogCleaner (if used) to pauseCleaningForNonCompactedPartitions or simply finds all logs in the currentLogs internal registry that are not compacted (by the compact flag of the LogConfig of the Log).

For every deletable log, cleanupLogs prints out the following DEBUG message to the logs:

Garbage collecting '[log.name]'

cleanupLogs requests the log to deleteOldSegments.

cleanupLogs finds the future log for the partition (of the deletable log) in the futureLogs internal registry and, if available, prints out the following DEBUG message to the logs followed by requesting it to deleteOldSegments.

Garbage collecting future log '[futureLog.name]'

In the end, cleanupLogs requests the LogCleaner (if used) to resumeCleaning and prints out the following DEBUG message to the logs:

Log cleanup completed. [total] files deleted in [duration] seconds
Note
cleanupLogs is used when LogManager is requested to start up (and schedules kafka-log-retention periodic task).

Getting All Partition Logs — allLogs Method

allLogs: Iterable[Log]

allLogs…​FIXME

Note
allLogs is used when…​FIXME

addLogToBeDeleted Internal Method

addLogToBeDeleted(log: Log): Unit

addLogToBeDeleted…​FIXME

Note
addLogToBeDeleted is used when…​FIXME

asyncDelete Method

asyncDelete(
  topicPartition: TopicPartition,
  isFuture: Boolean = false): Log

asyncDelete…​FIXME

Note

asyncDelete is used when:

Looking Up Or Creating New Partition Log — getOrCreateLog Method

getOrCreateLog(
  topicPartition: TopicPartition,
  config: LogConfig,
  isNew: Boolean = false,
  isFuture: Boolean = false): Log

getOrCreateLog looks up the partition log for the given TopicPartition (and returns it if found) or creates a new one.

When looking up the partition log was unsuccessful, getOrCreateLog finds the log directory for the TopicPartition. getOrCreateLog finds one in the preferredLogDirs internal registry and falls back on nextLogDirs (that simply gives all the log directories sorted by the number of partition logs).

getOrCreateLog creates the directory name for the TopicPartition (based on the given isFuture flag).

getOrCreateLog tries to createLogDirectory in the available log directories (one by one) until successful.

getOrCreateLog creates a new Log (using the maxPidExpirationMs and LogManager.ProducerIdExpirationCheckIntervalMs configuration properties).

getOrCreateLog registers the new Log with the TopicPartition in the futureLogs or currentLogs internal registries (based on the given isFuture flag).

getOrCreateLog prints out the following INFO message to the logs:

Created log for partition [topicPartition] in [logDir] with properties [config].

getOrCreateLog removes the TopicPartition from the preferredLogDirs internal registry.

Note
getOrCreateLog is used exclusively when Partition is requested to getOrCreateReplica.

getLog Method

getLog(
  topicPartition: TopicPartition,
  isFuture: Boolean = false): Option[Log]

getLog…​FIXME

Note
getLog is used when…​FIXME

liveLogDirs Method

liveLogDirs: Seq[File]

liveLogDirs…​FIXME

Note
liveLogDirs is used when…​FIXME

deleteLogs Internal Method

deleteLogs(): Unit

deleteLogs…​FIXME

Note
deleteLogs is used when…​FIXME

checkpointLogRecoveryOffsets Method

checkpointLogRecoveryOffsets(): Unit

checkpointLogRecoveryOffsets…​FIXME

Note
checkpointLogRecoveryOffsets is used when…​FIXME

checkpointLogStartOffsets Method

checkpointLogStartOffsets(): Unit

checkpointLogStartOffsets…​FIXME

Note
checkpointLogStartOffsets is used when…​FIXME

isLogDirOnline Method

isLogDirOnline(logDir: String): Boolean

isLogDirOnline…​FIXME

Note
isLogDirOnline is used when…​FIXME

Validating Data Log Directories — createAndValidateLogDirs Internal Method

createAndValidateLogDirs(
  dirs: Seq[File],
  initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File]

For every directory in the given dirs, createAndValidateLogDirs makes sure that the data directory is available (i.e. it is a readable directory) or creates it.

createAndValidateLogDirs prints out the following INFO message to the logs when a data directory does not exist:

Log directory [dir] not found, creating it.
Note
createAndValidateLogDirs is given the logDirs and the initialOfflineDirs that LogManager is created with.

createAndValidateLogDirs throws…​FIXME

Note
createAndValidateLogDirs is used exclusively when LogManager is created.

truncateTo Method

truncateTo(
  partitionOffsets: Map[TopicPartition, Long],
  isFuture: Boolean): Unit

truncateTo…​FIXME

Note
truncateTo is used when Partition is requested to truncateTo.

truncateFullyAndStartAt Method

truncateFullyAndStartAt(
  topicPartition: TopicPartition,
  newOffset: Long,
  isFuture: Boolean): Unit

truncateFullyAndStartAt…​FIXME

Note
truncateFullyAndStartAt is used exclusively when Partition is requested to truncateFullyAndStartAt.

resizeRecoveryThreadPool Method

resizeRecoveryThreadPool(newSize: Int): Unit

resizeRecoveryThreadPool prints out the following INFO message to the logs and reconfigures the numRecoveryThreadsPerDataDir internal registry to be the given newSize.

Resizing recovery thread pool size for each data dir from [numRecoveryThreadsPerDataDir] to [newSize]
Note
resizeRecoveryThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (with a new value of KafkaConfig.numRecoveryThreadsPerDataDir).

Shutting Down — shutdown Method

shutdown(): Unit

shutdown prints out the following INFO message to the logs:

Shutting down.

shutdown then…​FIXME

Note
shutdown is used exclusively when KafkaServer is requested to shutdown.

replaceCurrentWithFutureLog Method

replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit

replaceCurrentWithFutureLog…​FIXME

Note
replaceCurrentWithFutureLog is used exclusively when Partition is requested to maybeReplaceCurrentWithFutureReplica.

handleLogDirFailure Method

handleLogDirFailure(
  dir: String): Unit

handleLogDirFailure…​FIXME

Note
handleLogDirFailure is used exclusively when ReplicaManager is requested to handleLogDirFailure.

Marking Start of Partition Log Initialization — initializingLog Method

initializingLog(
  topicPartition: TopicPartition): Unit

initializingLog…​FIXME

Note
initializingLog is used when Partition is requested to createLog.

Marking End of Partition Log Initialization — finishedInitializingLog Method

finishedInitializingLog(
  topicPartition: TopicPartition,
  maybeLog: Option[Log],
  fetchLogConfig: () => LogConfig): Unit

finishedInitializingLog…​FIXME

Note
finishedInitializingLog is used when Partition is requested to createLog.

flushDirtyLogs Internal Method

flushDirtyLogs(): Unit

flushDirtyLogs prints out the following DEBUG message to the logs:

Checking for dirty logs to flush...

flushDirtyLogs…​FIXME

Note
flushDirtyLogs is used exclusively for the kafka-log-flusher task (when LogManager is requested to start up).

createLogDirectory Internal Method

createLogDirectory(
  logDir: File,
  logDirName: String): Try[File]

createLogDirectory…​FIXME

Note
createLogDirectory is used when LogManager is requested to look up or create a new partition log.

nextLogDirs Internal Method

nextLogDirs(): List[File]

nextLogDirs…​FIXME

Note
nextLogDirs is used when LogManager is requested to look up or create a new partition log.

Internal Properties

Name Description

_liveLogDirs

Java’s ConcurrentLinkedQueue of live log directories (after createAndValidateLogDirs was executed with the logDirs and the initialOfflineDirs directories).

Used when…​FIXME

currentDefaultConfig

Default LogConfig

Used when a custom LogConfig is not available in the topicConfigs

currentLogs

Pool of Logs per TopicPartition (Pool[TopicPartition, Log])

futureLogs

Pool of Logs per TopicPartition (Pool[TopicPartition, Log])

logStartOffsetCheckpoints

numRecoveryThreadsPerDataDir

Number of recovery threads per log data directory

Starts as the recoveryThreadsPerDataDir and can then be dynamically changed.

preferredLogDirs

preferredLogDirs: ConcurrentHashMap[TopicPartition, String]

recoveryPointCheckpoints

results matching ""

    No results matching ""