LogManager

LogManager is created and immediately started when KafkaServer is requested to start up.

Figure 1. LogManager and KafkaServer

LogManager manages the log directories that are configured using log.dirs or log.dir configuration properties (default: /tmp/kafka-logs).
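As a rough illustration of how the two properties relate, the sketch below resolves the log directories from a plain property map. It assumes that log.dirs, when set, takes precedence over log.dir and holds a comma-separated list. LogDirsSketch and resolveLogDirs are hypothetical names used only for this example, not Kafka APIs.

```scala
object LogDirsSketch {
  // Default quoted in the text above; used when neither property is set.
  val DefaultLogDir = "/tmp/kafka-logs"

  // Hypothetical helper: prefers log.dirs (comma-separated) over log.dir.
  def resolveLogDirs(props: Map[String, String]): Seq[String] = {
    val raw = props.get("log.dirs")
      .orElse(props.get("log.dir"))
      .getOrElse(DefaultLogDir)
    // Split the list, trim whitespace and drop empty entries.
    raw.split(",").map(_.trim).filter(_.nonEmpty).toSeq
  }
}
```

With an empty map the helper falls back to the default directory; with both properties set, only the entries of log.dirs are returned.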

LogManager is used to create a ReplicaManager, a DynamicLogConfig, and a TopicConfigHandler.

LogManager uses 30000 ms as the default initial task delay (InitialTaskDelayMs).

Table 1. LogManager’s Internal Properties (e.g. Registries, Counters and Flags)

Name          Description
_liveLogDirs  Java’s ConcurrentLinkedQueue of log directories. Used when…FIXME
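To make the registry more concrete, here is a minimal sketch of keeping live log directories in a ConcurrentLinkedQueue. LiveLogDirsSketch, markOffline and isLive are hypothetical names, and the idea that a failed directory is simply removed from the queue is an assumption made for illustration, not something stated above.

```scala
import java.io.File
import java.util.concurrent.ConcurrentLinkedQueue

// Hypothetical stand-in for the _liveLogDirs registry; not Kafka's class.
class LiveLogDirsSketch(initialDirs: Seq[File]) {
  private val _liveLogDirs = new ConcurrentLinkedQueue[File]()
  initialDirs.foreach(dir => _liveLogDirs.add(dir))

  // Snapshot of the directories currently considered live.
  def liveLogDirs: Seq[File] = _liveLogDirs.toArray(new Array[File](0)).toSeq

  // Assumed failure handling: an offline directory is removed from the queue.
  def markOffline(dir: File): Boolean = _liveLogDirs.remove(dir)

  // True while the directory is still registered as live.
  def isLive(dir: File): Boolean = _liveLogDirs.contains(dir)
}
```

A concurrent queue lets one thread drop a failed directory while others iterate over the remaining ones without extra locking.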

Tip

Enable the INFO or DEBUG logging level for the kafka.log.LogManager logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.LogManager=DEBUG

Refer to Logging.

Starting Up — startup Method

startup(): Unit

startup starts the background threads to flush logs and do log cleanup.

Internally, startup prints out the following INFO message to the logs:

Starting log cleanup with a period of [retentionCheckMs] ms.

startup requests the Scheduler to schedule a task named kafka-log-retention that runs cleanupLogs, with InitialTaskDelayMs as the initial delay and retentionCheckMs as the execution period.

startup prints out the following INFO message to the logs:

Starting log flusher with a default period of [flushCheckMs] ms.

startup requests the Scheduler to schedule a task with…​FIXME
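The retention scheduling described above can be sketched with a plain ScheduledExecutorService standing in for the Scheduler. RetentionSchedulingSketch and scheduleRetention are hypothetical names, println stands in for the INFO log line, and the use of a fixed-rate schedule is an assumption.

```scala
import java.util.concurrent.{ScheduledExecutorService, TimeUnit}

object RetentionSchedulingSketch {
  // Initial task delay quoted in the text (30000 ms).
  val InitialTaskDelayMs: Long = 30000L

  // Hypothetical helper standing in for the Scheduler's schedule call.
  def scheduleRetention(
      executor: ScheduledExecutorService,
      retentionCheckMs: Long,
      delayMs: Long = InitialTaskDelayMs)(cleanupLogs: () => Unit): Unit = {
    println(s"Starting log cleanup with a period of $retentionCheckMs ms.")
    val task = new Runnable { def run(): Unit = cleanupLogs() }
    // First run after delayMs, then repeatedly every retentionCheckMs.
    executor.scheduleAtFixedRate(task, delayMs, retentionCheckMs, TimeUnit.MILLISECONDS)
  }
}
```

A caller would pass its own executor and cleanup function, and shut the executor down when the broker stops.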

Note
startup is used when…​FIXME

cleanupLogs Method

cleanupLogs(): Unit

cleanupLogs…​FIXME

Note
cleanupLogs is used when…​FIXME

Getting All Partition Logs — allLogs Method

allLogs: Iterable[Log]

allLogs…​FIXME

Note
allLogs is used when…​FIXME

addLogToBeDeleted Internal Method

addLogToBeDeleted(log: Log): Unit

addLogToBeDeleted…​FIXME

Note
addLogToBeDeleted is used when…​FIXME

asyncDelete Method

asyncDelete(topicPartition: TopicPartition, isFuture: Boolean = false): Log

asyncDelete…​FIXME

Note
asyncDelete is used when…​FIXME

getOrCreateLog Method

getOrCreateLog(
  topicPartition: TopicPartition,
  config: LogConfig,
  isNew: Boolean = false,
  isFuture: Boolean = false): Log

getOrCreateLog…​FIXME

Note
getOrCreateLog is used exclusively when Partition is requested to getOrCreateReplica.
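The method name suggests a get-or-create lookup keyed by topic partition. The sketch below shows that general pattern only: TopicPartition and Log are simplified hypothetical stand-ins, the config, isNew and isFuture parameters are left out, and the use of ConcurrentHashMap.computeIfAbsent is an assumption rather than a description of the actual implementation.

```scala
import java.util.concurrent.ConcurrentHashMap

object GetOrCreateSketch {
  // Hypothetical stand-ins for Kafka's TopicPartition and Log types.
  final case class TopicPartition(topic: String, partition: Int)
  final case class Log(dirName: String)

  private val currentLogs = new ConcurrentHashMap[TopicPartition, Log]()

  // Returns the registered log for the partition, creating it on first request.
  def getOrCreateLog(tp: TopicPartition): Log =
    currentLogs.computeIfAbsent(
      tp,
      (key: TopicPartition) => Log(s"${key.topic}-${key.partition}"))
}
```

Repeated calls with the same partition return the same Log instance, so callers never end up with two logs for one partition.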

loadLog Internal Method

loadLog(
  logDir: File,
  recoveryPoints: Map[TopicPartition, Long],
  logStartOffsets: Map[TopicPartition, Long]): Unit

loadLog…​FIXME

Note
loadLog is used exclusively when LogManager is requested to loadLogs.

Loading Logs — loadLogs Internal Method

loadLogs(): Unit

loadLogs prints out the following INFO message to the logs:

Loading logs.

For every live log directory, loadLogs…​FIXME

Note
loadLogs is used exclusively when LogManager is created.

Creating LogManager Instance

LogManager takes the following when created:

  • Log directories

  • Initial offline directories

  • Topic configurations (Map[String, LogConfig])

  • Initial default configuration (LogConfig)

  • CleanerConfig

  • recoveryThreadsPerDataDir

  • flushCheckMs

  • flushRecoveryOffsetCheckpointMs

  • flushStartOffsetCheckpointMs

  • retentionCheckMs

  • maxPidExpirationMs

  • Scheduler

  • BrokerState

  • BrokerTopicStats

  • LogDirFailureChannel

  • Time

LogManager initializes the internal registries and counters.

While being created, LogManager loads logs.
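Loading at creation time maps naturally onto Scala, where statements in a class body run as part of the constructor. The miniature below is a hypothetical illustration of that pattern, not Kafka's LogManager: EagerLoadingManagerSketch and its loaded buffer are invented names, and the per-directory work is reduced to recording the directory.

```scala
import java.io.File
import scala.collection.mutable.ArrayBuffer

// Hypothetical miniature; not Kafka's LogManager.
class EagerLoadingManagerSketch(logDirs: Seq[File]) {
  // Directories visited by loadLogs, recorded for illustration only.
  val loaded: ArrayBuffer[File] = ArrayBuffer.empty[File]

  private def loadLogs(): Unit = {
    println("Loading logs.")
    logDirs.foreach(dir => loaded += dir)
  }

  // Statements in the class body run during construction.
  loadLogs()
}
```

Because loadLogs is private and invoked only from the class body, it runs exactly once per instance.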

Creating LogManager — apply Factory Method

apply(
  config: KafkaConfig,
  initialOfflineDirs: Seq[String],
  zkClient: KafkaZkClient,
  brokerState: BrokerState,
  kafkaScheduler: KafkaScheduler,
  time: Time,
  brokerTopicStats: BrokerTopicStats,
  logDirFailureChannel: LogDirFailureChannel): LogManager

apply…​FIXME

Note
apply is used exclusively when KafkaServer is requested to start up.

liveLogDirs Method

liveLogDirs: Seq[File]

liveLogDirs…​FIXME

Note
liveLogDirs is used when…​FIXME

deleteLogs Internal Method

deleteLogs(): Unit

deleteLogs…​FIXME

Note
deleteLogs is used when…​FIXME

flushDirtyLogs Internal Method

flushDirtyLogs(): Unit

flushDirtyLogs…​FIXME

Note
flushDirtyLogs is used when…​FIXME

checkpointLogRecoveryOffsets Method

checkpointLogRecoveryOffsets(): Unit

checkpointLogRecoveryOffsets…​FIXME

Note
checkpointLogRecoveryOffsets is used when…​FIXME

checkpointLogStartOffsets Method

checkpointLogStartOffsets(): Unit

checkpointLogStartOffsets…​FIXME

Note
checkpointLogStartOffsets is used when…​FIXME

isLogDirOnline Method

isLogDirOnline(logDir: String): Boolean

isLogDirOnline…​FIXME

Note
isLogDirOnline is used when…​FIXME
