log4j.logger.kafka.log.Log=ALL
Partition Log — Collection Of Log Segments
Log represents a log of a partition of a topic.
Log is a collection of LogSegments that are stored on disk in a given partition log directory with the name of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).
While being created, Log creates the log directory unless already available.
Log is created when LogManager is requested to:
- Load a partition log (directory) (while LogManager is being created, which is when KafkaServer is requested to start up)
- Look up or create a partition log (when Partition is requested to getOrCreateReplica)
Log uses .kafka_cleanshutdown file to indicate…FIXME
Log is isFuture when…FIXME
Log uses the file suffixes to differentiate between parts of log segments.
Suffix | Description |
---|---|
.log | FileRecords log file |
The files of offset indices, time indices, and transaction indices are collectively called index files.
Log uses a Scheduler to schedule the background tasks.
Name | Period | Delay | Description |
---|---|---|---|
|
Requests the ProducerStateManager to removeExpiredProducers Scheduled immediately when |
||
|
|
|
Scheduled when |
|
|
Scheduled when |
Log uses [Log partition=[topicPartition], dir=[parent]] as the logging prefix (aka logIdent).
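Tip
|
Enable ALL logging level for kafka.log.Log logger to see what happens inside. Add the following line to the log4j configuration: log4j.logger.kafka.log.Log=ALL
Refer to Logging.
|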
LogSegments — segments
Internal Registry
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
segments is a Java ConcurrentSkipListMap of LogSegments per baseOffset.
A new LogSegment is added when Log is requested to addSegment.
- Cleared in loadSegments just before loadSegmentFiles
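The following is a minimal sketch of how a registry of this kind behaves. It assumes a simplified stand-in Segment class instead of Kafka's LogSegment; SegmentsSketch, Segment, add and segmentFor are hypothetical names used for illustration only.

```scala
import java.util.concurrent.{ConcurrentNavigableMap, ConcurrentSkipListMap}

object SegmentsSketch {
  // Hypothetical stand-in for kafka.log.LogSegment (illustration only)
  final case class Segment(baseOffset: Long)

  // Segments kept sorted by base offset, like the segments internal registry
  val segments: ConcurrentNavigableMap[java.lang.Long, Segment] =
    new ConcurrentSkipListMap[java.lang.Long, Segment]()

  // Associates the segment with its base offset
  def add(segment: Segment): Unit = {
    segments.put(segment.baseOffset, segment)
  }

  // The segment with the greatest base offset
  def activeSegment: Segment = segments.lastEntry.getValue

  // The segment with the greatest base offset that is not above the given offset
  def segmentFor(offset: Long): Option[Segment] =
    Option(segments.floorEntry(offset)).map(_.getValue)
}
```

A sorted map makes both lookups cheap: the last entry gives the segment that takes appends, and a floor lookup finds the segment that covers an arbitrary offset.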
Performance Metrics
Log is a KafkaMetricsGroup with the following performance metrics.
Metric Name | Description |
---|---|
|
|
|
|
|
|
|
The performance metrics are registered in kafka.log:type=Log group.
Creating Log Instance
Log takes the following to be created:
Log initializes the internal properties.
While being created, Log creates the log directory unless already available.
Log loads segments.
Log runs updateLogEndOffset.
Log requests the LeaderEpochFileCache to remove epoch entries from the store with start offsets greater than or equal to the nextOffset (LeaderEpochFileCache.truncateFromEnd).
Log computes the new start offset to be the maximum of the logStartOffset and the baseOffset of the first LogSegment in the segments internal registry.
Log requests the LeaderEpochFileCache to remove epoch entries from the store with offsets less than or equal to the new start offset (LeaderEpochFileCache.truncateFromStart).
Log throws an IllegalStateException when the ProducerStateManager is not empty:
Producer state must be empty during log initialization
Log runs loadProducerState (with the logEndOffset and the reloadFromCleanShutdown based on hasCleanShutdownFile).
In the end, Log prints out the following INFO message to the logs:
Completed load of log with [size] segments, log start offset [logStartOffset] and log end offset [logEndOffset] in [time] ms
Creating Log Instance — apply
Utility
apply(
dir: File,
config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
apply runs parseTopicPartitionName on the log directory.
apply creates a new ProducerStateManager.
In the end, apply creates a Log.
Note
|
apply is used when LogManager is requested to loadLog and look up or create a new partition log.
|
roll
Method
roll(
expectedNextOffset: Option[Long] = None): LogSegment
roll
…FIXME
Note
|
roll is used when Log is requested to deleteSegments and maybeRoll.
|
closeHandlers
Method
closeHandlers(): Unit
closeHandlers
…FIXME
Note
|
closeHandlers is used when…FIXME
|
updateHighWatermark
Method
updateHighWatermark(
hw: Long): Long
updateHighWatermark
…FIXME
Note
|
updateHighWatermark is used when…FIXME
|
addAbortedTransactions
Internal Method
addAbortedTransactions(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
addAbortedTransactions
…FIXME
Note
|
addAbortedTransactions is used when Log is requested to read records.
|
collectAbortedTransactions
Internal Method
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
collectAbortedTransactions
…FIXME
Note
|
collectAbortedTransactions is used when Cleaner is requested to cleanSegments and buildOffsetMap.
|
maybeRoll
Internal Method
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
maybeRoll
…FIXME
Note
|
maybeRoll is used exclusively when Log is requested to append records.
|
asyncDeleteSegment
Internal Method
asyncDeleteSegment(segment: LogSegment): Unit
asyncDeleteSegment
…FIXME
Note
|
asyncDeleteSegment is used when Log is requested to deleteSegment and replaceSegments.
|
Flushing Log Segments Out To Disk — flush
Method
flush(): Unit (1)
flush(offset: Long): Unit
(1) Uses logEndOffset for the offset (and so flushes all log segments)
flush prints out the following DEBUG message to the logs:
Flushing log up to offset [offset], last flushed: [lastFlushTime], current time: [time], unflushed: [unflushedMessages]
flush
…FIXME
deleteSeg
Internal Method
deleteSeg(): Unit
deleteSeg
…FIXME
Note
|
deleteSeg is used exclusively for the delete-file Background Task.
|
appendAsLeader
Method
appendAsLeader(
records: MemoryRecords,
leaderEpoch: Int,
isFromClient: Boolean = true): LogAppendInfo
appendAsLeader simply appends the records with the assignOffsets flag on.
Note
|
appendAsLeader is used exclusively when Partition is requested to appendRecordsToLeader.
|
appendAsFollower
Method
appendAsFollower(records: MemoryRecords): LogAppendInfo
appendAsFollower simply appends the records (with the isFromClient and assignOffsets flags off, and the leaderEpoch being -1).
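The difference between the two entry points comes down to the arguments handed over to append. A minimal sketch, assuming a hypothetical AppendArgs record that only captures those arguments (no records are actually appended):

```scala
object AppendFlagsSketch {
  // Hypothetical record of the arguments handed over to append (illustration only)
  final case class AppendArgs(isFromClient: Boolean, assignOffsets: Boolean, leaderEpoch: Int)

  // Leader append: offsets are assigned and the leader epoch comes from the caller
  def asLeader(leaderEpoch: Int, isFromClient: Boolean = true): AppendArgs =
    AppendArgs(isFromClient = isFromClient, assignOffsets = true, leaderEpoch = leaderEpoch)

  // Follower append: both flags off and the leader epoch fixed at -1
  def asFollower(): AppendArgs =
    AppendArgs(isFromClient = false, assignOffsets = false, leaderEpoch = -1)
}
```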
Note
|
appendAsFollower is used exclusively when Partition is requested to doAppendRecordsToFollowerOrFutureReplica.
|
Appending Records — append
Internal Method
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
append
…FIXME
Note
|
append is used when Log is requested to appendAsLeader (with assignOffsets enabled) and appendAsFollower (with assignOffsets and isFromClient disabled).
|
analyzeAndValidateRecords
Internal Method
analyzeAndValidateRecords(
records: MemoryRecords,
isFromClient: Boolean): LogAppendInfo
analyzeAndValidateRecords
…FIXME
Note
|
analyzeAndValidateRecords is used exclusively when Log is requested to append.
|
deleteSegment
Internal Method
deleteSegment(segment: LogSegment): Unit
deleteSegment
…FIXME
Note
|
deleteSegment is used when Log is requested to recoverLog, deleteSegments, roll, truncateTo, and truncateFullyAndStartAt.
|
replaceSegments
Internal Method
replaceSegments(
newSegments: Seq[LogSegment],
oldSegments: Seq[LogSegment],
isRecoveredSwapFile: Boolean = false): Unit
replaceSegments
…FIXME
Checking Whether .kafka_cleanshutdown Is In Parent Directory of Log Directory — hasCleanShutdownFile
Internal Method
hasCleanShutdownFile: Boolean
hasCleanShutdownFile is true when .kafka_cleanshutdown file is in the parent directory of the log directory. Otherwise, hasCleanShutdownFile is false.
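A minimal sketch of such a check, using only java.io.File. CleanShutdownSketch is a hypothetical name, and the sketch assumes that the log directory has a parent directory.

```scala
import java.io.File

object CleanShutdownSketch {
  val CleanShutdownFile = ".kafka_cleanshutdown"

  // true when the marker file sits next to (not inside) the partition log directory
  def hasCleanShutdownFile(logDir: File): Boolean =
    new File(logDir.getParentFile, CleanShutdownFile).exists()
}
```

Note that the marker is looked up one level above the partition log directory, so a single file covers all the partition logs under the same parent directory.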
Note
|
hasCleanShutdownFile is used exclusively when Log is created (to loadProducerState) and requested to recoverLog.
|
maybeIncrementLogStartOffset
Method
maybeIncrementLogStartOffset(
newLogStartOffset: Long): Unit
maybeIncrementLogStartOffset
…FIXME
Note
|
maybeIncrementLogStartOffset is used when…FIXME
|
truncateTo
Internal Method
truncateTo(targetOffset: Long): Boolean
truncateTo
…FIXME
Note
|
truncateTo is used when LogManager is requested to truncateTo.
|
truncateFullyAndStartAt
Internal Method
truncateFullyAndStartAt(newOffset: Long): Unit
truncateFullyAndStartAt
…FIXME
Scheduling Deletion Of Old Segments (Log Retention) — deleteOldSegments
Method
deleteOldSegments(): Int
deleteOldSegments uses the delete flag (of the given LogConfig) to determine the scope of log deletion and returns the number of segments deleted.
Note
|
delete flag is enabled (true ) when delete cleanup policy is part of the cleanup.policy configuration property.
|
With the delete flag enabled (true), deleteOldSegments runs deleteRetentionMsBreachedSegments, deleteRetentionSizeBreachedSegments and deleteLogStartOffsetBreachedSegments.
With the delete flag disabled (false), deleteOldSegments merely runs deleteLogStartOffsetBreachedSegments.
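The branching can be sketched as follows. This is an illustration only: DeleteOldSegmentsSketch is a hypothetical name, and the three by-name parameters stand in for the number of segments deleted by each of the three steps.

```scala
object DeleteOldSegmentsSketch {
  // Each by-name argument stands for the number of segments a step deleted
  // and is evaluated only when the chosen branch needs it
  def deleteOldSegments(
      deleteFlag: Boolean,
      retentionMsBreached: => Int,
      retentionSizeBreached: => Int,
      logStartOffsetBreached: => Int): Int =
    if (deleteFlag) retentionMsBreached + retentionSizeBreached + logStartOffsetBreached
    else logStartOffsetBreached
}
```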
Scheduling Deletion Of Old Log Segments (Per Predicate) — deleteOldSegments
Internal Method
deleteOldSegments(
predicate: (LogSegment, Option[LogSegment]) => Boolean,
reason: String): Int
deleteOldSegments finds deletable segments for the given predicate and schedules their deletion.
If any are found, deleteOldSegments prints out the following INFO message to the logs:
Found deletable segments with base offsets [[baseOffsets]] due to [reason]
Note
|
deleteOldSegments is used when Log is requested to deleteRetentionMsBreachedSegments, deleteRetentionSizeBreachedSegments, and deleteLogStartOffsetBreachedSegments.
|
Finding Deletable Segments (Per Predicate) — deletableSegments
Internal Method
deletableSegments(
predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment]
deletableSegments
…FIXME
Note
|
deletableSegments is used exclusively when Log is requested to schedule deletion of old segments (per predicate).
|
deleteSegments
Internal Method
deleteSegments(
deletable: Iterable[LogSegment]): Int
deleteSegments rolls a new segment (roll) if the number of deletable LogSegments equals the number of all the segments.
For every log segment, deleteSegments simply schedules it for deletion and runs maybeIncrementLogStartOffset (based on…FIXME).
Note
|
deleteSegments is used exclusively when Log is requested to schedule deletion of old segments (per predicate).
|
deleteRetentionMsBreachedSegments
Internal Method
deleteRetentionMsBreachedSegments(): Int
deleteRetentionMsBreachedSegments uses the retentionMs threshold (of the given LogConfig) to determine the scope of log retention.
deleteRetentionMsBreachedSegments schedules deletion of segments whose largestTimestamp is older than the retentionMs threshold (that is, the time since largestTimestamp exceeds retentionMs).
deleteRetentionMsBreachedSegments uses the following reason:
retention time [retentionMs]ms breach
deleteRetentionMsBreachedSegments simply returns 0 for a negative retention.ms threshold.
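A minimal sketch of a time-based retention check. It assumes a simplified Segment class in place of LogSegment, a caller-supplied current time, and that segments are examined oldest first with the scan stopping at the first segment that does not qualify. RetentionMsSketch and breached are hypothetical names.

```scala
object RetentionMsSketch {
  // Hypothetical stand-in for a log segment (illustration only)
  final case class Segment(baseOffset: Long, largestTimestamp: Long)

  // Segments (oldest first) whose newest record is older than retentionMs
  def breached(segments: Seq[Segment], retentionMs: Long, nowMs: Long): Seq[Segment] =
    if (retentionMs < 0) Seq.empty // a negative threshold turns the check off
    else segments.takeWhile(s => nowMs - s.largestTimestamp > retentionMs)
}
```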
Note
|
deleteRetentionMsBreachedSegments is used exclusively when Log is requested to schedule deletion of old segments (log retention).
|
deleteRetentionSizeBreachedSegments
Internal Method
deleteRetentionSizeBreachedSegments(): Int
deleteRetentionSizeBreachedSegments uses the retentionSize threshold (of the given LogConfig) to determine the scope of log retention.
Note
|
retentionSize threshold is the value of retention.bytes configuration for a topic or log.retention.bytes configuration for the cluster. |
With the retentionSize threshold negative (the default is -1L) or the size of the log below it, deleteRetentionSizeBreachedSegments simply exits with the return value of 0.
deleteRetentionSizeBreachedSegments schedules deletion of whole log segments (per their size, oldest first) for as long as deleting a segment does not take the log size below the threshold.
deleteRetentionSizeBreachedSegments uses the following reason:
retention size in bytes [retentionSize] breach
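A minimal sketch of a size-based retention check. It assumes a simplified Segment class in place of LogSegment, and that segments are examined oldest first with the scan stopping at the first segment that cannot be deleted. RetentionSizeSketch and breached are hypothetical names.

```scala
object RetentionSizeSketch {
  // Hypothetical stand-in for a log segment (illustration only)
  final case class Segment(baseOffset: Long, size: Long)

  // Oldest segments that can go while the log stays at or above retentionSize
  def breached(segments: Seq[Segment], retentionSize: Long): Seq[Segment] = {
    val logSize = segments.map(_.size).sum
    if (retentionSize < 0 || logSize < retentionSize) Seq.empty
    else {
      var diff = logSize - retentionSize // bytes above the threshold
      segments.takeWhile { s =>
        val deletable = diff - s.size >= 0
        if (deletable) diff -= s.size
        deletable
      }
    }
  }
}
```

With segments of 30, 30 and 40 bytes and a threshold of 50 bytes, only the first segment qualifies: deleting the second one as well would leave 40 bytes, which is below the threshold.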
Note
|
deleteRetentionSizeBreachedSegments is used exclusively when Log is requested to schedule deletion of old segments (log retention) (for delete cleanup policy).
|
deleteLogStartOffsetBreachedSegments
Internal Method
deleteLogStartOffsetBreachedSegments(): Int
deleteLogStartOffsetBreachedSegments
…FIXME
Note
|
deleteLogStartOffsetBreachedSegments is used when…FIXME
|
splitOverflowedSegment
Internal Method
splitOverflowedSegment(
segment: LogSegment): List[LogSegment]
splitOverflowedSegment
…FIXME
onHighWatermarkIncremented
Method
onHighWatermarkIncremented(highWatermark: Long): Unit
onHighWatermarkIncremented
…FIXME
Note
|
onHighWatermarkIncremented is used when Replica is created and requested to update the high watermark (highWatermark_=).
|
parseTopicPartitionName
Object Method
parseTopicPartitionName(dir: File): TopicPartition
parseTopicPartitionName parses the name of the given directory and creates a TopicPartition.
parseTopicPartitionName assumes that the name is of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).
parseTopicPartitionName uses all characters up to the last - for the topic name and the rest as the partition ID.
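A minimal sketch of splitting a directory name at the last dash. It handles the plain topic-partition form only (not the uniqueId-delete variant) and returns a tuple rather than a TopicPartition. TopicPartitionNameSketch and parse are hypothetical names.

```scala
object TopicPartitionNameSketch {
  // Splits a "topic-partition" directory name at the last '-'
  def parse(dirName: String): (String, Int) = {
    val index = dirName.lastIndexOf('-')
    require(index > 0 && index < dirName.length - 1, s"Unexpected directory name: $dirName")
    val topic = dirName.substring(0, index)
    val partition = dirName.substring(index + 1).toInt
    (topic, partition)
  }
}
```

Splitting at the last dash (rather than the first) is what allows topic names that contain dashes themselves, e.g. my-topic-3 gives the topic my-topic and the partition 3.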
offsetFromFileName
Object Method
offsetFromFileName(filename: String): Long
offsetFromFileName
…FIXME
Note
|
offsetFromFileName is used when Log is requested to removeTempFilesAndCollectSwapFiles (right when created) and offsetFromFile.
|
offsetFromFile
Object Method
offsetFromFile(file: File): Long
offsetFromFile
…FIXME
Note
|
offsetFromFile is used when…FIXME
|
Reading Records — read
Method
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
read
…FIXME
convertToOffsetMetadata
Method
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
convertToOffsetMetadata
…FIXME
Note
|
convertToOffsetMetadata is used exclusively when Replica is requested to convertHWToLocalOffsetMetadata
|
logEndOffset
Method
logEndOffset: Long
logEndOffset is the offset of the next message that will be appended to the log (based on the nextOffsetMetadata internal registry).
Note
|
logEndOffset is used when…FIXME
|
addSegment
Method
addSegment(
segment: LogSegment): LogSegment
addSegment simply associates the given LogSegment with the baseOffset in the segments internal registry.
Note
|
addSegment is used when Log is requested to replaceSegments, loadSegmentFiles, loadSegments, recoverLog, roll, and truncateFullyAndStartAt.
|
updateLogEndOffset
Internal Method
updateLogEndOffset(messageOffset: Long): Unit
updateLogEndOffset simply creates a new LogOffsetMetadata (with the messageOffset and the active segment) that becomes the nextOffsetMetadata internal registry.
Note
|
updateLogEndOffset is used when Log is requested to append records, roll log segment, truncateTo, and truncateFullyAndStartAt.
|
activeSegment
Method
activeSegment: LogSegment
activeSegment gives the active LogSegment that is currently taking appends (that is, the segment with the greatest key in the segments internal registry).
Note
|
activeSegment is used exclusively when Log is created (to create a LogOffsetMetadata).
|
Updating Config — updateConfig
Method
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
If message.format.version is among the updated keys, updateConfig …FIXME
renameDir
Method
renameDir(
name: String): Unit
renameDir
…FIXME
Note
|
renameDir is used when LogManager is requested to replaceCurrentWithFutureLog and asyncDelete.
|
Creating Log File — logFile
Utility
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
logFile creates a File (in the given dir directory) for the log file of a log segment. The file name is made up of the prefix for the given offset, the .log suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.logFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.log.suffix")
offsetIndexFile
Method
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
offsetIndexFile creates a File (in the given dir directory) for the offset index of a log segment. The file name is made up of the prefix for the given offset, the .index suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.offsetIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.index.suffix")
timeIndexFile
Method
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile creates a File (in the given dir directory) for the time index of a log segment. The file name is made up of the prefix for the given offset, the .timeindex suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.timeIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.timeindex.suffix")
transactionIndexFile
Method
transactionIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile creates a File (in the given dir directory) for the transaction index of a log segment. The file name is made up of the prefix for the given offset, the .txnindex suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.transactionIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.txnindex.suffix")
producerSnapshotFile
Method
producerSnapshotFile(
dir: File,
offset: Long): File
producerSnapshotFile creates a File (in the given dir directory) for a producer snapshot. The file name is made up of the prefix for the given offset and the .snapshot suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.producerSnapshotFile(dir, offset = 10)
assert(log_file.getName == "00000000000000000010.snapshot")
Note
|
producerSnapshotFile is used exclusively when ProducerStateManager is requested to takeSnapshot.
|
Creating File Name Of Log Segment (From Offset) — filenamePrefixFromOffset
Utility
filenamePrefixFromOffset(
offset: Long): String
filenamePrefixFromOffset uses java.text.NumberFormat to format the given offset:
- Minimum number of digits in the integer portion of a number: 20
- Maximum number of digits in the fraction portion of a number: 0
- No grouping used
import kafka.log.Log
val filenamePrefix = Log.filenamePrefixFromOffset(offset = 10)
assert(filenamePrefix == "00000000000000000010")
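The same formatting can be reproduced without Kafka on the classpath. The following is a sketch that applies the three NumberFormat settings listed above. FilenamePrefixSketch is a hypothetical name, and the use of Locale.ROOT is an assumption made here to keep the output independent of the default locale.

```scala
import java.text.NumberFormat
import java.util.Locale

object FilenamePrefixSketch {
  def filenamePrefixFromOffset(offset: Long): String = {
    // Locale.ROOT is an assumption of this sketch (locale-independent digits)
    val nf = NumberFormat.getInstance(Locale.ROOT)
    nf.setMinimumIntegerDigits(20) // left-pad with zeros up to 20 digits
    nf.setMaximumFractionDigits(0) // no fraction part
    nf.setGroupingUsed(false)      // no thousands separators
    nf.format(offset)
  }
}
```

Zero-padding to a fixed width makes the lexicographic order of file names match the numeric order of (non-negative) offsets, which is convenient when files are processed sorted by name.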
Note
|
filenamePrefixFromOffset is used when Log utility is used to create file names for logFile, offsetIndexFile, timeIndexFile, producerSnapshotFile, and transactionIndexFile.
|
maybeAssignEpochStartOffset
Method
maybeAssignEpochStartOffset(
leaderEpoch: Int,
startOffset: Long): Unit
maybeAssignEpochStartOffset
…FIXME
Note
|
maybeAssignEpochStartOffset is used when…FIXME
|
sizeInBytes
Utility
sizeInBytes(
segments: Iterable[LogSegment]): Long
sizeInBytes sums up the sizes of the given log segments.
Note
|
sizeInBytes is used when Log is requested for the size.
|
Loading Segments — loadSegments
Internal Method
loadSegments(): Long
loadSegments runs removeTempFilesAndCollectSwapFiles.
loadSegments runs loadSegmentFiles (with retries when there are log segments with offset overflow).
loadSegments runs completeSwapOperations.
loadSegments branches off per whether the log directory is scheduled to be deleted or not.
Note
|
loadSegments is used when Log is created (to create a LogOffsetMetadata).
|
loadSegments Internal Method and Log Directory Not Scheduled For Deletion
For the log directory that is not scheduled to be deleted, loadSegments runs recoverLog.
loadSegments requests the active segment to resizeIndexes (to the value of segment.index.bytes configuration property).
In the end, loadSegments returns the next offset after recovery.
loadSegments Internal Method and Log Directory Scheduled For Deletion
For the log directory that is scheduled to be deleted, loadSegments adds a new log segment (with base offset 0 and initFileSize).
In the end, loadSegments returns 0.
recoverLog
Internal Method
recoverLog(): Long
recoverLog
…FIXME
Note
|
recoverLog is used when Log is requested to loadSegments.
|
removeTempFilesAndCollectSwapFiles
Internal Method
removeTempFilesAndCollectSwapFiles(): Set[File]
removeTempFilesAndCollectSwapFiles
…FIXME
Note
|
removeTempFilesAndCollectSwapFiles is used exclusively when Log is requested to loadSegments (right when created).
|
Loading Segment Files — loadSegmentFiles
Internal Method
loadSegmentFiles(): Unit
loadSegmentFiles processes index and log files in the log directory (by name in ascending order).
Internally, loadSegmentFiles finds all the files (sorted by name) in the log directory and branches off per whether a file is an index or a log file.
Note
|
loadSegmentFiles is used when Log is created (and in turn loadSegments).
|
Loading Log Files
For a log file, loadSegmentFiles opens it and requests sanityCheck.
In case of NoSuchFileException, loadSegmentFiles prints out the following ERROR to the logs and recovers the segment.
Could not find offset index file corresponding to log file [path], recovering segment and rebuilding index files...
In case of CorruptIndexException, loadSegmentFiles prints out the following ERROR to the logs and recovers the segment.
Found a corrupted index file corresponding to log file [path] due to [message], recovering segment and rebuilding index files...
In the end, loadSegmentFiles runs addSegment.
Loading Index Files
For index files, loadSegmentFiles simply makes sure that they have corresponding .log files (in the same log directory).
If an orphaned index file is found, loadSegmentFiles simply prints out the following WARN message and deletes the file:
Found an orphaned index file [path], with no corresponding log file.
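The orphan check can be sketched on plain file names. This is an illustration only: LoadSegmentFilesSketch and orphanedIndexFiles are hypothetical names, the helpers are simplified re-implementations that work on names rather than File objects, and taking the base offset from the part of the name before the first dot is an assumption of the sketch.

```scala
object LoadSegmentFilesSketch {
  private val indexSuffixes = Seq(".index", ".timeindex", ".txnindex")

  def isIndexFile(name: String): Boolean = indexSuffixes.exists(s => name.endsWith(s))
  def isLogFile(name: String): Boolean = name.endsWith(".log")

  // Assumption: the base offset is the part of the name before the first '.'
  def offsetFromFileName(name: String): Long =
    name.substring(0, name.indexOf('.')).toLong

  // Index files (sorted by name) with no .log file of the same base offset
  def orphanedIndexFiles(fileNames: Seq[String]): Seq[String] = {
    val logOffsets = fileNames.filter(isLogFile).map(offsetFromFileName).toSet
    fileNames.sorted.filter { name =>
      isIndexFile(name) && !logOffsets.contains(offsetFromFileName(name))
    }
  }
}
```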
isIndexFile
Internal Object Method
isIndexFile(file: File): Boolean
isIndexFile is positive (true) when the given file has one of the index file suffixes (.index, .timeindex or .txnindex). Otherwise, isIndexFile is false.
Note
|
isIndexFile is used when Log is requested to removeTempFilesAndCollectSwapFiles and loadSegmentFiles.
|
isLogFile
Internal Object Method
isLogFile(file: File): Boolean
isLogFile returns true when the given file has .log file suffix. Otherwise, isLogFile is false.
Note
|
isLogFile is used when Log is requested to removeTempFilesAndCollectSwapFiles, loadSegmentFiles, and splitOverflowedSegment.
|
Recovering Log Segment — recoverSegment
Internal Method
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
recoverSegment creates a new ProducerStateManager (for the TopicPartition, log directory and maxProducerIdExpirationMs).
Note
|
Why does recoverSegment create a new ProducerStateManager rather than using the ProducerStateManager?
|
recoverSegment then runs rebuildProducerState (with the baseOffset of the LogSegment, the reloadFromCleanShutdown flag off, and the new ProducerStateManager).
recoverSegment requests the given LogSegment to recover (with the new ProducerStateManager and the optional LeaderEpochFileCache).
recoverSegment requests the ProducerStateManager to takeSnapshot.
recoverSegment returns the number of bytes truncated from the log (while doing segment recovery).
Note
|
recoverSegment is used when Log is requested to loadSegmentFiles, completeSwapOperations, and recoverLog.
|
loadProducerState
Internal Method
loadProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean): Unit
loadProducerState runs rebuildProducerState (with the lastOffset, reloadFromCleanShutdown and the ProducerStateManager).
In the end, loadProducerState runs updateFirstUnstableOffset.
Note
|
loadProducerState is used when Log is created and requested to truncateTo.
|
rebuildProducerState
Internal Method
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
rebuildProducerState
…FIXME
Note
|
rebuildProducerState is used when Log is requested to recoverSegment and loadProducerState.
|
updateFirstUnstableOffset
Internal Method
updateFirstUnstableOffset(): Unit
updateFirstUnstableOffset
…FIXME
Note
|
updateFirstUnstableOffset is used when Log is requested to loadProducerState, append, onHighWatermarkIncremented, maybeIncrementLogStartOffset, and truncateFullyAndStartAt.
|
completeSwapOperations
Internal Method
completeSwapOperations(
swapFiles: Set[File]): Unit
completeSwapOperations
…FIXME
Note
|
completeSwapOperations is used when Log is created (and in turn loadSegments).
|
retryOnOffsetOverflow
Internal Method
retryOnOffsetOverflow[T](fn: => T): T
retryOnOffsetOverflow executes the fn block and returns the result.
In case of LogSegmentOffsetOverflowException, retryOnOffsetOverflow prints out the following INFO message to the logs, runs splitOverflowedSegment and retries execution of the fn block.
Caught segment overflow error: [message]. Split segment and retry.
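A minimal sketch of such a retry loop. It assumes a hypothetical OverflowException in place of LogSegmentOffsetOverflowException and a caller-supplied split function standing in for splitOverflowedSegment; println is used instead of a logger.

```scala
object RetrySketch {
  // Hypothetical stand-in for LogSegmentOffsetOverflowException (illustration only)
  final class OverflowException(message: String) extends RuntimeException(message)

  // Runs fn; on overflow, calls split and tries again until fn succeeds
  def retryOnOverflow[T](split: OverflowException => Unit)(fn: => T): T = {
    while (true) {
      try {
        return fn
      } catch {
        case e: OverflowException =>
          println(s"Caught segment overflow error: ${e.getMessage}. Split segment and retry.")
          split(e)
      }
    }
    throw new IllegalStateException("unreachable")
  }
}
```

The loop has no retry limit, so it relies on every split making progress (fewer overflowing segments left) for the block to succeed eventually.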
Note
|
retryOnOffsetOverflow is used exclusively when Log is requested to loadSegments.
|
initializeLeaderEpochCache
Internal Method
initializeLeaderEpochCache(): Unit
initializeLeaderEpochCache
…FIXME
Note
|
initializeLeaderEpochCache is used when Log is created and later requested to updateConfig and renameDir.
|
Internal Properties
Name | Description |
---|---|
|
Used when:
|