Partition Log

Log is a partition log.

Log is stored on disk in a log directory with the name of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).

Log is created when LogManager is requested to loadLog and getOrCreateLog (using the apply factory method).

Log uses the .kafka_cleanshutdown file to indicate…FIXME

Log is isFuture when…FIXME

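Table 1. File Suffixes

Suffix      Description
-delete     Log directory marked for deletion (topic-partition.uniqueId-delete)
.txnindex   Transaction index (aborted transactions) of a log segment
.timeindex  Time index of a log segment
.index      Offset index of a log segment
.log        Log segment file with the records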
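Apart from -delete (a directory suffix), the suffixes are appended to the names of a segment's files. The Scala sketch below assumes the usual Kafka convention that a segment's files are named after its base offset, zero-padded to 20 digits. segmentFilePrefix, segmentFileName and baseOffsetFromFileName are hypothetical helpers for illustration, not Kafka's API.

```scala
// Hypothetical helpers (not Kafka's API), assuming segment files are named
// after the segment's base offset, zero-padded to 20 digits, plus a suffix.
def segmentFilePrefix(baseOffset: Long): String = f"$baseOffset%020d"

def segmentFileName(baseOffset: Long, suffix: String): String =
  segmentFilePrefix(baseOffset) + suffix

// Reverse direction: the base offset is everything before the first '.'
def baseOffsetFromFileName(filename: String): Long =
  filename.substring(0, filename.indexOf('.')).toLong

// The files of one segment with base offset 42
val files = Seq(".log", ".index", ".timeindex", ".txnindex").map(segmentFileName(42L, _))
files.foreach(println) // 00000000000000000042.log, 00000000000000000042.index, ...
```

Since the prefix has a fixed width, sorting such file names by name also sorts the segments by base offset (for non-negative offsets).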

Log uses a Scheduler to schedule the background tasks.

Table 2. Log’s Background Tasks

Name                             Period                               Delay                                Description
PeriodicProducerExpirationCheck  producerIdExpirationCheckIntervalMs  producerIdExpirationCheckIntervalMs  Requests the ProducerStateManager to removeExpiredProducers. Scheduled immediately when Log is created.
flush-log                        -1 (once)                            0L                                   flush. Scheduled when Log is requested to roll.
delete-file                      -1 (once)                            file.delete.delay.ms                 deleteSeg. Scheduled when Log is requested to asyncDeleteSegment.
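The Period and Delay columns follow a simple convention: -1 (once) means the task runs a single time after the delay, while a positive period makes it repeat. The following Scala sketch models that convention on top of java.util.concurrent's ScheduledExecutorService. MiniScheduler is a hypothetical stand-in, not Kafka's Scheduler, and its error handling (print and carry on) is an assumption for illustration.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, ThreadFactory, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical stand-in for a Scheduler: a positive periodMs repeats the task
// every periodMs milliseconds, any other value (e.g. -1) runs it once after delayMs.
final class MiniScheduler(threads: Int = 1) {
  private val threadFactory: ThreadFactory = (r: Runnable) => {
    val thread = new Thread(r, "mini-scheduler")
    thread.setDaemon(true) // do not keep the JVM alive
    thread
  }
  private val executor: ScheduledExecutorService =
    Executors.newScheduledThreadPool(threads, threadFactory)

  def schedule(name: String, delayMs: Long, periodMs: Long = -1L)(task: => Unit): ScheduledFuture[_] = {
    val runnable: Runnable = () => {
      try task
      catch { case NonFatal(e) => System.err.println(s"Task $name failed: $e") }
    }
    if (periodMs > 0) executor.scheduleAtFixedRate(runnable, delayMs, periodMs, TimeUnit.MILLISECONDS)
    else executor.schedule(runnable, delayMs, TimeUnit.MILLISECONDS)
  }

  def shutdown(): Unit = {
    executor.shutdown() // also cancels pending periodic tasks
    executor.awaitTermination(5, TimeUnit.SECONDS)
  }
}
```

Scheduling with delayMs = 0L and the default periodMs mirrors the flush-log row, while passing the same value as delayMs and periodMs mirrors PeriodicProducerExpirationCheck.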

Log uses [Log partition=[topicPartition], dir=[parent]] as the logging prefix (aka logIdent).

Tip

Enable ALL logging levels for the kafka.log.Log logger to see what happens inside.

Add the following line to log4j.properties:

log4j.logger.kafka.log.Log=ALL

Refer to Logging.

Performance Metrics

Log is a KafkaMetricsGroup with the following performance metrics.

Table 3. Log’s Performance Metrics

Metric Name     Description
NumLogSegments  Number of log segments
LogEndOffset    Log end offset
LogStartOffset  Log start offset
Size            Size of the log (in bytes)

The performance metrics are registered in the kafka.log:type=Log group.

Figure 1. Log in jconsole
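A minimal sketch of how the per-partition MBean names can be built and matched with the standard javax.management.ObjectName class. It assumes that, besides the name key property, every metric is tagged with topic and partition key properties (the layout shown in Kafka's monitoring documentation, e.g. kafka.log:type=Log,name=Size,topic=...,partition=...). logMetricName is a hypothetical helper and "orders" a made-up topic.

```scala
import javax.management.ObjectName

// Hypothetical helper: assumes metrics are tagged with topic and partition
def logMetricName(metric: String, topic: String, partition: Int): ObjectName =
  new ObjectName(s"kafka.log:type=Log,name=$metric,topic=$topic,partition=$partition")

val size = logMetricName("Size", "orders", 0)
println(size.getKeyProperty("name"))      // Size
println(size.getKeyProperty("partition")) // 0

// A pattern that matches the LogEndOffset metric of every partition
val allLogEndOffsets = new ObjectName("kafka.log:type=Log,name=LogEndOffset,*")
println(allLogEndOffsets.isPattern) // true
```

Against a running broker, such a pattern would typically be passed to MBeanServerConnection.queryNames over a JMX connection.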

Creating Log Instance

Log takes the following to be created:

Log initializes the internal properties.

While being created, Log creates the log directory unless already available.

Log requests the LeaderEpochFileCache to remove epoch entries from the store with start offsets greater than or equal to the nextOffset (LeaderEpochFileCache.truncateFromEnd).

Log computes the new start offset to be the maximum of the logStartOffset and the baseOffset of the first LogSegment in the segments internal registry.

Log requests the LeaderEpochFileCache to remove epoch entries from the store with offsets less than or equal to the new start offset (LeaderEpochFileCache.truncateFromStart).

Log throws an IllegalStateException when the ProducerStateManager is not empty:

Producer state must be empty during log initialization

Log calls loadProducerState (with the logEndOffset and the reloadFromCleanShutdown flag based on hasCleanShutdownFile).

In the end, Log prints out the following INFO message to the logs:

Completed load of log with [size] segments, log start offset [logStartOffset] and log end offset [logEndOffset] in [time] ms

Creating Log Instance — apply Factory Method

apply(
  dir: File,
  config: LogConfig,
  logStartOffset: Long,
  recoveryPoint: Long,
  scheduler: Scheduler,
  brokerTopicStats: BrokerTopicStats,
  time: Time = Time.SYSTEM,
  maxProducerIdExpirationMs: Int,
  producerIdExpirationCheckIntervalMs: Int,
  logDirFailureChannel: LogDirFailureChannel): Log

apply creates a new ProducerStateManager.

In the end, apply creates a Log.

Note
apply is used when LogManager is requested to loadLog and getOrCreateLog.

roll Method

roll(
  expectedNextOffset: Option[Long] = None): LogSegment

roll…FIXME

Note
roll is used when Log is requested to deleteSegments and maybeRoll.

maybeRoll Internal Method

maybeRoll(
  messagesSize: Int,
  appendInfo: LogAppendInfo): LogSegment

maybeRoll…FIXME

Note
maybeRoll is used exclusively when Log is requested to append.

asyncDeleteSegment Internal Method

asyncDeleteSegment(segment: LogSegment): Unit

asyncDeleteSegment…FIXME

Note
asyncDeleteSegment is used when Log is requested to deleteSegment and replaceSegments.

flush Method

flush(): Unit // uses logEndOffset for the offset
flush(offset: Long): Unit

flush…FIXME

Note

flush is used when:

deleteSeg Internal Method

deleteSeg(): Unit

deleteSeg…FIXME

Note
deleteSeg is used exclusively for the delete-file Background Task.

appendAsLeader Method

appendAsLeader(
  records: MemoryRecords,
  leaderEpoch: Int,
  isFromClient: Boolean = true): LogAppendInfo

appendAsLeader simply calls append with the assignOffsets flag on.

Note
appendAsLeader is used exclusively when Partition is requested to appendRecordsToLeader.

appendAsFollower Method

appendAsFollower(records: MemoryRecords): LogAppendInfo

appendAsFollower simply calls append (with the isFromClient and assignOffsets flags off, and the leaderEpoch being -1).

Note
appendAsFollower is used exclusively when Partition is requested to doAppendRecordsToFollowerOrFutureReplica.

append Internal Method

append(
  records: MemoryRecords,
  isFromClient: Boolean,
  interBrokerProtocolVersion: ApiVersion,
  assignOffsets: Boolean,
  leaderEpoch: Int): LogAppendInfo

append…FIXME

Note
append is used when Log is requested to appendAsLeader (with assignOffsets enabled) and appendAsFollower (with assignOffsets and isFromClient disabled).
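To make the difference between the two entry points concrete, here is a toy in-memory log in Scala. It is not Kafka's Log: ToyLog, Rec and the offset check are hypothetical simplifications, and leaderEpoch is accepted but ignored. It only shows what the assignOffsets flag changes: a leader assigns offsets starting at the log end offset, while a follower keeps the offsets the records already carry (here, merely checked not to go backwards).

```scala
// Hypothetical record with an offset (ignored when offsets are assigned)
final case class Rec(offset: Long, value: String)

final class ToyLog {
  private val records = scala.collection.mutable.ArrayBuffer.empty[Rec]

  // Offset of the next record to be appended
  def logEndOffset: Long = records.lastOption.map(_.offset + 1).getOrElse(0L)

  // leaderEpoch mirrors the signature above; this toy ignores it
  private def append(batch: Seq[Rec], assignOffsets: Boolean, leaderEpoch: Int): Seq[Rec] = {
    val start = logEndOffset
    val toAppend =
      if (assignOffsets) batch.zipWithIndex.map { case (r, i) => r.copy(offset = start + i) }
      else {
        // Follower: offsets come from the leader and must not go backwards
        require(batch.headOption.forall(_.offset >= start), s"Out of order offsets (log end offset: $start)")
        batch
      }
    records ++= toAppend
    toAppend
  }

  def appendAsLeader(batch: Seq[Rec], leaderEpoch: Int): Seq[Rec] =
    append(batch, assignOffsets = true, leaderEpoch = leaderEpoch)

  def appendAsFollower(batch: Seq[Rec]): Seq[Rec] =
    append(batch, assignOffsets = false, leaderEpoch = -1)
}

val log = new ToyLog
log.appendAsLeader(Seq(Rec(-1L, "a"), Rec(-1L, "b")), leaderEpoch = 5) // offsets 0 and 1
log.appendAsFollower(Seq(Rec(2L, "c")))                                // keeps offset 2
```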

analyzeAndValidateRecords Internal Method

analyzeAndValidateRecords(
  records: MemoryRecords,
  isFromClient: Boolean): LogAppendInfo

analyzeAndValidateRecords…FIXME

Note
analyzeAndValidateRecords is used exclusively when Log is requested to append.

deleteSegment Internal Method

deleteSegment(segment: LogSegment): Unit

deleteSegment…FIXME

Note
deleteSegment is used when Log is requested to recoverLog, deleteSegments, roll, truncateTo, and truncateFullyAndStartAt.

replaceSegments Internal Method

replaceSegments(
  newSegments: Seq[LogSegment],
  oldSegments: Seq[LogSegment],
  isRecoveredSwapFile: Boolean = false): Unit

replaceSegments…FIXME

Note

replaceSegments is used when:

recoverLog Internal Method

recoverLog(): Long

recoverLog…FIXME

Note
recoverLog is used exclusively when Log is requested to loadSegments.

Checking Whether .kafka_cleanshutdown Is In Parent Directory of Log Directory — hasCleanShutdownFile Internal Method

hasCleanShutdownFile: Boolean

hasCleanShutdownFile is true when .kafka_cleanshutdown file is in the parent directory of the log directory. Otherwise, hasCleanShutdownFile is false.

Note
hasCleanShutdownFile is used exclusively when Log is created (to loadProducerState) and requested to recoverLog.
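The check boils down to a file-existence test one level up from the log directory, i.e. in the broker log directory that holds all the topic-partition directories. A minimal Scala sketch, with the log directory passed in explicitly (an assumption for illustration; the method above takes no arguments and uses the log directory of the Log).

```scala
import java.io.File

val CleanShutdownFile = ".kafka_cleanshutdown"

// The marker file is looked up in the parent of the log directory,
// e.g. /tmp/kafka-logs/.kafka_cleanshutdown for /tmp/kafka-logs/my-topic-0
def hasCleanShutdownFile(logDir: File): Boolean =
  new File(logDir.getParentFile, CleanShutdownFile).exists()
```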

deleteSegments Internal Method

deleteSegments(deletable: Iterable[LogSegment]): Int

deleteSegments…FIXME

Note
deleteSegments is used exclusively when Log is requested to deleteOldSegments.

truncateTo Internal Method

truncateTo(targetOffset: Long): Boolean

truncateTo…FIXME

Note
truncateTo is used exclusively when LogManager is requested to truncateTo.

truncateFullyAndStartAt Internal Method

truncateFullyAndStartAt(newOffset: Long): Unit

truncateFullyAndStartAt…FIXME

Note

truncateFullyAndStartAt is used when:

deleteOldSegments Method

deleteOldSegments(): Long
// Private API
deleteOldSegments(
  predicate: (LogSegment, Option[LogSegment]) => Boolean,
  reason: String): Int

deleteOldSegments…FIXME

Note

deleteOldSegments is used when:

deleteRetentionMsBreachedSegments Internal Method

deleteRetentionMsBreachedSegments(): Int

deleteRetentionMsBreachedSegments…FIXME

Note
deleteRetentionMsBreachedSegments is used when…FIXME

deleteRetentionSizeBreachedSegments Internal Method

deleteRetentionSizeBreachedSegments(): Int

deleteRetentionSizeBreachedSegments…FIXME

Note
deleteRetentionSizeBreachedSegments is used when…FIXME

deleteLogStartOffsetBreachedSegments Internal Method

deleteLogStartOffsetBreachedSegments(): Int

deleteLogStartOffsetBreachedSegments…FIXME

Note
deleteLogStartOffsetBreachedSegments is used when…FIXME

splitOverflowedSegment Internal Method

splitOverflowedSegment(segment: LogSegment): List[LogSegment]

splitOverflowedSegment…FIXME

Note

splitOverflowedSegment is used when:

onHighWatermarkIncremented Method

onHighWatermarkIncremented(highWatermark: Long): Unit

onHighWatermarkIncremented…FIXME

Note
onHighWatermarkIncremented is used when Replica is created and requested to update the high watermark (highWatermark_=).

parseTopicPartitionName Object Method

parseTopicPartitionName(dir: File): TopicPartition

parseTopicPartitionName parses the name of the given directory and creates a TopicPartition.

parseTopicPartitionName assumes that the name is of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).

After dropping the .uniqueId-delete suffix (if present), parseTopicPartitionName uses all characters up to the last - for the topic name and the rest as the partition ID.
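The parsing rules above can be sketched in Scala as follows. TopicPartitionSketch is a hypothetical stand-in for Kafka's TopicPartition, the error message is made up, and directory suffixes other than -delete are not handled; the uniqueId in the usage example is invented as well.

```scala
import java.io.File

// Hypothetical stand-in for org.apache.kafka.common.TopicPartition
final case class TopicPartitionSketch(topic: String, partition: Int)

val DeleteDirSuffix = "-delete"

def parseTopicPartitionName(dir: File): TopicPartitionSketch = {
  val dirName = dir.getName
  def invalid = new IllegalArgumentException(
    s"'$dirName' is not in the form of topic-partition or topic-partition.uniqueId-delete")
  // Drop ".uniqueId-delete" of a directory marked for deletion
  val name =
    if (dirName.endsWith(DeleteDirSuffix) && dirName.contains("."))
      dirName.substring(0, dirName.lastIndexOf('.'))
    else dirName
  // Everything up to the last '-' is the topic, the rest is the partition ID
  val index = name.lastIndexOf('-')
  if (index <= 0 || index == name.length - 1) throw invalid
  val partition = scala.util.Try(name.substring(index + 1).toInt).getOrElse(throw invalid)
  TopicPartitionSketch(name.substring(0, index), partition)
}

println(parseTopicPartitionName(new File("/tmp/kafka-logs/my-topic-3")))
```

Splitting at the last dash (rather than the first) is what lets topic names themselves contain dashes, e.g. my-topic.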

Note

parseTopicPartitionName is used when:

offsetFromFileName Object Method

offsetFromFileName(filename: String): Long

offsetFromFileName…FIXME

Note
offsetFromFileName is used when Log is requested to removeTempFilesAndCollectSwapFiles (right when created) and offsetFromFile.

offsetFromFile Object Method

offsetFromFile(file: File): Long

offsetFromFile…FIXME

Note
offsetFromFile is used when…FIXME

read Method

read(
  startOffset: Long,
  maxLength: Int,
  maxOffset: Option[Long],
  minOneMessage: Boolean,
  includeAbortedTxns: Boolean): FetchDataInfo

read…FIXME

Note

read is used when:

convertToOffsetMetadata Method

convertToOffsetMetadata(
  offset: Long): Option[LogOffsetMetadata]

convertToOffsetMetadata…FIXME

Note
convertToOffsetMetadata is used exclusively when Replica is requested to convertHWToLocalOffsetMetadata.

logEndOffset Method

logEndOffset: Long

logEndOffset is the offset of the next message that will be appended to the log (based on the nextOffsetMetadata internal registry).

Note
logEndOffset is used when…FIXME

addSegment Method

addSegment(segment: LogSegment): LogSegment

addSegment simply associates the given LogSegment with the baseOffset in the segments internal registry.

Note
addSegment is used when Log is requested to replaceSegments, loadSegmentFiles, loadSegments, recoverLog, roll, and truncateFullyAndStartAt.

updateLogEndOffset Internal Method

updateLogEndOffset(messageOffset: Long): Unit

updateLogEndOffset simply creates a new LogOffsetMetadata (with the messageOffset, and the base offset and size of the active segment) that becomes the nextOffsetMetadata internal registry.

Note
updateLogEndOffset is used when Log is requested to append records, roll log segment, truncateTo, and truncateFullyAndStartAt.

activeSegment Method

activeSegment: LogSegment

activeSegment gives the active LogSegment that is currently taking appends (that is the greatest key in the segments internal registry).

Note
activeSegment is used exclusively when Log is created (to create a LogOffsetMetadata).
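The segments registry, addSegment, activeSegment, updateLogEndOffset and logEndOffset fit together as in the following Scala sketch. It assumes the registry is a sorted concurrent map from base offsets to segments (a java.util.concurrent.ConcurrentSkipListMap); Segment, OffsetMetadata and SegmentsSketch are hypothetical, simplified stand-ins for LogSegment, LogOffsetMetadata and Log.

```scala
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}

// Hypothetical, simplified stand-ins for LogSegment and LogOffsetMetadata
final case class Segment(baseOffset: Long, sizeInBytes: Int)
final case class OffsetMetadata(messageOffset: Long, segmentBaseOffset: Long, relativePositionInSegment: Int)

final class SegmentsSketch {
  // base offset -> segment, kept sorted by base offset
  private val segments: ConcurrentNavigableMap[java.lang.Long, Segment] =
    new ConcurrentSkipListMap[java.lang.Long, Segment]()

  @volatile private var nextOffsetMetadata = OffsetMetadata(0L, 0L, 0)

  // Registers the segment under its base offset; returns the segment it replaced (or null)
  def addSegment(segment: Segment): Segment = segments.put(segment.baseOffset, segment)

  // The segment with the greatest base offset takes the appends
  // (throws a NullPointerException while the registry is empty)
  def activeSegment: Segment = segments.lastEntry.getValue

  def updateLogEndOffset(messageOffset: Long): Unit =
    nextOffsetMetadata = OffsetMetadata(messageOffset, activeSegment.baseOffset, activeSegment.sizeInBytes)

  def logEndOffset: Long = nextOffsetMetadata.messageOffset
}

val log = new SegmentsSketch
log.addSegment(Segment(baseOffset = 0L, sizeInBytes = 1024))
log.addSegment(Segment(baseOffset = 100L, sizeInBytes = 10))
log.updateLogEndOffset(150L)
println(log.activeSegment.baseOffset) // 100
println(log.logEndOffset)             // 150
```

A sorted map keeps "the active segment" a cheap lookup of the last entry, and finding the segment for a given offset becomes a floor lookup on the same map.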

Updating Config — updateConfig Method

updateConfig(
  updatedKeys: Set[String],
  newConfig: LogConfig): Unit

updateConfig replaces the current LogConfig with the given LogConfig.

If message.format.version is among the updated keys, updateConfig…FIXME

Note

updateConfig is used when:

renameDir Method

renameDir(name: String): Unit

renameDir…FIXME

Note
renameDir is used when LogManager is requested to replaceCurrentWithFutureLog and asyncDelete.

loadSegments Internal Method

loadSegments(): Long

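loadSegments first calls removeTempFilesAndCollectSwapFiles and then loadSegmentFiles (using retryOnOffsetOverflow, i.e. with retries when there are log segments with offset overflow).

loadSegments then calls completeSwapOperations (with the swap files collected earlier).

loadSegments continues differently depending on whether the log directory is scheduled to be deleted or not.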

Note
loadSegments is used exclusively when Log is created (to create a LogOffsetMetadata).

loadSegments Internal Method and Log Directory Not Scheduled For Deletion

For the log directory that is not scheduled to be deleted, loadSegments calls recoverLog.

loadSegments requests the active segment to resizeIndexes (to the value of the segment.index.bytes configuration property).

In the end, loadSegments returns the next offset after recovery.

loadSegments Internal Method and Log Directory Scheduled For Deletion

For the log directory that is scheduled to be deleted, loadSegments adds a new log segment (with base offset 0 and initFileSize).

In the end, loadSegments returns 0.

removeTempFilesAndCollectSwapFiles Internal Method

removeTempFilesAndCollectSwapFiles(): Set[File]

removeTempFilesAndCollectSwapFiles…FIXME

Note
removeTempFilesAndCollectSwapFiles is used exclusively when Log is requested to loadSegments (right when created).

loadSegmentFiles Internal Method

loadSegmentFiles(): Unit

loadSegmentFiles processes index and log files in the log directory.

Internally, loadSegmentFiles finds all the files (sorted by name) in the log directory and handles every file depending on whether it is an index or a log file.

Note
loadSegmentFiles is used exclusively when Log is requested to loadSegments (right when created).

loadSegmentFiles Internal Method and Index Files

For an index file, loadSegmentFiles simply makes sure that it has a corresponding .log file (in the same log directory).

If the file is an orphaned index file, loadSegmentFiles simply prints out the following WARN message and deletes the file:

Found an orphaned index file [path], with no corresponding log file.
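The orphaned-index check can be sketched in Scala with java.io.File and java.nio.file.Files. The sketch assumes that an index file and its log file share the same base-offset prefix (the part before the first dot). deleteOrphanedIndexFiles is a hypothetical helper that covers only this one step of loadSegmentFiles, and it uses println in place of a logger.

```scala
import java.io.File
import java.nio.file.Files

val IndexFileSuffixes = Seq(".index", ".timeindex", ".txnindex")
val LogFileSuffix = ".log"

def isIndexFile(file: File): Boolean = IndexFileSuffixes.exists(suffix => file.getName.endsWith(suffix))

// Deletes index files without a matching .log file and returns them
def deleteOrphanedIndexFiles(logDir: File): Seq[File] = {
  val files = Option(logDir.listFiles()).map(_.toSeq).getOrElse(Seq.empty).filter(_.isFile).sortBy(_.getName)
  files.filter(isIndexFile).flatMap { indexFile =>
    val name = indexFile.getName
    // Same base-offset prefix, .log suffix, same directory
    val logFile = new File(logDir, name.substring(0, name.indexOf('.')) + LogFileSuffix)
    if (logFile.exists()) None
    else {
      println(s"WARN Found an orphaned index file ${indexFile.getAbsolutePath}, with no corresponding log file.")
      Files.deleteIfExists(indexFile.toPath)
      Some(indexFile)
    }
  }
}
```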

loadSegmentFiles Internal Method and Log Files

For a log file, loadSegmentFiles opens it (as a LogSegment) and requests it to sanityCheck.

In case of NoSuchFileException, loadSegmentFiles prints out the following ERROR to the logs and recovers the segment.

Could not find offset index file corresponding to log file [path], recovering segment and rebuilding index files...

In case of CorruptIndexException, loadSegmentFiles prints out the following ERROR to the logs and recovers the segment.

Found a corrupted index file corresponding to log file [path] due to [message], recovering segment and rebuilding index files...

In the end, loadSegmentFiles adds the segment to the segments internal registry (addSegment).

isIndexFile Internal Object Method

isIndexFile(file: File): Boolean

isIndexFile is true for files with one of the following file suffixes: .index, .timeindex or .txnindex.

Otherwise, isIndexFile is false.

Note
isIndexFile is used when Log is requested to removeTempFilesAndCollectSwapFiles and loadSegmentFiles.

isLogFile Internal Object Method

isLogFile(file: File): Boolean

isLogFile returns true when the given file has .log file suffix. Otherwise, isLogFile is false.

Note
isLogFile is used when Log is requested to removeTempFilesAndCollectSwapFiles, loadSegmentFiles, and splitOverflowedSegment.

Recovering Log Segment — recoverSegment Internal Method

recoverSegment(
  segment: LogSegment,
  leaderEpochCache: Option[LeaderEpochFileCache] = None): Int

recoverSegment creates a new ProducerStateManager (for the TopicPartition, log directory and maxProducerIdExpirationMs).

Note
Why does recoverSegment create a new ProducerStateManager rather than using the ProducerStateManager?

recoverSegment then calls rebuildProducerState (with the baseOffset of the LogSegment, the reloadFromCleanShutdown flag off, and the new ProducerStateManager).

recoverSegment requests the given LogSegment to recover (with the new ProducerStateManager and the optional LeaderEpochFileCache).

recoverSegment requests the new ProducerStateManager to takeSnapshot.

recoverSegment returns the number of bytes truncated from the log (while doing segment recovery).

Note
recoverSegment is used when Log is requested to loadSegmentFiles, completeSwapOperations, and recoverLog.

loadProducerState Internal Method

loadProducerState(
  lastOffset: Long,
  reloadFromCleanShutdown: Boolean): Unit

loadProducerState calls rebuildProducerState (with the lastOffset, reloadFromCleanShutdown and the ProducerStateManager).

In the end, loadProducerState calls updateFirstUnstableOffset.

Note
loadProducerState is used when Log is created and requested to truncateTo.

rebuildProducerState Internal Method

rebuildProducerState(
  lastOffset: Long,
  reloadFromCleanShutdown: Boolean,
  producerStateManager: ProducerStateManager): Unit

rebuildProducerState…FIXME

Note
rebuildProducerState is used when Log is requested to recoverSegment and loadProducerState.

updateFirstUnstableOffset Internal Method

updateFirstUnstableOffset(): Unit

updateFirstUnstableOffset…FIXME

Note
updateFirstUnstableOffset is used when Log is requested to loadProducerState, append, onHighWatermarkIncremented, maybeIncrementLogStartOffset, and truncateFullyAndStartAt.

completeSwapOperations Internal Method

completeSwapOperations(swapFiles: Set[File]): Unit

completeSwapOperations…FIXME

Note
completeSwapOperations is used exclusively when Log is requested to loadSegments.

retryOnOffsetOverflow Internal Method

retryOnOffsetOverflow[T](fn: => T): T

retryOnOffsetOverflow executes the fn block and returns the result.

In case of LogSegmentOffsetOverflowException, retryOnOffsetOverflow prints out the following INFO message to the logs, calls splitOverflowedSegment (for the overflowed segment) and retries:

Caught segment overflow error: [message]. Split segment and retry.

Note
retryOnOffsetOverflow is used exclusively when Log is requested to loadSegments.
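The retry loop can be sketched as a tail-recursive Scala function over a by-name block. SegmentOverflowException, the split callback and the name retryOnOverflow are hypothetical stand-ins for LogSegmentOffsetOverflowException, splitOverflowedSegment and retryOnOffsetOverflow. Note that the sketch only terminates if splitting actually removes the overflow; otherwise it retries forever.

```scala
// Hypothetical stand-in for LogSegmentOffsetOverflowException
final class SegmentOverflowException(val segmentId: Int, message: String)
  extends RuntimeException(message)

@annotation.tailrec
def retryOnOverflow[T](split: Int => Unit)(fn: => T): T = {
  val attempt: Either[SegmentOverflowException, T] =
    try Right(fn)
    catch { case e: SegmentOverflowException => Left(e) }
  attempt match {
    case Right(result) => result
    case Left(e) =>
      println(s"Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
      split(e.segmentId)               // split the overflowed segment
      retryOnOverflow(split)(fn)       // and evaluate the block again
  }
}
```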

initializeLeaderEpochCache Internal Method

initializeLeaderEpochCache(): Unit

initializeLeaderEpochCache…FIXME

Note
initializeLeaderEpochCache is used when Log is created and later requested to updateConfig and renameDir.

Internal Properties

Name                Description
nextOffsetMetadata  LogOffsetMetadata (log end offset) of the next message that will be appended to the log. Used when:
segments            LogSegments of the log by their base offsets. Used when…FIXME
