LogManager
LogManager is created (via the apply factory utility) and immediately started when KafkaServer is requested to start up.
LogManager uses the following configuration properties.
| Name | Description |
|---|---|
| | Used to enable a LogCleaner |
| | Used for the log directories that are immediately validated and loaded (on a fixed thread pool with numRecoveryThreadsPerDataDir recovery threads) |
| | Number of threads per log data directory for log recovery at startup and flushing at shutdown |
LogManager is used to create a ReplicaManager, a DynamicLogConfig, a TopicConfigHandler, and a Partition.
LogManager defaults to 30000ms for the initial task delay.
Tip: Enable… Add the following line to… Refer to Logging.
LogManager and LogCleaner
LogManager creates a LogCleaner only when the log.cleaner.enable flag (of the given CleanerConfig) is enabled (default: true).
LogManager uses LogCleaner for the following:
Performance Metrics
LogManager is a KafkaMetricsGroup with the following performance metrics.
| Metric Name | Description |
|---|---|
| | Registered for every log directory to indicate whether it is online or offline. The path of the directory is the tag of the metric. |
The performance metrics are registered in kafka.log:type=LogManager group.
Creating LogManager — apply Utility
apply(
config: KafkaConfig,
initialOfflineDirs: Seq[String],
zkClient: KafkaZkClient,
brokerState: BrokerState,
kafkaScheduler: KafkaScheduler,
time: Time,
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel): LogManager
apply…FIXME
Note: apply is used exclusively when KafkaServer is requested to start up.
Creating LogManager Instance
LogManager takes the following to be created:
- Topic configurations (LogConfigs per topic name)
- Initial LogConfig
- num.recovery.threads.per.data.dir dynamic configuration property
- log.flush.scheduler.interval.ms configuration property
- log.flush.offset.checkpoint.interval.ms configuration property
- log.flush.start.offset.checkpoint.interval.ms configuration property
- log.retention.check.interval.ms configuration property
- transactional.id.expiration.ms configuration property
LogManager initializes the internal properties.
While being created, LogManager loads logs.
abortAndPauseCleaning Method
abortAndPauseCleaning(
topicPartition: TopicPartition): Unit
abortAndPauseCleaning…FIXME
Note: abortAndPauseCleaning is used when ReplicaManager is requested to alterReplicaLogDirs and becomeLeaderOrFollower.
Recovering And Loading Logs In Log Data Directories — loadLogs Internal Method
loadLogs(): Unit
loadLogs prints out the following INFO message to the logs:
Loading logs.
For every live log directory, loadLogs first creates a fixed thread pool (with numRecoveryThreadsPerDataDir threads).
loadLogs then checks whether .kafka_cleanshutdown file exists in the log directory. If so, loadLogs prints out the following DEBUG message to the logs:
Found clean shutdown file. Skipping recovery for all logs in data directory: [dir]
loadLogs uses the recoveryPointCheckpoints to look up the OffsetCheckpointFile for the log directory (recovery-point-offset-checkpoint file) and then loads it.
loadLogs uses the logStartOffsetCheckpoints to look up the OffsetCheckpointFile for the log directory (log-start-offset-checkpoint file) and then loads it.
For every partition directory in the log directory, loadLogs creates a task that loads that partition log with the recovery points and log start offsets (that have just been loaded).
loadLogs submits the tasks for execution on the fixed thread pool.
loadLogs then…FIXME (finish me)
In the end, after having loaded the log directories successfully, loadLogs prints out the following INFO message to the logs:
Logs loading complete in [duration] ms.
In case .kafka_cleanshutdown file does not exist, loadLogs transitions the BrokerState to RecoveringFromUncleanShutdown.
In case of an exception while loading the OffsetCheckpointFile of a log directory (recovery-point-offset-checkpoint file), loadLogs simply prints out the following WARN messages to the logs:
Error occurred while reading recovery-point-offset-checkpoint file of directory [dir]
Resetting the recovery checkpoint to 0
In case of an exception while loading the OffsetCheckpointFile of a log directory (log-start-offset-checkpoint file), loadLogs simply prints out the following WARN message to the logs:
Error occurred while reading log-start-offset-checkpoint file of directory [dir]
In case of an exception while loading the log directory or running any other task, loadLogs adds the log directory to the offlineDirs internal registry together with the exception and prints out the following ERROR message to the logs:
Error while loading log dir [dir]
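The flow above (one fixed thread pool per live log directory, one task per partition directory, failures recorded against the log directory) can be sketched roughly as follows. This is a simplified illustration and not Kafka's code: `LoadLogsSketch`, the `loadLog` function parameter and the shape of `offlineDirs` are assumptions made for the example, and checkpoint files and the clean-shutdown marker are left out.

```scala
import java.io.File
import java.util.concurrent.{ConcurrentHashMap, ExecutionException, ExecutorService, Executors}
import scala.collection.mutable.ArrayBuffer

object LoadLogsSketch {
  // Hypothetical stand-in for the offlineDirs registry: directory path -> failure
  val offlineDirs = new ConcurrentHashMap[String, Throwable]()

  // `loadLog` is supplied by the caller and stands in for loading one partition log.
  def loadLogs(liveLogDirs: Seq[File], threadsPerDir: Int)(loadLog: File => Unit): Unit = {
    val pools = ArrayBuffer.empty[ExecutorService]
    try {
      val jobsPerDir = liveLogDirs.map { dir =>
        // One fixed thread pool per live log directory
        val pool = Executors.newFixedThreadPool(threadsPerDir)
        pools += pool
        val partitionDirs =
          Option(dir.listFiles()).getOrElse(Array.empty[File]).filter(_.isDirectory).toSeq
        // One task per partition directory, submitted to that directory's pool
        val jobs = partitionDirs.map { logDir =>
          pool.submit(new Runnable { override def run(): Unit = loadLog(logDir) })
        }
        (dir, jobs)
      }
      // Wait for all tasks; a failed task marks the whole log directory as offline
      jobsPerDir.foreach { case (dir, jobs) =>
        try jobs.foreach(_.get())
        catch { case e: ExecutionException => offlineDirs.put(dir.getAbsolutePath, e.getCause) }
      }
    } finally pools.foreach(_.shutdown())
  }
}
```

Waiting on the futures only after every directory has had its tasks submitted lets the directories load in parallel, which is the point of having a pool per directory.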
Note: loadLogs is used exclusively when LogManager is created.
Loading Partition Log Directory — loadLog Internal Method
loadLog(
logDir: File,
recoveryPoints: Map[TopicPartition, Long],
logStartOffsets: Map[TopicPartition, Long]): Unit
loadLog first prints out the following DEBUG message to the logs:
Loading log '[logDir]'
loadLog then parses the topic and partition out of the directory name of the log (by the given logDir).
loadLog gets the LogConfig for the topic (from the LogConfigs per topic) or defaults to the currentDefaultConfig.
loadLog gets logRecoveryPoint for the partition (from the given recoveryPoints) or defaults to 0.
loadLog gets logStartOffset for the partition (from the given logStartOffsets) or defaults to 0.
loadLog creates a Log.
In case the name of the given logDir ends with the -delete suffix, loadLog calls addLogToBeDeleted.
Otherwise, loadLog adds the Log to the futureLogs internal registry (when the Log is a future log, i.e. isFuture) or to the currentLogs internal registry (when it is not).
In case a Log was already registered for the partition (in the futureLogs or currentLogs internal registry), loadLog throws an IllegalStateException:
FIXME
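A minimal sketch of the bookkeeping described above: parsing the topic and partition out of a directory name, defaulting a missing offset to 0, and refusing duplicate registrations. The names (`LoadLogSketch`, `parseTopicPartition`, `register`, `offsetOrZero`) are hypothetical, the parser assumes plain `<topic>-<partition>` names without the -delete or future-log suffixes, and the exception message is illustrative only.

```scala
import scala.collection.concurrent.TrieMap

object LoadLogSketch {
  final case class TopicPartition(topic: String, partition: Int)

  // Splits "<topic>-<partition>" at the last dash, since topic names may contain dashes.
  def parseTopicPartition(dirName: String): TopicPartition = {
    val idx = dirName.lastIndexOf('-')
    require(idx > 0 && idx < dirName.length - 1, s"Unexpected directory name: $dirName")
    TopicPartition(dirName.substring(0, idx), dirName.substring(idx + 1).toInt)
  }

  // Recovery point or log start offset for the partition, defaulting to 0
  def offsetOrZero(offsets: Map[TopicPartition, Long], tp: TopicPartition): Long =
    offsets.getOrElse(tp, 0L)

  // Hypothetical registry: partition -> directory of the already loaded log
  val currentLogs = TrieMap.empty[TopicPartition, String]

  // Registers a log and fails when the partition is already present.
  def register(tp: TopicPartition, logDir: String): Unit =
    currentLogs.putIfAbsent(tp, logDir).foreach { previous =>
      throw new IllegalStateException(s"Duplicate log for $tp: $logDir and $previous")
    }
}
```

Using `putIfAbsent` makes the check and the insert a single atomic step, which matters when several loading tasks register logs concurrently.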
Note: loadLog is used exclusively when LogManager is requested to recover and load the logs in log data directories.
Starting Up — startup Method
startup(): Unit
startup starts the background threads to flush logs and do log cleanup.
Internally, startup prints out the following INFO message to the logs:
Starting log cleanup with a period of [retentionCheckMs] ms.
startup requests the Scheduler to schedule a task with the name kafka-log-retention that cleanupLogs with the InitialTaskDelayMs delay and the retentionCheckMs execution period.
startup prints out the following INFO message to the logs:
Starting log flusher with a default period of [flushCheckMs] ms.
startup requests the Scheduler to schedule a task with the name kafka-log-flusher that flushDirtyLogs with the InitialTaskDelayMs delay and the flushCheckMs execution period.
startup requests the Scheduler to schedule a task with the name kafka-recovery-point-checkpoint that checkpointLogRecoveryOffsets with the InitialTaskDelayMs delay and the flushRecoveryOffsetCheckpointMs execution period.
startup requests the Scheduler to schedule a task with the name kafka-log-start-offset-checkpoint that checkpointLogStartOffsets with the InitialTaskDelayMs delay and the flushStartOffsetCheckpointMs execution period.
startup requests the Scheduler to schedule a task with the name kafka-delete-logs that deleteLogs with the InitialTaskDelayMs delay.
(only when the CleanerConfig has the enableCleaner flag enabled) startup requests the LogCleaner to start up.
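The scheduling pattern in startup (named tasks with an initial delay and, for most of them, a fixed execution period) can be approximated with a plain `ScheduledExecutorService`. This is a sketch under assumptions: `StartupSketch.schedule` is a hypothetical helper and is not the signature of Kafka's Scheduler, and a missing period is modelled as `None` for the one-off case.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

object StartupSketch {
  // Runs `task` after `delayMs`; repeats every `periodMs` when a period is given.
  def schedule(
      scheduler: ScheduledExecutorService,
      name: String,
      delayMs: Long,
      periodMs: Option[Long])(task: () => Unit): ScheduledFuture[_] = {
    val runnable = new Runnable {
      override def run(): Unit =
        // An uncaught exception would cancel further runs of a periodic task, so log it instead
        try task()
        catch { case e: Exception => println(s"Uncaught exception in scheduled task '$name': $e") }
    }
    periodMs match {
      case Some(period) =>
        scheduler.scheduleAtFixedRate(runnable, delayMs, period, TimeUnit.MILLISECONDS)
      case None =>
        scheduler.schedule(runnable, delayMs, TimeUnit.MILLISECONDS)
    }
  }
}
```

With such a helper, a task like kafka-log-retention would be registered with the initial task delay and the retention check period, while a task without a period runs once after the delay.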
Note: startup is used exclusively when KafkaServer is requested to start up.
BrokerTopicStats
When created, LogManager is given a BrokerTopicStats that is used exclusively to create Logs when recovering and loading logs in log data directories and looking up or creating a Log.
cleanupLogs Method
cleanupLogs(): Unit
cleanupLogs prints out the following DEBUG message to the logs:
Beginning log cleanup...
cleanupLogs finds the so-called deletable (non-compacted) logs. When a LogCleaner is used, cleanupLogs requests it to pauseCleaningForNonCompactedPartitions. Otherwise, cleanupLogs simply finds all logs in the currentLogs internal registry that are not compacted (by the compact flag of the LogConfig of the Log).
For every deletable log, cleanupLogs prints out the following DEBUG message to the logs:
Garbage collecting '[log.name]'
cleanupLogs requests the log to deleteOldSegments.
cleanupLogs finds the future log for the partition (of the deletable log) in the futureLogs internal registry and, if available, prints out the following DEBUG message to the logs followed by requesting it to deleteOldSegments.
Garbage collecting future log '[futureLog.name]'
In the end, cleanupLogs requests the LogCleaner (if used) to resumeCleaning and prints out the following DEBUG message to the logs:
Log cleanup completed. [total] files deleted in [duration] seconds
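The selection and counting logic of cleanupLogs, for the case without a LogCleaner, might look like the sketch below. `PartitionLog` is a hypothetical minimal model (a real Log decides which segments are old by its retention settings), and registries are keyed by a plain partition name for brevity.

```scala
object CleanupLogsSketch {
  // Hypothetical model of a partition log: `expiredSegments` stands in for
  // whatever deleteOldSegments would actually find and delete.
  final case class PartitionLog(name: String, compact: Boolean, expiredSegments: Int) {
    // Returns the number of segments deleted
    def deleteOldSegments(): Int = expiredSegments
  }

  // Returns the total number of deleted segments across deletable logs
  // and their future logs (if any).
  def cleanupLogs(
      currentLogs: Map[String, PartitionLog],
      futureLogs: Map[String, PartitionLog]): Int = {
    // Deletable logs are the ones that are not compacted
    val deletable = currentLogs.filter { case (_, log) => !log.compact }
    deletable.foldLeft(0) { case (total, (partition, log)) =>
      val fromFuture = futureLogs.get(partition).map(_.deleteOldSegments()).getOrElse(0)
      total + log.deleteOldSegments() + fromFuture
    }
  }
}
```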
Note: cleanupLogs is used when LogManager is requested to start up (and schedules kafka-log-retention periodic task).
Getting All Partition Logs — allLogs Method
allLogs: Iterable[Log]
allLogs…FIXME
Note: allLogs is used when…FIXME
addLogToBeDeleted Internal Method
addLogToBeDeleted(log: Log): Unit
addLogToBeDeleted…FIXME
Note: addLogToBeDeleted is used when…FIXME
asyncDelete Method
asyncDelete(
topicPartition: TopicPartition,
isFuture: Boolean = false): Log
asyncDelete…FIXME
Looking Up Or Creating New Partition Log — getOrCreateLog Method
getOrCreateLog(
topicPartition: TopicPartition,
config: LogConfig,
isNew: Boolean = false,
isFuture: Boolean = false): Log
getOrCreateLog looks up the partition log for the given TopicPartition (and returns it if found) or creates a new one.
When looking up the partition log was unsuccessful, getOrCreateLog finds the log directory for the TopicPartition. getOrCreateLog finds one in the preferredLogDirs internal registry and falls back on nextLogDirs (that simply gives all the log directories sorted by the number of partition logs).
getOrCreateLog creates the directory name for the TopicPartition (based on the given isFuture flag).
getOrCreateLog tries to createLogDirectory in the available log directories (one by one) until successful.
getOrCreateLog creates a new Log (using the maxPidExpirationMs and LogManager.ProducerIdExpirationCheckIntervalMs configuration properties).
getOrCreateLog registers the new Log with the TopicPartition in the futureLogs or currentLogs internal registries (based on the given isFuture flag).
getOrCreateLog prints out the following INFO message to the logs:
Created log for partition [topicPartition] in [logDir] with properties [config].
getOrCreateLog removes the TopicPartition from the preferredLogDirs internal registry.
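The lookup-or-create steps above can be illustrated with in-memory registries. This is a sketch, not Kafka's implementation: `Registry` is hypothetical, a log is represented only by the path of its parent log directory, directory creation on disk is skipped, and sorting directories with the fewest partition logs first is an assumption about the sort order.

```scala
import scala.collection.mutable

object GetOrCreateLogSketch {
  final case class TopicPartition(topic: String, partition: Int)

  class Registry(logDirs: Seq[String]) {
    // partition -> log directory that holds the partition log
    val currentLogs = mutable.Map.empty[TopicPartition, String]
    val preferredLogDirs = mutable.Map.empty[TopicPartition, String]

    // All log directories sorted by the number of partition logs (fewest first, assumed)
    def nextLogDirs(): Seq[String] =
      logDirs.sortBy(dir => currentLogs.values.count(_ == dir))

    def getOrCreateLog(tp: TopicPartition): String =
      currentLogs.get(tp) match {
        case Some(dir) => dir // found, nothing to create
        case None =>
          // Preferred directory first, then the least loaded directory
          val dir = preferredLogDirs.get(tp)
            .orElse(nextLogDirs().headOption)
            .getOrElse(throw new IllegalStateException("No log directories available"))
          currentLogs.put(tp, dir)
          preferredLogDirs.remove(tp)
          dir
      }
  }
}
```

For example, with two empty directories the first two partitions end up in different directories, because after the first placement the other directory holds fewer logs.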
Note: getOrCreateLog is used exclusively when Partition is requested to getOrCreateReplica.
getLog Method
getLog(
topicPartition: TopicPartition,
isFuture: Boolean = false): Option[Log]
getLog…FIXME
Note: getLog is used when…FIXME
checkpointLogRecoveryOffsets Method
checkpointLogRecoveryOffsets(): Unit
checkpointLogRecoveryOffsets…FIXME
Note: checkpointLogRecoveryOffsets is used when…FIXME
checkpointLogStartOffsets Method
checkpointLogStartOffsets(): Unit
checkpointLogStartOffsets…FIXME
Note: checkpointLogStartOffsets is used when…FIXME
isLogDirOnline Method
isLogDirOnline(logDir: String): Boolean
isLogDirOnline…FIXME
Note: isLogDirOnline is used when…FIXME
Validating Data Log Directories — createAndValidateLogDirs Internal Method
createAndValidateLogDirs(
dirs: Seq[File],
initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File]
For every directory in the given dirs, createAndValidateLogDirs makes sure that the data directory is available (i.e. it is a readable directory) or creates it.
createAndValidateLogDirs prints out the following INFO message to the logs when a data directory does not exist:
Log directory [dir] not found, creating it.
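A rough sketch of the validation described above: create a missing directory, then keep only paths that are readable directories. How invalid directories are reported is not covered here, so this example simply skips them, and it also skips anything listed in `initialOfflineDirs`; both choices are assumptions made for illustration, as is the `ValidateLogDirsSketch` name.

```scala
import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue

object ValidateLogDirsSketch {
  def createAndValidateLogDirs(
      dirs: Seq[File],
      initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
    val liveLogDirs = new ConcurrentLinkedQueue[File]()
    // Assumption: directories known to be offline are not considered at all
    for (dir <- dirs if !initialOfflineDirs.contains(dir)) {
      if (!dir.exists) {
        println(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
        dir.mkdirs()
      }
      // Assumption: anything that is not a readable directory is silently skipped
      if (dir.isDirectory && dir.canRead) liveLogDirs.add(dir)
    }
    liveLogDirs
  }
}
```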
Note: createAndValidateLogDirs is given the logDirs and the initialOfflineDirs that LogManager is created with.
createAndValidateLogDirs throws…FIXME
Note: createAndValidateLogDirs is used exclusively when LogManager is created.
truncateTo Method
truncateTo(
partitionOffsets: Map[TopicPartition, Long],
isFuture: Boolean): Unit
truncateTo…FIXME
Note: truncateTo is used when Partition is requested to truncateTo.
truncateFullyAndStartAt Method
truncateFullyAndStartAt(
topicPartition: TopicPartition,
newOffset: Long,
isFuture: Boolean): Unit
truncateFullyAndStartAt…FIXME
Note: truncateFullyAndStartAt is used exclusively when Partition is requested to truncateFullyAndStartAt.
resizeRecoveryThreadPool Method
resizeRecoveryThreadPool(newSize: Int): Unit
resizeRecoveryThreadPool prints out the following INFO message to the logs and reconfigures the numRecoveryThreadsPerDataDir internal registry to be the given newSize.
Resizing recovery thread pool size for each data dir from [numRecoveryThreadsPerDataDir] to [newSize]
Note: resizeRecoveryThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (with a new value of KafkaConfig.numRecoveryThreadsPerDataDir).
Shutting Down — shutdown Method
shutdown(): Unit
shutdown prints out the following INFO message to the logs:
Shutting down.
shutdown then…FIXME
Note: shutdown is used exclusively when KafkaServer is requested to shutdown.
replaceCurrentWithFutureLog Method
replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit
replaceCurrentWithFutureLog…FIXME
Note: replaceCurrentWithFutureLog is used exclusively when Partition is requested to maybeReplaceCurrentWithFutureReplica.
handleLogDirFailure Method
handleLogDirFailure(
dir: String): Unit
handleLogDirFailure…FIXME
Note: handleLogDirFailure is used exclusively when ReplicaManager is requested to handleLogDirFailure.
Marking Start of Partition Log Initialization — initializingLog Method
initializingLog(
topicPartition: TopicPartition): Unit
initializingLog…FIXME
Note: initializingLog is used when Partition is requested to createLog.
Marking End of Partition Log Initialization — finishedInitializingLog Method
finishedInitializingLog(
topicPartition: TopicPartition,
maybeLog: Option[Log],
fetchLogConfig: () => LogConfig): Unit
finishedInitializingLog…FIXME
Note: finishedInitializingLog is used when Partition is requested to createLog.
flushDirtyLogs Internal Method
flushDirtyLogs(): Unit
flushDirtyLogs prints out the following DEBUG message to the logs:
Checking for dirty logs to flush...
flushDirtyLogs…FIXME
Note: flushDirtyLogs is used exclusively for the kafka-log-flusher task (when LogManager is requested to start up).
createLogDirectory Internal Method
createLogDirectory(
logDir: File,
logDirName: String): Try[File]
createLogDirectory…FIXME
Note: createLogDirectory is used when LogManager is requested to look up or create a new partition log.
nextLogDirs Internal Method
nextLogDirs(): List[File]
nextLogDirs…FIXME
Note: nextLogDirs is used when LogManager is requested to look up or create a new partition log.
Internal Properties
| Name | Description |
|---|---|
| | Java's ConcurrentLinkedQueue of live log directories (after createAndValidateLogDirs was executed with the logDirs and the initialOfflineDirs directories). Used when…FIXME |
| | Default LogConfig. Used when a custom… |
| | Pool of Logs per… |
| | Pool of Logs per… |
| | Starts as the recoveryThreadsPerDataDir and can then be dynamically changed. |