LogCleaner

LogCleaner is created exclusively when LogManager is created (with enableCleaner enabled, which is the default).

LogCleaner is given a CleanerConfig when created. LogManager builds that CleanerConfig when it is itself created (while KafkaServer is starting up).

LogCleaner is a KafkaMetricsGroup and registers performance metrics.

Table 1. LogCleaner’s Performance Metrics
Metric Name Description

max-buffer-utilization-percent

cleaner-recopy-percent

max-clean-time-secs

The performance metrics are registered in kafka.log:type=LogCleaner group.

Figure 1. LogCleaner in jconsole
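
The metric names and the group above can be combined into JMX object names. The following is a minimal sketch, not taken from Kafka's sources: the LogCleanerMetricNames object and its methods are hypothetical, and it assumes that each metric is a separate MBean keyed by name=<metric> and that a gauge exposes its reading through a Value attribute.

```scala
import javax.management.{MBeanServerConnection, ObjectName}
import scala.util.Try

object LogCleanerMetricNames {
  // The group the metrics are registered in (from the text above).
  val Group = "kafka.log:type=LogCleaner"

  // Metric names from Table 1.
  val MetricNames = Seq(
    "max-buffer-utilization-percent",
    "cleaner-recopy-percent",
    "max-clean-time-secs")

  // Assumption: every metric is its own MBean with a name=<metric> key.
  def objectName(metric: String): ObjectName =
    new ObjectName(s"$Group,name=$metric")

  // Assumption: the current reading is exposed as the "Value" attribute.
  // Returns None when the MBean is not registered (e.g. outside a broker JVM).
  def read(server: MBeanServerConnection, metric: String): Option[AnyRef] =
    Try(server.getAttribute(objectName(metric), "Value")).toOption
}
```

Inside a broker JVM, read could be given java.lang.management.ManagementFactory.getPlatformMBeanServer. From another process, a remote JMX connection would be needed instead.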

LogCleaner is a BrokerReconfigurable for the following dynamic configurations:

Table 2. LogCleaner’s Internal Properties (e.g. Registries, Counters and Flags)
Name Description

cleaners

Used when…​FIXME

config

Initialized with the given CleanerConfig

Changed in reconfigure

cleanerManager

Tip

Enable the WARN or INFO logging level for the kafka.log.LogCleaner logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.LogCleaner=INFO

Refer to Logging.


Please note that Kafka comes with a preconfigured kafka.log.LogCleaner logger in config/log4j.properties:

log4j.appender.cleanerAppender=org.apache.log4j.DailyRollingFileAppender
log4j.appender.cleanerAppender.DatePattern='.'yyyy-MM-dd-HH
log4j.appender.cleanerAppender.File=${kafka.logs.dir}/log-cleaner.log
log4j.appender.cleanerAppender.layout=org.apache.log4j.PatternLayout
log4j.appender.cleanerAppender.layout.ConversionPattern=[%d] %p %m (%c)%n

log4j.logger.kafka.log.LogCleaner=INFO, cleanerAppender
log4j.additivity.kafka.log.LogCleaner=false

As a result, LogCleaner's log messages go to the logs/log-cleaner.log file at the INFO logging level and are not added to the main logs (because log4j.additivity is set to false for this logger).

createNewCleanedSegment Method

createNewCleanedSegment(log: Log, baseOffset: Long): LogSegment

createNewCleanedSegment…​FIXME

Note
createNewCleanedSegment is used when…​FIXME

cleanSegments Internal Method

cleanSegments(
  log: Log,
  segments: Seq[LogSegment],
  map: OffsetMap,
  deleteHorizonMs: Long,
  stats: CleanerStats): Unit

cleanSegments…​FIXME

Note
cleanSegments is used when…​FIXME

buildOffsetMap Internal Method

buildOffsetMap(
  log: Log,
  start: Long,
  end: Long,
  map: OffsetMap,
  stats: CleanerStats): Unit

buildOffsetMap…​FIXME

Note
buildOffsetMap is used when…​FIXME
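
The two internal methods above are not described yet, but both take an OffsetMap. That matches the general two-pass idea behind log compaction: first record the latest offset per key, then keep only the records that are still the latest for their key. The following is a minimal sketch of that idea, not Kafka's implementation. Record, the Map-based offset map, the exclusive end bound and both function bodies are hypothetical stand-ins; the log, stats and deleteHorizonMs parameters are left out.

```scala
object CompactionSketch {
  // Hypothetical stand-in for a log record; not a Kafka type.
  final case class Record(offset: Long, key: String, value: String)

  // Pass 1: remember the highest offset seen for every key in [start, end).
  // Treating end as exclusive is an assumption of this sketch.
  def buildOffsetMap(log: Seq[Record], start: Long, end: Long): Map[String, Long] =
    log
      .filter(r => r.offset >= start && r.offset < end)
      .foldLeft(Map.empty[String, Long]) { (m, r) =>
        m.updated(r.key, math.max(r.offset, m.getOrElse(r.key, r.offset)))
      }

  // Pass 2: keep a record unless the map knows a newer offset for its key.
  // Records whose key is not in the map are kept untouched.
  def cleanSegments(log: Seq[Record], map: Map[String, Long]): Seq[Record] =
    log.filter(r => map.get(r.key).forall(latest => r.offset >= latest))
}
```

With records at offsets 0 (key a), 1 (key b), 2 (key a) and 3 (key c), building the map over offsets 0 to 3 and cleaning drops only the record at offset 0, because key a has a newer record at offset 2.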

Reconfiguring — reconfigure Method

reconfigure(oldConfig: KafkaConfig, newConfig: KafkaConfig): Unit

Note
reconfigure is part of the BrokerReconfigurable Contract to change (reconfigure) the value of a Kafka dynamic configuration.

reconfigure…​FIXME

Creating LogCleaner Instance

LogCleaner takes the following to be created:

  • CleanerConfig

  • Log directories

  • Logs per Kafka TopicPartition

  • LogDirFailureChannel

  • Time

LogCleaner initializes the internal registries and counters.

Starting Up — startup Method

startup(): Unit

startup prints out the following INFO message to the logs:

Starting the log cleaner

startup creates new CleanerThreads and starts them all immediately.

startup adds the cleaner threads to the cleaners internal registry.

Note
The number of CleanerThreads is controlled by the log.cleaner.threads dynamic configuration (default: 1).

Note

startup is used when:

  • LogManager is requested to start up (with enableCleaner enabled, which is the default)

  • LogCleaner is requested to reconfigure
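
The startup steps described above can be modelled in a few lines. This is a simplified sketch under stated assumptions, not the Kafka source: CleanerModel, the empty thread body and the cleaner-thread-N names are hypothetical, and numThreads stands in for the value of log.cleaner.threads.

```scala
import scala.collection.mutable.ArrayBuffer

// Simplified model of the startup steps; not the real LogCleaner.
final class CleanerModel(numThreads: Int) {
  // Mirrors the cleaners internal registry.
  val cleaners: ArrayBuffer[Thread] = ArrayBuffer.empty[Thread]

  def startup(): Unit = {
    println("Starting the log cleaner")
    (0 until numThreads).foreach { id =>
      // Hypothetical no-op body; a real cleaner thread would keep cleaning logs.
      val body: Runnable = () => ()
      val cleaner = new Thread(body, s"cleaner-thread-$id")
      cleaner.setDaemon(true)
      cleaners += cleaner // register the thread
      cleaner.start()     // and start it immediately
    }
  }
}
```

Calling new CleanerModel(2).startup() prints the INFO-style message once and leaves two started threads in cleaners.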

Building CleanerConfig From KafkaConfig — cleanerConfig Method

cleanerConfig(config: KafkaConfig): CleanerConfig

cleanerConfig simply creates a CleanerConfig from the given KafkaConfig.
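
As an illustration only, such a conversion can be pictured as reading a few broker properties into a configuration object. The sketch below is not Kafka's code: a plain Map stands in for KafkaConfig, the CleanerConfig case class is a trimmed-down hypothetical with just two fields, and the property names and defaults (log.cleaner.threads = 1, log.cleaner.enable = true) are stated here as assumptions.

```scala
object CleanerConfigSketch {
  // Hypothetical, trimmed-down stand-in; the real CleanerConfig has more fields.
  final case class CleanerConfig(numThreads: Int, enableCleaner: Boolean)

  // A plain Map stands in for KafkaConfig (an assumption for illustration).
  def cleanerConfig(props: Map[String, String]): CleanerConfig =
    CleanerConfig(
      numThreads = props.get("log.cleaner.threads").map(_.toInt).getOrElse(1),
      enableCleaner = props.get("log.cleaner.enable").map(_.toBoolean).getOrElse(true))
}
```

For example, CleanerConfigSketch.cleanerConfig(Map("log.cleaner.threads" -> "3")) gives a configuration with three cleaner threads and the cleaner enabled.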

Note

cleanerConfig is used when:

CleanerConfig

CleanerConfig represents a set of dynamic configurations of a LogCleaner:

CleanerConfig is created exclusively when LogCleaner is requested to build a CleanerConfig from a KafkaConfig.

awaitCleaned Method

awaitCleaned(
  topicPartition: TopicPartition,
  offset: Long,
  maxWaitMs: Long = 60000L): Boolean

awaitCleaned…​FIXME

Note
awaitCleaned seems to be used exclusively in tests.
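
The signature (an offset to reach, a maximum wait and a Boolean result) suggests a blocking wait with a timeout, which fits a test helper. A generic poll-until-deadline routine of that shape might look like the sketch below. It is not Kafka's code: the cleanedUpTo callback (standing in for however the cleaned offset of a partition is looked up) and the pollMs interval are hypothetical.

```scala
object AwaitSketch {
  // Polls cleanedUpTo() until it reaches offset or maxWaitMs elapses.
  // Returns true if the offset was reached in time, false otherwise.
  def awaitCleaned(
      cleanedUpTo: () => Long,
      offset: Long,
      maxWaitMs: Long = 60000L,
      pollMs: Long = 100L): Boolean = {
    val deadline = System.currentTimeMillis() + maxWaitMs
    var done = cleanedUpTo() >= offset
    while (!done && System.currentTimeMillis() < deadline) {
      Thread.sleep(pollMs)
      done = cleanedUpTo() >= offset
    }
    done
  }
}
```

A test would typically call it right after producing records and fail if it returns false.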

alterCheckpointDir Method

alterCheckpointDir(
  topicPartition: TopicPartition,
  sourceLogDir: File,
  destLogDir: File): Unit

alterCheckpointDir…​FIXME

Note
alterCheckpointDir is used exclusively when LogManager is requested to replaceCurrentWithFutureLog.

handleLogDirFailure Method

handleLogDirFailure(dir: String): Unit

handleLogDirFailure…​FIXME

Note
handleLogDirFailure is used exclusively when LogManager is requested to handleLogDirFailure.

updateCheckpoints Method

updateCheckpoints(dataDir: File): Unit

updateCheckpoints…​FIXME

Note
updateCheckpoints is used exclusively when LogManager is requested to asyncDelete.

maybeTruncateCheckpoint Method

maybeTruncateCheckpoint(
  dataDir: File,
  topicPartition: TopicPartition,
  offset: Long): Unit

maybeTruncateCheckpoint…​FIXME

Note
maybeTruncateCheckpoint is used when LogManager is requested to truncateTo and truncateFullyAndStartAt.
