log4j.logger.kafka.log.LogCleaner=ALL
LogCleaner
LogCleaner manages one or more CleanerThreads to remove obsolete records from logs with compact retention strategy.
LogCleaner is created exclusively for LogManager with enableCleaner flag enabled (of the CleanerConfig of the parent LogManager) (which is the default).
When created, LogCleaner is given a CleanerConfig that LogManager builds when created (when KafkaServer is requested to start up).
LogCleaner registers performance metrics (as a KafkaMetricsGroup).
LogCleaner supports dynamic (re)configuration (as a BrokerReconfigurable).
|
Tip
|
Enable Add the following line to Refer to Logging. Please note that Kafka comes with a preconfigured
That means that the logs of |
Creating LogCleaner Instance
LogCleaner takes the following to be created:
-
Logs per Kafka
TopicPartition
LogCleaner initializes the internal properties.
LogCleaner And CleanerThreads
LogCleaner manages CleanerThreads for cleaning up dirty logs.
The number of CleanerThreads in use is determined by the numThreads of the CleanerConfig (default: 1).
The statistics about the last log cleaning of CleanerThreads are used for the performance metrics.
Performance Metrics
LogCleaner is a KafkaMetricsGroup with the following performance metrics.
| Metric Name | Description |
|---|---|
|
Sum of |
|
Maximum |
|
Maximum cleaning time of the cleaners in the last cleaning |
|
Maximum delay between the time when a log is required to be compacted and the time of last cleaner run |
The performance metrics are registered in kafka.log:type=LogCleaner group.
LogCleaner And CleanerConfig
config: CleanerConfig
LogCleaner uses CleanerConfig for dynamic configurations (that can be changed via reconfigure).
LogCleaner is first given the initial CleanerConfig when created (from LogManager based on the KafkaConfig from KafkaServer).
Dynamic (Re)Configuration
LogCleaner is a BrokerReconfigurable for the following dynamic configurations:
createNewCleanedSegment Object Method
createNewCleanedSegment(
log: Log,
baseOffset: Long): LogSegment
createNewCleanedSegment…FIXME
|
Note
|
|
cleanSegments Internal Method
cleanSegments(
log: Log,
segments: Seq[LogSegment],
map: OffsetMap,
deleteHorizonMs: Long,
stats: CleanerStats): Unit
cleanSegments…FIXME
|
Note
|
cleanSegments is used when…FIXME
|
buildOffsetMap Internal Method
buildOffsetMap(
log: Log,
start: Long,
end: Long,
map: OffsetMap,
stats: CleanerStats): Unit
buildOffsetMap…FIXME
|
Note
|
buildOffsetMap is used when…FIXME
|
Reconfiguring — reconfigure Method
reconfigure(
oldConfig: KafkaConfig,
newConfig: KafkaConfig): Unit
|
Note
|
reconfigure is part of the BrokerReconfigurable Contract to change (reconfigure) the value of a Kafka dynamic configuration.
|
reconfigure…FIXME
Starting Up — startup Method
startup(): Unit
startup prints out the following INFO message to the logs:
Starting the log cleaner
startup creates new CleanerThreads and starts them all immediately.
startup adds the cleaner threads in cleaners internal registry.
|
Note
|
The number of CleanerThreads is controlled by log.cleaner.threads dynamic configuration (default: 1).
|
|
Note
|
|
Building CleanerConfig From KafkaConfig — cleanerConfig Utility
cleanerConfig(
config: KafkaConfig): CleanerConfig
cleanerConfig simply creates a new CleanerConfig based on the given KafkaConfig.
|
Note
|
|
awaitCleaned Method
awaitCleaned(
topicPartition: TopicPartition,
offset: Long,
maxWaitMs: Long = 60000L): Boolean
awaitCleaned…FIXME
|
Note
|
awaitCleaned seems to be used exclusively in tests.
|
alterCheckpointDir Method
alterCheckpointDir(
topicPartition: TopicPartition,
sourceLogDir: File,
destLogDir: File): Unit
alterCheckpointDir…FIXME
|
Note
|
alterCheckpointDir is used exclusively when LogManager is requested to replaceCurrentWithFutureLog.
|
handleLogDirFailure Method
handleLogDirFailure(dir: String): Unit
handleLogDirFailure…FIXME
|
Note
|
handleLogDirFailure is used exclusively when LogManager is requested to handleLogDirFailure.
|
updateCheckpoints Method
updateCheckpoints(dataDir: File): Unit
updateCheckpoints…FIXME
|
Note
|
updateCheckpoints is used exclusively when LogManager is requested to asyncDelete.
|
maybeTruncateCheckpoint Method
maybeTruncateCheckpoint(
dataDir: File,
topicPartition: TopicPartition,
offset: Long): Unit
maybeTruncateCheckpoint…FIXME
|
Note
|
maybeTruncateCheckpoint is used when LogManager is requested to truncateTo and truncateFullyAndStartAt.
|
abortAndPauseCleaning Method
abortAndPauseCleaning(
topicPartition: TopicPartition): Unit
abortAndPauseCleaning…FIXME
|
Note
|
abortAndPauseCleaning is used when…FIXME
|
resumeCleaning Method
resumeCleaning(
topicPartitions: Iterable[TopicPartition]): Unit
resumeCleaning…FIXME
|
Note
|
resumeCleaning is used when…FIXME
|
pauseCleaningForNonCompactedPartitions Method
pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)]
pauseCleaningForNonCompactedPartitions…FIXME
|
Note
|
pauseCleaningForNonCompactedPartitions is used when…FIXME
|