log4j.logger.kafka.log.LogCleaner=ALL
LogCleaner
LogCleaner manages one or more CleanerThreads to remove obsolete records from logs with the compact retention strategy.
LogCleaner is created exclusively for a LogManager with the enableCleaner flag enabled (in the CleanerConfig of the parent LogManager), which is the default.
When created, LogCleaner is given a CleanerConfig that LogManager builds when created (when KafkaServer is requested to start up).
LogCleaner registers performance metrics (as a KafkaMetricsGroup).
LogCleaner supports dynamic (re)configuration (as a BrokerReconfigurable).
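To make "removing obsolete records" concrete, here is a minimal sketch of the idea behind compaction: for every key, only the record with the highest offset survives. The Record type and the CompactionSketch object are hypothetical names made up for this illustration; they are not Kafka classes, and the real cleaner works on log segments rather than in-memory sequences.

```scala
// Hypothetical, simplified record type; not Kafka's actual record class.
final case class Record(offset: Long, key: String, value: String)

object CompactionSketch {
  // Builds a key -> latest-offset map for the given records.
  def latestOffsets(records: Seq[Record]): Map[String, Long] =
    records.foldLeft(Map.empty[String, Long]) { (acc, r) =>
      acc.updated(r.key, math.max(r.offset, acc.getOrElse(r.key, -1L)))
    }

  // Keeps only the record with the highest offset for every key,
  // preserving the original order of the surviving records.
  def compact(records: Seq[Record]): Seq[Record] = {
    val latest = latestOffsets(records)
    records.filter(r => latest(r.key) == r.offset)
  }
}
```

Calling CompactionSketch.compact on a sequence with two records for the same key drops the older one and leaves records for other keys untouched.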
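Tip
|
Enable ALL logging level for the kafka.log.LogCleaner logger to see what happens inside. Add the line log4j.logger.kafka.log.LogCleaner=ALL to config/log4j.properties. Refer to Logging. Please note that Kafka comes with a preconfigured kafka.log.LogCleaner logger in config/log4j.properties.
That means that the logs of LogCleaner go to a separate log file (logs/log-cleaner.log) rather than to the main server log. |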
Creating LogCleaner Instance
LogCleaner takes the following to be created:
- Logs per Kafka TopicPartition
LogCleaner initializes the internal properties.
LogCleaner And CleanerThreads
LogCleaner manages CleanerThreads for cleaning up dirty logs.
The number of CleanerThreads in use is determined by the numThreads of the CleanerConfig (default: 1).
The statistics about the last log cleaning of CleanerThreads are used for the performance metrics.
Performance Metrics
LogCleaner is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
 | Sum of |
 | Maximum |
 | Maximum cleaning time of the cleaners in the last cleaning |
 | Maximum delay between the time when a log is required to be compacted and the time of last cleaner run |
The performance metrics are registered in kafka.log:type=LogCleaner group.
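As an illustration of how a "maximum over the cleaners" metric can be derived from per-thread statistics, the sketch below takes the largest elapsed time among the last runs of all threads. LastCleanStats and CleanerMetricsSketch are hypothetical names for this example only, and returning 0.0 when there are no threads is an assumption of the sketch, not documented behavior.

```scala
// Hypothetical snapshot of one cleaner thread's last run; the field name is illustrative.
final case class LastCleanStats(elapsedSecs: Double)

object CleanerMetricsSketch {
  // Maximum cleaning time across the last runs of all threads.
  // Returns 0.0 when no statistics are available (an assumption of this sketch).
  def maxCleanTimeSecs(stats: Seq[LastCleanStats]): Double =
    stats.map(_.elapsedSecs).foldLeft(0.0)((a, b) => math.max(a, b))
}
```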
LogCleaner And CleanerConfig
config: CleanerConfig
LogCleaner uses CleanerConfig for dynamic configurations (that can be changed via reconfigure).
LogCleaner is first given the initial CleanerConfig when created (from LogManager, based on the KafkaConfig from KafkaServer).
Dynamic (Re)Configuration
LogCleaner is a BrokerReconfigurable for the following dynamic configurations:
createNewCleanedSegment
Object Method
createNewCleanedSegment(
log: Log,
baseOffset: Long): LogSegment
createNewCleanedSegment…FIXME
cleanSegments
Internal Method
cleanSegments(
log: Log,
segments: Seq[LogSegment],
map: OffsetMap,
deleteHorizonMs: Long,
stats: CleanerStats): Unit
cleanSegments…FIXME
Note
|
cleanSegments is used when…FIXME
|
buildOffsetMap
Internal Method
buildOffsetMap(
log: Log,
start: Long,
end: Long,
map: OffsetMap,
stats: CleanerStats): Unit
buildOffsetMap…FIXME
Note
|
buildOffsetMap is used when…FIXME
|
Reconfiguring — reconfigure
Method
reconfigure(
oldConfig: KafkaConfig,
newConfig: KafkaConfig): Unit
Note
|
reconfigure is part of the BrokerReconfigurable Contract to change (reconfigure) the value of a Kafka dynamic configuration.
|
reconfigure…FIXME
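Since the body of reconfigure is not described here, the following is only a sketch of what a "reconfigurable component" contract can look like in general. SketchConfig, ReconfigurableSketch and CleanerSketch are hypothetical stand-ins; neither the real BrokerReconfigurable contract nor KafkaConfig is reproduced, and the choice to react only to a changed log.cleaner.threads value is an assumption made for the example.

```scala
// Simplified stand-in for KafkaConfig: a map of property names to values.
final case class SketchConfig(props: Map[String, String]) {
  def int(name: String, default: Int): Int =
    props.get(name).map(_.trim.toInt).getOrElse(default)
}

// Hypothetical, reduced version of a reconfigurable-component contract.
trait ReconfigurableSketch {
  def reconfigure(oldConfig: SketchConfig, newConfig: SketchConfig): Unit
}

final class CleanerSketch(initial: SketchConfig) extends ReconfigurableSketch {
  @volatile private var threads: Int = initial.int("log.cleaner.threads", 1)

  def numThreads: Int = threads

  // Applies the new thread count only when it differs from the old one.
  override def reconfigure(oldConfig: SketchConfig, newConfig: SketchConfig): Unit = {
    val before = oldConfig.int("log.cleaner.threads", 1)
    val after = newConfig.int("log.cleaner.threads", 1)
    if (before != after) threads = after
  }
}
```

Passing both the old and the new configuration lets the component compare them and skip work when nothing relevant has changed.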
Starting Up — startup
Method
startup(): Unit
startup prints out the following INFO message to the logs:
Starting the log cleaner
startup creates new CleanerThreads and starts them all immediately.
startup adds the cleaner threads to the cleaners internal registry.
Note
|
The number of CleanerThreads is controlled by the log.cleaner.threads dynamic configuration (default: 1).
|
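The startup steps described above (log a message, create the threads, start them, keep them in a registry) can be sketched as follows. SketchCleanerThread and SketchLogCleaner are hypothetical classes for illustration; the threads here do no cleaning, and the thread name and the awaitAll helper are inventions of this sketch.

```scala
import scala.collection.mutable.ArrayBuffer

object StartupSketch {
  // Hypothetical stand-in for a cleaner thread; it does no real cleaning.
  final class SketchCleanerThread(id: Int) extends Thread(s"sketch-cleaner-thread-$id") {
    setDaemon(true)
    @volatile var ran = false
    override def run(): Unit = { ran = true }
  }

  final class SketchLogCleaner(numThreads: Int) {
    // Internal registry of the threads created at startup.
    private val cleaners = ArrayBuffer.empty[SketchCleanerThread]

    def startup(): Unit = {
      println("Starting the log cleaner")
      (0 until numThreads).foreach { i =>
        val cleaner = new SketchCleanerThread(i)
        cleaners += cleaner
        cleaner.start()
      }
    }

    def cleanerCount: Int = cleaners.size

    // Helper for the example only: waits until every thread has finished.
    def awaitAll(): Unit = cleaners.foreach(_.join())

    def allRan: Boolean = cleaners.forall(_.ran)
  }
}
```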
Building CleanerConfig From KafkaConfig — cleanerConfig
Utility
cleanerConfig(
config: KafkaConfig): CleanerConfig
cleanerConfig simply creates a new CleanerConfig based on the given KafkaConfig.
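A rough sketch of such a mapping is shown below, using a plain Map in place of KafkaConfig. SketchCleanerConfig is a hypothetical, reduced class that carries only the two settings this page mentions (numThreads and enableCleaner); pairing enableCleaner with the log.cleaner.enable property is an assumption of the example, and the real CleanerConfig has more fields.

```scala
object CleanerConfigSketch {
  // Hypothetical, reduced configuration with only the two settings mentioned on this page.
  final case class SketchCleanerConfig(numThreads: Int = 1, enableCleaner: Boolean = true)

  // Reads the settings from a property map, falling back to the defaults stated above.
  def cleanerConfig(props: Map[String, String]): SketchCleanerConfig =
    SketchCleanerConfig(
      numThreads = props.get("log.cleaner.threads").map(_.trim.toInt).getOrElse(1),
      enableCleaner = props.get("log.cleaner.enable").map(_.trim.toBoolean).getOrElse(true)
    )
}
```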
awaitCleaned
Method
awaitCleaned(
topicPartition: TopicPartition,
offset: Long,
maxWaitMs: Long = 60000L): Boolean
awaitCleaned…FIXME
Note
|
awaitCleaned seems to be used exclusively in tests.
|
alterCheckpointDir
Method
alterCheckpointDir(
topicPartition: TopicPartition,
sourceLogDir: File,
destLogDir: File): Unit
alterCheckpointDir…FIXME
Note
|
alterCheckpointDir is used exclusively when LogManager is requested to replaceCurrentWithFutureLog.
|
handleLogDirFailure
Method
handleLogDirFailure(dir: String): Unit
handleLogDirFailure…FIXME
Note
|
handleLogDirFailure is used exclusively when LogManager is requested to handleLogDirFailure.
|
updateCheckpoints
Method
updateCheckpoints(dataDir: File): Unit
updateCheckpoints…FIXME
Note
|
updateCheckpoints is used exclusively when LogManager is requested to asyncDelete.
|
maybeTruncateCheckpoint
Method
maybeTruncateCheckpoint(
dataDir: File,
topicPartition: TopicPartition,
offset: Long): Unit
maybeTruncateCheckpoint…FIXME
Note
|
maybeTruncateCheckpoint is used when LogManager is requested to truncateTo and truncateFullyAndStartAt.
|
abortAndPauseCleaning
Method
abortAndPauseCleaning(
topicPartition: TopicPartition): Unit
abortAndPauseCleaning…FIXME
Note
|
abortAndPauseCleaning is used when…FIXME
|
resumeCleaning
Method
resumeCleaning(
topicPartitions: Iterable[TopicPartition]): Unit
resumeCleaning…FIXME
Note
|
resumeCleaning is used when…FIXME
|
pauseCleaningForNonCompactedPartitions
Method
pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)]
pauseCleaningForNonCompactedPartitions…FIXME
Note
|
pauseCleaningForNonCompactedPartitions is used when…FIXME
|