LogCleaner

LogCleaner manages one or more CleanerThreads that remove obsolete records from logs that use the compact retention strategy.

LogCleaner is created exclusively for LogManager, and only when the enableCleaner flag of the parent LogManager's CleanerConfig is enabled (which is the default).

When created, LogCleaner is given the CleanerConfig that LogManager builds when it is itself created (when KafkaServer is requested to start up).

LogCleaner registers performance metrics (as a KafkaMetricsGroup).

LogCleaner supports dynamic (re)configuration (as a BrokerReconfigurable).
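To make "removing obsolete records" concrete, here is a minimal sketch of key-based compaction: for every key, only the record with the highest offset survives. This is a simplified model and not Kafka's cleaner code. The names CompactionSketch, Record, latestOffsets and compact are hypothetical, and details such as tombstones, segments and the dedupe buffer are deliberately left out.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical, simplified model of log compaction (not Kafka's implementation).
final class CompactionSketch {

    // One record of a log: its offset, key and value.
    static final class Record {
        final long offset;
        final String key;
        final String value;

        Record(long offset, String key, String value) {
            this.offset = offset;
            this.key = key;
            this.value = value;
        }
    }

    // Pass 1: remember the highest offset seen for every key.
    static Map<String, Long> latestOffsets(List<Record> log) {
        Map<String, Long> latest = new HashMap<>();
        for (Record r : log) {
            latest.merge(r.key, r.offset, Long::max);
        }
        return latest;
    }

    // Pass 2: keep a record only if it is the latest one for its key.
    static List<Record> compact(List<Record> log) {
        Map<String, Long> latest = latestOffsets(log);
        List<Record> retained = new ArrayList<>();
        for (Record r : log) {
            if (latest.get(r.key) == r.offset) {
                retained.add(r);
            }
        }
        return retained;
    }
}
```

Calling CompactionSketch.compact on a log with two records for the same key returns only the later one, while records with unique keys pass through unchanged and keep their original order.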

Tip

Enable ALL logging level for kafka.log.LogCleaner logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.LogCleaner=ALL

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.log.LogCleaner logger in config/log4j.properties:

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

That means that LogCleaner's log messages go to the logs/log-cleaner.log file at the INFO logging level and are not added to the main logs (because log4j.additivity is set to false).

Creating LogCleaner Instance

LogCleaner takes the following to be created:

  • CleanerConfig

  • Log directories

  • Logs per Kafka TopicPartition

  • LogDirFailureChannel

  • Time (default: Time.SYSTEM)

LogCleaner initializes the internal properties.

LogCleaner And CleanerThreads

LogCleaner manages CleanerThreads for cleaning up dirty logs.

CleanerThreads are created and started at startup. They keep doing their job (cleaning logs) until shutdown.

The number of CleanerThreads in use is determined by the numThreads of the CleanerConfig (default: 1).

The statistics about the last log cleaning of CleanerThreads are used for the performance metrics.

Performance Metrics

LogCleaner is a KafkaMetricsGroup with the following performance metrics.

Table 1. LogCleaner’s Performance Metrics

Metric Name                       Description
cleaner-recopy-percent            Sum of bytesWritten divided by the sum of bytesRead of the cleaners in the last cleaning, as a percentage
max-buffer-utilization-percent    Maximum bufferUtilization of the cleaners in the last cleaning, as a percentage
max-clean-time-secs               Maximum cleaning time (in seconds) of the cleaners in the last cleaning
max-compaction-delay-secs         Maximum delay (in seconds) between the time when a log is required to be compacted and the time of the last cleaner run

The performance metrics are registered in kafka.log:type=LogCleaner group.
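The first three gauges are simple aggregations over the statistics of the last cleaning of every cleaner thread. The following sketch shows one way to compute them. The field names bytesRead, bytesWritten and bufferUtilization come from the descriptions above, while the LastStats holder, the elapsedSecs field, the truncation to whole numbers and the guard against division by zero are assumptions made for the example.

```java
import java.util.List;

// Hypothetical sketch of the gauge computations (not Kafka's implementation).
final class CleanerMetricsSketch {

    // Statistics of the last cleaning of a single cleaner thread.
    static final class LastStats {
        final long bytesRead;
        final long bytesWritten;
        final double bufferUtilization; // fraction between 0.0 and 1.0
        final double elapsedSecs;       // assumed name for the cleaning time

        LastStats(long bytesRead, long bytesWritten,
                  double bufferUtilization, double elapsedSecs) {
            this.bytesRead = bytesRead;
            this.bytesWritten = bytesWritten;
            this.bufferUtilization = bufferUtilization;
            this.elapsedSecs = elapsedSecs;
        }
    }

    // cleaner-recopy-percent: total bytes written over total bytes read.
    static int recopyPercent(List<LastStats> stats) {
        long read = 0;
        long written = 0;
        for (LastStats s : stats) {
            read += s.bytesRead;
            written += s.bytesWritten;
        }
        // Math.max avoids dividing by zero when nothing has been read yet.
        return (int) (100.0 * written / Math.max(read, 1));
    }

    // max-buffer-utilization-percent: highest utilization across cleaners.
    static int maxBufferUtilizationPercent(List<LastStats> stats) {
        double max = 0.0;
        for (LastStats s : stats) {
            max = Math.max(max, s.bufferUtilization);
        }
        return (int) (100 * max);
    }

    // max-clean-time-secs: longest cleaning time across cleaners.
    static int maxCleanTimeSecs(List<LastStats> stats) {
        double max = 0.0;
        for (LastStats s : stats) {
            max = Math.max(max, s.elapsedSecs);
        }
        return (int) max;
    }
}
```

A recopy percentage close to 100 means that almost everything read was written back, so the cleaning reclaimed little space.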

Figure 1. LogCleaner in jconsole
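Besides jconsole, the gauges can be read programmatically over JMX. The sketch below builds the MBean name and reads a gauge from an MBeanServer. It assumes that every metric is exposed as kafka.log:type=LogCleaner,name=<metric name> with a Value attribute, which is the usual naming for broker gauges but is not spelled out above. The class and method names are hypothetical.

```java
import java.lang.management.ManagementFactory;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

// Hypothetical helper for reading a LogCleaner gauge over JMX.
final class LogCleanerJmxSketch {

    // Builds the (assumed) MBean name of a LogCleaner metric.
    static ObjectName metricName(String metric) {
        try {
            return new ObjectName("kafka.log:type=LogCleaner,name=" + metric);
        } catch (JMException e) {
            throw new IllegalArgumentException("Invalid metric name: " + metric, e);
        }
    }

    // Returns the gauge value, or null when the MBean is not registered,
    // e.g. when the code does not run inside a broker JVM.
    static Object readGauge(MBeanServer server, String metric) {
        ObjectName name = metricName(metric);
        if (!server.isRegistered(name)) {
            return null;
        }
        try {
            return server.getAttribute(name, "Value");
        } catch (JMException e) {
            throw new IllegalStateException("Cannot read " + name, e);
        }
    }

    // Convenience overload that uses the MBeanServer of the current JVM.
    static Object readGauge(String metric) {
        return readGauge(ManagementFactory.getPlatformMBeanServer(), metric);
    }
}
```

Outside a broker JVM, readGauge("max-clean-time-secs") returns null because no such MBean is registered. To read the metrics of a running broker, the same lookup would go through a remote MBeanServerConnection instead.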

LogCleaner And CleanerConfig

config: CleanerConfig

LogCleaner uses CleanerConfig for dynamic configurations (that can be changed via reconfigure).

LogCleaner is first given the initial CleanerConfig when created (from LogManager based on the KafkaConfig from KafkaServer).

createNewCleanedSegment Object Method

createNewCleanedSegment(
  log: Log,
  baseOffset: Long): LogSegment

createNewCleanedSegment…​FIXME

Note

createNewCleanedSegment is used when:

cleanSegments Internal Method

cleanSegments(
  log: Log,
  segments: Seq[LogSegment],
  map: OffsetMap,
  deleteHorizonMs: Long,
  stats: CleanerStats): Unit

cleanSegments…​FIXME

Note
cleanSegments is used when…​FIXME

buildOffsetMap Internal Method

buildOffsetMap(
  log: Log,
  start: Long,
  end: Long,
  map: OffsetMap,
  stats: CleanerStats): Unit

buildOffsetMap…​FIXME

Note
buildOffsetMap is used when…​FIXME

Reconfiguring — reconfigure Method

reconfigure(
  oldConfig: KafkaConfig,
  newConfig: KafkaConfig): Unit

Note
reconfigure is part of the BrokerReconfigurable Contract to change (reconfigure) the value of a Kafka dynamic configuration.

reconfigure…​FIXME

Starting Up — startup Method

startup(): Unit

startup prints out the following INFO message to the logs:

Starting the log cleaner

startup creates new CleanerThreads and starts them all immediately.

startup adds the cleaner threads to the cleaners internal registry.

Note
The number of CleanerThreads is controlled by log.cleaner.threads dynamic configuration (default: 1).

Note

startup is used when:

  • LogManager is requested to start up (with enableCleaner enabled which is the default)

  • LogCleaner is requested to reconfigure
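The startup steps described above (print the INFO message, create the threads, start them, register them in cleaners) can be sketched as follows. This is an illustration under stated assumptions and not the LogCleaner source: plain Java threads stand in for CleanerThreads, the thread names are made up, and stopForDemo exists only so that the example can end cleanly. It does not describe what shutdown does.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the startup sequence (not Kafka's implementation).
final class StartupSketch {

    // Stand-in for the cleaners internal registry.
    final List<Thread> cleaners = new ArrayList<>();

    private final int numThreads;
    private volatile boolean running = true;

    StartupSketch(int numThreads) {
        this.numThreads = numThreads;
    }

    void startup() {
        System.out.println("Starting the log cleaner");
        for (int i = 0; i < numThreads; i++) {
            // Illustrative thread name only.
            Thread cleaner = new Thread(this::cleanLoop, "cleaner-thread-" + i);
            cleaner.setDaemon(true);
            cleaners.add(cleaner);
            cleaner.start();
        }
    }

    // Placeholder for the real work: idles until asked to stop.
    private void cleanLoop() {
        while (running) {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                return;
            }
        }
    }

    // Demo-only helper that stops and joins the threads.
    void stopForDemo() {
        running = false;
        for (Thread cleaner : cleaners) {
            cleaner.interrupt();
            try {
                cleaner.join();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
        }
    }
}
```

With new StartupSketch(2).startup(), two threads are alive and registered in cleaners as soon as startup returns.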

Building CleanerConfig From KafkaConfig — cleanerConfig Utility

cleanerConfig(
  config: KafkaConfig): CleanerConfig

cleanerConfig simply creates a new CleanerConfig based on the given KafkaConfig.
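As a rough illustration of such a mapping, the sketch below builds a small configuration holder from broker properties. It is not the actual utility: java.util.Properties stands in for KafkaConfig, only two settings are covered, and the class name is hypothetical. The log.cleaner.threads property and both defaults are taken from this page, while using log.cleaner.enable as the source of enableCleaner is an assumption.

```java
import java.util.Properties;

// Hypothetical, reduced stand-in for CleanerConfig (not Kafka's class).
final class CleanerConfigSketch {

    final int numThreads;
    final boolean enableCleaner;

    CleanerConfigSketch(int numThreads, boolean enableCleaner) {
        this.numThreads = numThreads;
        this.enableCleaner = enableCleaner;
    }

    // Reads the cleaner settings, falling back to the documented defaults.
    static CleanerConfigSketch fromProperties(Properties props) {
        int threads = Integer.parseInt(
            props.getProperty("log.cleaner.threads", "1"));
        // Assumption: enableCleaner is driven by log.cleaner.enable.
        boolean enable = Boolean.parseBoolean(
            props.getProperty("log.cleaner.enable", "true"));
        return new CleanerConfigSketch(threads, enable);
    }
}
```

With an empty Properties object the sketch yields one thread with the cleaner enabled, matching the defaults mentioned on this page.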

Note

cleanerConfig is used when:

awaitCleaned Method

awaitCleaned(
  topicPartition: TopicPartition,
  offset: Long,
  maxWaitMs: Long = 60000L): Boolean

awaitCleaned…​FIXME

Note
awaitCleaned seems to be used exclusively in tests.

alterCheckpointDir Method

alterCheckpointDir(
  topicPartition: TopicPartition,
  sourceLogDir: File,
  destLogDir: File): Unit

alterCheckpointDir…​FIXME

Note
alterCheckpointDir is used exclusively when LogManager is requested to replaceCurrentWithFutureLog.

handleLogDirFailure Method

handleLogDirFailure(dir: String): Unit

handleLogDirFailure…​FIXME

Note
handleLogDirFailure is used exclusively when LogManager is requested to handleLogDirFailure.

updateCheckpoints Method

updateCheckpoints(dataDir: File): Unit

updateCheckpoints…​FIXME

Note
updateCheckpoints is used exclusively when LogManager is requested to asyncDelete.

maybeTruncateCheckpoint Method

maybeTruncateCheckpoint(
  dataDir: File,
  topicPartition: TopicPartition,
  offset: Long): Unit

maybeTruncateCheckpoint…​FIXME

Note
maybeTruncateCheckpoint is used when LogManager is requested to truncateTo and truncateFullyAndStartAt.

Shutting Down — shutdown Method

shutdown(): Unit

shutdown…​FIXME

Note
shutdown is used when…​FIXME

abortAndPauseCleaning Method

abortAndPauseCleaning(
  topicPartition: TopicPartition): Unit

abortAndPauseCleaning…​FIXME

Note
abortAndPauseCleaning is used when…​FIXME

resumeCleaning Method

resumeCleaning(
  topicPartitions: Iterable[TopicPartition]): Unit

resumeCleaning…​FIXME

Note
resumeCleaning is used when…​FIXME

pauseCleaningForNonCompactedPartitions Method

pauseCleaningForNonCompactedPartitions(): Iterable[(TopicPartition, Log)]

pauseCleaningForNonCompactedPartitions…​FIXME

Note
pauseCleaningForNonCompactedPartitions is used when…​FIXME

Internal Properties

Name Description

cleanerManager
