LogManager
LogManager is created (using the apply factory utility) and immediately started when KafkaServer is requested to start up.
LogManager
uses the following configuration properties.
Name | Description |
---|---|
 | Used to enable a LogCleaner |
 | Used for the log directories that are immediately validated and loaded (on a fixed thread pool with numRecoveryThreadsPerDataDir recovery threads) |
 | Number of threads per log data directory for log recovery at startup and flushing at shutdown |
LogManager
is used to create a ReplicaManager, a DynamicLogConfig, a TopicConfigHandler, and a Partition.
LogManager uses 30000 ms as the default initial task delay (InitialTaskDelayMs).
Tip
|
Enable Add the following line to
Refer to Logging. |
LogManager and LogCleaner
LogManager
creates a LogCleaner only when the log.cleaner.enable flag (of the given CleanerConfig) is enabled (default: true
).
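The conditional creation above can be sketched as follows. This is a minimal illustration, not Kafka's code: CleanerConfigSketch, LogCleanerSketch and LogManagerCleanerSketch are hypothetical stand-ins, and modelling the optional cleaner as an Option is an assumption of the sketch.

```scala
// Hypothetical stand-ins for CleanerConfig and LogCleaner (not the Kafka classes).
final case class CleanerConfigSketch(enableCleaner: Boolean = true)

final class LogCleanerSketch {
  @volatile var started = false
  def startup(): Unit = started = true
}

final class LogManagerCleanerSketch(cleanerConfig: CleanerConfigSketch) {
  // The cleaner exists only when the flag is enabled (assumption: modelled as an Option).
  val cleaner: Option[LogCleanerSketch] =
    if (cleanerConfig.enableCleaner) Some(new LogCleanerSketch) else None

  // Starting up touches the cleaner only when there is one.
  def startup(): Unit = cleaner.foreach(_.startup())
}
```

Every later "if used" step (e.g. pausing and resuming cleaning) then becomes an operation on the optional cleaner.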
LogManager
uses LogCleaner for the following:
Performance Metrics
LogManager
is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
|
|
|
Registered for every log directory to indicate whether it is online or offline Possible values:
The path of the directory is the tag of the metric. |
The performance metrics are registered in kafka.log:type=LogManager group.
Creating LogManager — apply
Utility
apply(
config: KafkaConfig,
initialOfflineDirs: Seq[String],
zkClient: KafkaZkClient,
brokerState: BrokerState,
kafkaScheduler: KafkaScheduler,
time: Time,
brokerTopicStats: BrokerTopicStats,
logDirFailureChannel: LogDirFailureChannel): LogManager
apply
…FIXME
Note
|
apply is used exclusively when KafkaServer is requested to start up.
|
Creating LogManager Instance
LogManager
takes the following to be created:
-
Topic configurations - LogConfigs per topic name
-
Initial LogConfig
-
num.recovery.threads.per.data.dir dynamic configuration property
-
log.flush.scheduler.interval.ms configuration property
-
log.flush.offset.checkpoint.interval.ms configuration property
-
log.flush.start.offset.checkpoint.interval.ms configuration property
-
log.retention.check.interval.ms configuration property
-
transactional.id.expiration.ms configuration property
LogManager
initializes the internal properties.
While being created, LogManager
loads logs.
abortAndPauseCleaning
Method
abortAndPauseCleaning(
topicPartition: TopicPartition): Unit
abortAndPauseCleaning
…FIXME
Note
|
abortAndPauseCleaning is used when ReplicaManager is requested to alterReplicaLogDirs and becomeLeaderOrFollower.
|
Recovering And Loading Logs In Log Data Directories — loadLogs
Internal Method
loadLogs(): Unit
loadLogs
prints out the following INFO message to the logs:
Loading logs.
For every live log directory, loadLogs
first creates a fixed thread pool (with numRecoveryThreadsPerDataDir threads).
loadLogs
then checks whether .kafka_cleanshutdown file exists in the log directory. If so, loadLogs
prints out the following DEBUG message to the logs:
Found clean shutdown file. Skipping recovery for all logs in data directory: [dir]
loadLogs
uses the recoveryPointCheckpoints to look up the OffsetCheckpointFile
for the log directory (recovery-point-offset-checkpoint file) and then loads it.
loadLogs uses the logStartOffsetCheckpoints to look up the OffsetCheckpointFile for the log directory (log-start-offset-checkpoint file) and then loads it.
For every partition directory in the log directory, loadLogs creates a task to load the partition log (loadLog) with the recovery points and log start offsets that have just been loaded.
loadLogs submits the tasks for execution on the fixed thread pool.
loadLogs
then…FIXME (finish me)
In the end, after having loaded the log directories successfully, loadLogs
prints out the following INFO message to the logs:
Logs loading complete in [duration] ms.
In case the .kafka_cleanshutdown file does not exist, loadLogs transitions the BrokerState to RecoveringFromUncleanShutdown.
In case of an exception while loading the OffsetCheckpointFile
of a log directory (recovery-point-offset-checkpoint file), loadLogs
simply prints out the following WARN messages to the logs:
Error occurred while reading recovery-point-offset-checkpoint file of directory [dir]
Resetting the recovery checkpoint to 0
In case of an exception while loading the OffsetCheckpointFile of a log directory (log-start-offset-checkpoint file), loadLogs simply prints out the following WARN message to the logs:
Error occurred while reading log-start-offset-checkpoint file of directory [dir]
In case of an exception while loading the log directory or running any other task, loadLogs adds the log directory to the offlineDirs internal registry with the exception and prints out the following ERROR message to the logs:
Error while loading log dir [dir]
Note
|
loadLogs is used exclusively when LogManager is created.
|
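The overall flow of loadLogs can be sketched as follows. This is a simplified illustration under stated assumptions, not the Kafka implementation: LoadLogsSketch and LoadResult are hypothetical names, the checkpoint files are left out, the loadLog step is passed in as a function, and the BrokerState transition is modelled by collecting the directories without a clean shutdown file.

```scala
import java.io.File
import java.util.concurrent.Executors
import scala.collection.mutable
import scala.util.control.NonFatal

object LoadLogsSketch {
  val CleanShutdownFile = ".kafka_cleanshutdown"

  // Hypothetical result type: what was loaded and what went wrong.
  final case class LoadResult(loaded: Int, uncleanDirs: Seq[String], offlineDirs: Seq[String])

  def loadLogs(
      liveLogDirs: Seq[File],
      numRecoveryThreadsPerDataDir: Int,
      loadLog: File => Unit): LoadResult = {
    val uncleanDirs = mutable.ArrayBuffer.empty[String]
    val offlineDirs = mutable.ArrayBuffer.empty[String]
    var loaded = 0
    for (dir <- liveLogDirs) {
      // One fixed thread pool per live log directory.
      val pool = Executors.newFixedThreadPool(numRecoveryThreadsPerDataDir)
      try {
        val cleanShutdown = new File(dir, CleanShutdownFile).exists()
        if (!cleanShutdown) uncleanDirs += dir.getAbsolutePath
        // One task per partition directory, executed on the pool.
        val partitionDirs = Option(dir.listFiles).map(_.toList).getOrElse(Nil).filter(_.isDirectory)
        val jobs = partitionDirs.map { partitionDir =>
          pool.submit(new Runnable { def run(): Unit = loadLog(partitionDir) })
        }
        jobs.foreach(_.get()) // rethrows a task failure as an ExecutionException
        loaded += jobs.size
      } catch {
        // Any failure marks the whole log directory as offline.
        case NonFatal(_) => offlineDirs += dir.getAbsolutePath
      } finally {
        pool.shutdown()
      }
    }
    LoadResult(loaded, uncleanDirs.toSeq, offlineDirs.toSeq)
  }
}
```

Waiting on every submitted task before moving on is what lets a single failing partition directory take its parent log directory offline in this sketch.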
Loading Partition Log Directory — loadLog
Internal Method
loadLog(
logDir: File,
recoveryPoints: Map[TopicPartition, Long],
logStartOffsets: Map[TopicPartition, Long]): Unit
loadLog
first prints out the following DEBUG message to the logs:
Loading log '[logDir]'
loadLog then parses the topic and partition out of the directory name of the log (by the given logDir).
loadLog
gets the LogConfig for the topic (from the LogConfigs per topic) or defaults to the currentDefaultConfig.
loadLog gets the logRecoveryPoint for the partition (from the given recoveryPoints) or defaults to 0.
loadLog gets the logStartOffset for the partition (from the given logStartOffsets) or defaults to 0.
loadLog
creates a Log.
In case the name of the given logDir ends with the -delete suffix, loadLog adds the Log to the logs to be deleted (addLogToBeDeleted).
Otherwise, loadLog adds the Log to the futureLogs internal registry when it is a future log (isFuture) or to the currentLogs internal registry when it is not.
In case a Log was already registered for the partition (in the futureLogs or currentLogs internal registry), loadLog throws an IllegalStateException:
FIXME
Note
|
loadLog is used exclusively when LogManager is requested to recover and load the logs in log data directories.
|
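The registration logic of loadLog can be sketched as follows. All the types here (TopicPartitionSketch, LogSketch, LoadLogSketch) are hypothetical stand-ins. The directory-name layout `topic-partition[.uniqueId][-delete|-future]` and the -future suffix are assumptions of the sketch, and the exception message is a placeholder since the source leaves it unspecified.

```scala
import scala.collection.mutable

final case class TopicPartitionSketch(topic: String, partition: Int)
final case class LogSketch(
    tp: TopicPartitionSketch, recoveryPoint: Long, logStartOffset: Long, isFuture: Boolean)

final class LoadLogSketch {
  val currentLogs = mutable.Map.empty[TopicPartitionSketch, LogSketch]
  val futureLogs = mutable.Map.empty[TopicPartitionSketch, LogSketch]
  val logsToBeDeleted = mutable.ArrayBuffer.empty[LogSketch]

  // Assumed layout: topic-partition, optionally followed by .uniqueId and a suffix.
  private def parse(dirName: String): (TopicPartitionSketch, String) = {
    val suffix = Seq("-delete", "-future").find(s => dirName.endsWith(s)).getOrElse("")
    val base = dirName.stripSuffix(suffix)
    val name =
      if (suffix.nonEmpty && base.contains('.')) base.substring(0, base.lastIndexOf('.'))
      else base
    val idx = name.lastIndexOf('-')
    (TopicPartitionSketch(name.substring(0, idx), name.substring(idx + 1).toInt), suffix)
  }

  def loadLog(
      dirName: String,
      recoveryPoints: Map[TopicPartitionSketch, Long],
      logStartOffsets: Map[TopicPartitionSketch, Long]): Unit = {
    val (tp, suffix) = parse(dirName)
    // Both offsets default to 0 when the checkpoint has no entry for the partition.
    val log = LogSketch(
      tp,
      recoveryPoints.getOrElse(tp, 0L),
      logStartOffsets.getOrElse(tp, 0L),
      isFuture = suffix == "-future")
    if (suffix == "-delete") logsToBeDeleted += log
    else {
      val registry = if (log.isFuture) futureLogs else currentLogs
      // put returns the previous value, if any; a duplicate is an error.
      registry.put(tp, log).foreach { _ =>
        throw new IllegalStateException(s"Duplicate log directory for $tp (placeholder message)")
      }
    }
  }
}
```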
Starting Up — startup
Method
startup(): Unit
startup
starts the background threads to flush logs and do log cleanup.
Internally, startup
prints out the following INFO message to the logs:
Starting log cleanup with a period of [retentionCheckMs] ms.
startup
requests the Scheduler to schedule a task with the name kafka-log-retention that runs cleanupLogs with the InitialTaskDelayMs initial delay and the retentionCheckMs execution period.
startup
prints out the following INFO message to the logs:
Starting log flusher with a default period of [flushCheckMs] ms.
startup
requests the Scheduler to schedule a task with the name kafka-log-flusher that runs flushDirtyLogs with the InitialTaskDelayMs initial delay and the flushCheckMs execution period.
startup
requests the Scheduler to schedule a task with the name kafka-recovery-point-checkpoint that runs checkpointLogRecoveryOffsets with the InitialTaskDelayMs initial delay and the flushRecoveryOffsetCheckpointMs execution period.
startup
requests the Scheduler to schedule a task with the name kafka-log-start-offset-checkpoint that runs checkpointLogStartOffsets with the InitialTaskDelayMs initial delay and the flushStartOffsetCheckpointMs execution period.
startup
requests the Scheduler to schedule a task with the name kafka-delete-logs that runs deleteLogs with the InitialTaskDelayMs initial delay.
startup requests the LogCleaner to start up (only when the CleanerConfig has the enableCleaner flag enabled).
Note
|
startup is used exclusively when KafkaServer is requested to start up.
|
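The scheduling done by startup can be summarized in a short sketch. StartupSketch, ScheduledTask, plan and submit are hypothetical names, and a plain java.util.concurrent.ScheduledExecutorService stands in for the Scheduler. Treating a task without a period as one-shot is an assumption of the sketch.

```scala
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

object StartupSketch {
  // Default initial task delay in milliseconds, as stated earlier on this page.
  val InitialTaskDelayMs = 30000L

  // Hypothetical description of one scheduled task; None means no execution period.
  final case class ScheduledTask(name: String, delayMs: Long, periodMs: Option[Long])

  // The tasks in the order startup schedules them.
  def plan(
      retentionCheckMs: Long,
      flushCheckMs: Long,
      flushRecoveryOffsetCheckpointMs: Long,
      flushStartOffsetCheckpointMs: Long): Seq[ScheduledTask] = Seq(
    ScheduledTask("kafka-log-retention", InitialTaskDelayMs, Some(retentionCheckMs)),
    ScheduledTask("kafka-log-flusher", InitialTaskDelayMs, Some(flushCheckMs)),
    ScheduledTask("kafka-recovery-point-checkpoint", InitialTaskDelayMs, Some(flushRecoveryOffsetCheckpointMs)),
    ScheduledTask("kafka-log-start-offset-checkpoint", InitialTaskDelayMs, Some(flushStartOffsetCheckpointMs)),
    ScheduledTask("kafka-delete-logs", InitialTaskDelayMs, None)
  )

  // Hands one planned task to a JDK scheduler (a stand-in for the Scheduler).
  def submit(executor: ScheduledExecutorService, task: ScheduledTask, body: Runnable): ScheduledFuture[_] =
    task.periodMs match {
      case Some(period) =>
        executor.scheduleAtFixedRate(body, task.delayMs, period, TimeUnit.MILLISECONDS)
      case None =>
        executor.schedule(body, task.delayMs, TimeUnit.MILLISECONDS)
    }
}
```

Keeping the plan as data separate from the submission makes the task names, delays and periods easy to inspect without waiting for anything to run.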
BrokerTopicStats
When created, LogManager
is given a BrokerTopicStats that is used exclusively to create Logs when recovering and loading logs in log data directories and looking up or creating a Log.
cleanupLogs
Method
cleanupLogs(): Unit
cleanupLogs
prints out the following DEBUG message to the logs:
Beginning log cleanup...
cleanupLogs finds the so-called deletable (non-compacted) logs. With a LogCleaner in use, cleanupLogs requests it to pauseCleaningForNonCompactedPartitions. Otherwise, cleanupLogs simply takes all logs in the currentLogs internal registry that are not compacted (by the compact flag of the LogConfig of the Log).
For every deletable log, cleanupLogs
prints out the following DEBUG message to the logs:
Garbage collecting '[log.name]'
cleanupLogs
requests the log to deleteOldSegments.
cleanupLogs finds the future log for the partition (of the deletable log) in the futureLogs internal registry. If available, cleanupLogs prints out the following DEBUG message to the logs and requests the future log to deleteOldSegments.
Garbage collecting future log '[futureLog.name]'
In the end, cleanupLogs
requests the LogCleaner (if used) to resumeCleaning and prints out the following DEBUG message to the logs:
Log cleanup completed. [total] files deleted in [duration] seconds
Note
|
cleanupLogs is used when LogManager is requested to start up (and schedules the kafka-log-retention periodic task).
|
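The selection and deletion steps of cleanupLogs can be sketched as follows. The sketch covers only the case without a LogCleaner. CleanupLogsSketch and CleanableLog are hypothetical, partitions are keyed by plain strings, and deleteOldSegments is reduced to returning a preset count.

```scala
object CleanupLogsSketch {
  // Hypothetical log: deleteOldSegments just reports how many segments it would remove.
  final case class CleanableLog(name: String, compact: Boolean, expiredSegments: Int) {
    def deleteOldSegments(): Int = expiredSegments
  }

  /** Returns the total number of deleted segments. */
  def cleanupLogs(
      currentLogs: Map[String, CleanableLog],
      futureLogs: Map[String, CleanableLog]): Int = {
    // Deletable logs are the current logs that are not compacted.
    val deletable = currentLogs.filter { case (_, log) => !log.compact }
    deletable.foldLeft(0) { case (total, (partition, log)) =>
      // The future log of the same partition, if any, is cleaned up too.
      val fromFuture = futureLogs.get(partition).map(_.deleteOldSegments()).getOrElse(0)
      total + log.deleteOldSegments() + fromFuture
    }
  }
}
```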
Getting All Partition Logs — allLogs
Method
allLogs: Iterable[Log]
allLogs
…FIXME
Note
|
allLogs is used when…FIXME
|
addLogToBeDeleted
Internal Method
addLogToBeDeleted(log: Log): Unit
addLogToBeDeleted
…FIXME
Note
|
addLogToBeDeleted is used when…FIXME
|
asyncDelete
Method
asyncDelete(
topicPartition: TopicPartition,
isFuture: Boolean = false): Log
asyncDelete
…FIXME
Looking Up Or Creating New Partition Log — getOrCreateLog
Method
getOrCreateLog(
topicPartition: TopicPartition,
config: LogConfig,
isNew: Boolean = false,
isFuture: Boolean = false): Log
getOrCreateLog
looks up the partition log for the given TopicPartition
(and returns it if found) or creates a new one.
When the lookup is unsuccessful, getOrCreateLog finds the log directory for the TopicPartition. getOrCreateLog uses the one in the preferredLogDirs internal registry, if available, and falls back on nextLogDirs (that simply gives all the log directories sorted by the number of partition logs).
getOrCreateLog
creates the directory name for the TopicPartition
(based on the given isFuture
flag).
getOrCreateLog
tries to createLogDirectory in the available log directories (one by one) until successful.
getOrCreateLog
creates a new Log (using the maxPidExpirationMs and LogManager.ProducerIdExpirationCheckIntervalMs
configuration properties).
getOrCreateLog
registers the new Log
with the TopicPartition
in the futureLogs or currentLogs internal registries (based on the given isFuture flag).
getOrCreateLog
prints out the following INFO message to the logs:
Created log for partition [topicPartition] in [logDir] with properties [config].
getOrCreateLog
removes the TopicPartition
from the preferredLogDirs internal registry.
Note
|
getOrCreateLog is used exclusively when Partition is requested to getOrCreateReplica.
|
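The lookup-or-create steps can be sketched as follows. GetOrCreateLogSketch is hypothetical: partitions and directories are plain strings, a "log" is reduced to the directory that holds it, createLogDirectory is passed in as a function, and the exception for the no-directory case is a placeholder that the source does not describe.

```scala
import scala.collection.mutable
import scala.util.{Success, Try}

final class GetOrCreateLogSketch(
    logDirs: Seq[String],
    createLogDirectory: String => Try[String] = dir => Success(dir)) {
  require(logDirs.nonEmpty, "at least one log directory is required")

  val preferredLogDirs = mutable.Map.empty[String, String] // partition -> preferred directory
  val currentLogs = mutable.Map.empty[String, String]      // partition -> directory of its log
  val futureLogs = mutable.Map.empty[String, String]

  // All log directories, those with the fewest partition logs first.
  def nextLogDirs(): List[String] = {
    val counts = (currentLogs.values ++ futureLogs.values)
      .groupBy(identity)
      .map { case (dir, logs) => dir -> logs.size }
    logDirs.toList.sortBy(dir => counts.getOrElse(dir, 0))
  }

  def getOrCreateLog(partition: String, isFuture: Boolean = false): String = {
    val registry = if (isFuture) futureLogs else currentLogs
    registry.getOrElse(partition, {
      // Preferred directory first, otherwise the least loaded directories.
      val candidates = preferredLogDirs.get(partition).map(List(_)).getOrElse(nextLogDirs())
      // Try the candidates one by one until a directory can be created.
      val dir = candidates.iterator
        .map(createLogDirectory)
        .collectFirst { case Success(created) => created }
        .getOrElse(throw new IllegalStateException(s"No log directory for $partition (placeholder)"))
      registry.put(partition, dir)
      preferredLogDirs.remove(partition)
      dir
    })
  }
}
```

Because sortBy is stable, directories with the same number of logs keep their configured order in this sketch.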
getLog
Method
getLog(
topicPartition: TopicPartition,
isFuture: Boolean = false): Option[Log]
getLog
…FIXME
Note
|
getLog is used when…FIXME
|
checkpointLogRecoveryOffsets
Method
checkpointLogRecoveryOffsets(): Unit
checkpointLogRecoveryOffsets
…FIXME
Note
|
checkpointLogRecoveryOffsets is used when…FIXME
|
checkpointLogStartOffsets
Method
checkpointLogStartOffsets(): Unit
checkpointLogStartOffsets
…FIXME
Note
|
checkpointLogStartOffsets is used when…FIXME
|
isLogDirOnline
Method
isLogDirOnline(logDir: String): Boolean
isLogDirOnline
…FIXME
Note
|
isLogDirOnline is used when…FIXME
|
Validating Data Log Directories — createAndValidateLogDirs
Internal Method
createAndValidateLogDirs(
dirs: Seq[File],
initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File]
For every directory in the given dirs
, createAndValidateLogDirs
makes sure that the data directory is available (i.e. it is a readable directory) or creates it.
createAndValidateLogDirs
prints out the following INFO message to the logs when a data directory does not exist:
Log directory [dir] not found, creating it.
Note
|
createAndValidateLogDirs is given the logDirs and the initialOfflineDirs that LogManager is created with.
|
createAndValidateLogDirs
throws…FIXME
Note
|
createAndValidateLogDirs is used exclusively when LogManager is created.
|
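The create-or-validate step can be sketched as follows. ValidateLogDirsSketch is a hypothetical name. Since the failure behaviour is left open above, skipping a directory that is initially offline or cannot be validated is an assumption of the sketch, not a statement about Kafka.

```scala
import java.io.{File, IOException}
import java.util.concurrent.ConcurrentLinkedQueue

object ValidateLogDirsSketch {
  def createAndValidateLogDirs(
      dirs: Seq[File],
      initialOfflineDirs: Seq[File]): ConcurrentLinkedQueue[File] = {
    val liveLogDirs = new ConcurrentLinkedQueue[File]()
    for (dir <- dirs) {
      try {
        if (initialOfflineDirs.contains(dir))
          throw new IOException(s"$dir is offline")
        if (!dir.exists()) {
          println(s"Log directory ${dir.getAbsolutePath} not found, creating it.")
          if (!dir.mkdirs()) throw new IOException(s"Failed to create $dir")
        }
        // A live log directory has to be a readable directory.
        if (!dir.isDirectory() || !dir.canRead())
          throw new IOException(s"$dir is not a readable directory")
        liveLogDirs.add(dir)
      } catch {
        // Assumption: a failing directory is simply left out of the live ones.
        case e: IOException => println(s"Skipping log directory: ${e.getMessage}")
      }
    }
    liveLogDirs
  }
}
```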
truncateTo
Method
truncateTo(
partitionOffsets: Map[TopicPartition, Long],
isFuture: Boolean): Unit
truncateTo
…FIXME
Note
|
truncateTo is used when Partition is requested to truncateTo.
|
truncateFullyAndStartAt
Method
truncateFullyAndStartAt(
topicPartition: TopicPartition,
newOffset: Long,
isFuture: Boolean): Unit
truncateFullyAndStartAt
…FIXME
Note
|
truncateFullyAndStartAt is used exclusively when Partition is requested to truncateFullyAndStartAt.
|
resizeRecoveryThreadPool
Method
resizeRecoveryThreadPool(newSize: Int): Unit
resizeRecoveryThreadPool prints out the following INFO message to the logs:
Resizing recovery thread pool size for each data dir from [numRecoveryThreadsPerDataDir] to [newSize]
resizeRecoveryThreadPool then sets the numRecoveryThreadsPerDataDir internal registry to the given newSize.
Note
|
resizeRecoveryThreadPool is used exclusively when DynamicThreadPool is requested to reconfigure (with a new value of KafkaConfig.numRecoveryThreadsPerDataDir).
|
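A minimal sketch of the resize follows. RecoveryPoolSizeSketch and its current accessor are hypothetical, and marking the field @volatile is an assumption made so that a new value is visible to other threads.

```scala
final class RecoveryPoolSizeSketch(recoveryThreadsPerDataDir: Int) {
  // Starts as the configured value and can be changed at runtime.
  @volatile private var numRecoveryThreadsPerDataDir: Int = recoveryThreadsPerDataDir

  def current: Int = numRecoveryThreadsPerDataDir

  def resizeRecoveryThreadPool(newSize: Int): Unit = {
    println(s"Resizing recovery thread pool size for each data dir from $numRecoveryThreadsPerDataDir to $newSize")
    numRecoveryThreadsPerDataDir = newSize
  }
}
```

Only thread pools created after the change pick up the new size in this sketch, because the pools are created per operation from the current value.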
Shutting Down — shutdown
Method
shutdown(): Unit
shutdown
prints out the following INFO message to the logs:
Shutting down.
shutdown
then…FIXME
Note
|
shutdown is used exclusively when KafkaServer is requested to shut down.
|
replaceCurrentWithFutureLog
Method
replaceCurrentWithFutureLog(topicPartition: TopicPartition): Unit
replaceCurrentWithFutureLog
…FIXME
Note
|
replaceCurrentWithFutureLog is used exclusively when Partition is requested to maybeReplaceCurrentWithFutureReplica.
|
handleLogDirFailure
Method
handleLogDirFailure(
dir: String): Unit
handleLogDirFailure
…FIXME
Note
|
handleLogDirFailure is used exclusively when ReplicaManager is requested to handleLogDirFailure.
|
Marking Start of Partition Log Initialization — initializingLog
Method
initializingLog(
topicPartition: TopicPartition): Unit
initializingLog
…FIXME
Note
|
initializingLog is used when Partition is requested to createLog.
|
Marking End of Partition Log Initialization — finishedInitializingLog
Method
finishedInitializingLog(
topicPartition: TopicPartition,
maybeLog: Option[Log],
fetchLogConfig: () => LogConfig): Unit
finishedInitializingLog
…FIXME
Note
|
finishedInitializingLog is used when Partition is requested to createLog.
|
flushDirtyLogs
Internal Method
flushDirtyLogs(): Unit
flushDirtyLogs
prints out the following DEBUG message to the logs:
Checking for dirty logs to flush...
flushDirtyLogs
…FIXME
Note
|
flushDirtyLogs is used exclusively for the kafka-log-flusher task (when LogManager is requested to start up).
|
createLogDirectory
Internal Method
createLogDirectory(
logDir: File,
logDirName: String): Try[File]
createLogDirectory
…FIXME
Note
|
createLogDirectory is used when LogManager is requested to look up or create a new partition log.
|
nextLogDirs
Internal Method
nextLogDirs(): List[File]
nextLogDirs
…FIXME
Note
|
nextLogDirs is used when LogManager is requested to look up or create a new partition log.
|
Internal Properties
Name | Description |
---|---|
|
Java’s ConcurrentLinkedQueue of live log directories (after createAndValidateLogDirs was executed with the logDirs and the initialOfflineDirs directories). Used when…FIXME |
|
Default LogConfig Used when a custom |
|
Pool of Logs per |
|
Pool of Logs per |
|
|
|
Starts as the recoveryThreadsPerDataDir and can then be dynamically changed. |
|
|
|