Partition Log — Collection Of Log Segments
Log represents a log of a partition of a topic.
Log is a collection of LogSegments that are stored on disk in a given partition log directory with the name of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).
While being created, Log creates the log directory unless it already exists.
Log is created when LogManager is requested to:

- Load a partition log (directory) (while LogManager is being created, which is when KafkaServer is requested to start up)
- Look up or create a partition log (when Partition is requested to getOrCreateReplica)
Log uses .kafka_cleanshutdown file to indicate…FIXME
Log is isFuture when…FIXME
Log uses the following file suffixes to differentiate between the files of log segments and partition log directories:

| Suffix | Description |
|---|---|
| .log | FileRecords log file |
| .index | Offset index file |
| .timeindex | Time index file |
| .txnindex | Transaction index file |
| .snapshot | Producer state snapshot file |
| .deleted | File of a log segment scheduled for deletion |
| .cleaned | File of a log segment created during log cleaning (compaction) |
| .swap | File of a log segment being swapped into the log |
| -delete | Suffix of a partition log directory scheduled for deletion |
| -future | Suffix of a partition log directory of a future partition log |
The files of offset indices, time indices, and transaction indices are collectively called index files.
Log uses a Scheduler to schedule the background tasks.
| Name | Period | Delay | Description |
|---|---|---|---|
| PeriodicProducerExpirationCheck | producerIdExpirationCheckIntervalMs | producerIdExpirationCheckIntervalMs | Requests the ProducerStateManager to removeExpiredProducers. Scheduled immediately when Log is created. |
| flush-log | (once) | 0 | Flushes the log up to the new segment's base offset. Scheduled when Log is requested to roll. |
| delete-file | (once) | file.delete.delay.ms | deleteSeg. Scheduled when Log is requested to asyncDeleteSegment. |
Log uses [Log partition=[topicPartition], dir=[parent]] as the logging prefix (aka logIdent).
Tip: Enable ALL logging level for kafka.log.Log logger to see what happens inside.

Add the following line to config/log4j.properties:

log4j.logger.kafka.log.Log=ALL

Refer to Logging.
LogSegments — segments Internal Registry
segments: ConcurrentNavigableMap[java.lang.Long, LogSegment]
segments is a Java ConcurrentSkipListMap of LogSegments by their baseOffset.
A new LogSegment is added when Log is requested to addSegment.
- Cleared in loadSegments just before loadSegmentFiles
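As a runnable illustration (not Kafka code; Segment is a hypothetical stand-in for LogSegment), a skip-list map keyed by base offset makes it cheap to find both the segment that holds a given offset and the active (last) segment:

import java.util.concurrent.ConcurrentSkipListMap

// Hypothetical stand-in for kafka.log.LogSegment (base offset only).
final case class Segment(baseOffset: Long)

val segments = new ConcurrentSkipListMap[java.lang.Long, Segment]
Seq(0L, 35L, 70L).foreach(o => segments.put(o, Segment(o)))

// The segment that holds offset 42 is the one with the greatest
// base offset <= 42 (floorEntry), i.e. the segment starting at 35.
assert(segments.floorEntry(42L).getValue == Segment(35L))

// The active segment (currently taking appends) is the last entry.
assert(segments.lastEntry.getValue == Segment(70L))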
Performance Metrics
Log is a KafkaMetricsGroup with the following performance metrics.
| Metric Name | Description |
|---|---|
| LogEndOffset | The log end offset of the log |
| LogStartOffset | The log start offset of the log |
| NumLogSegments | The number of log segments (in the segments internal registry) |
| Size | The size of the log (in bytes) |
The performance metrics are registered in kafka.log:type=Log group.
Creating Log Instance
Log takes the following to be created:

- Log directory (dir)
- LogConfig
- logStartOffset
- recoveryPoint
- Scheduler
- BrokerTopicStats
- Time
- maxProducerIdExpirationMs
- producerIdExpirationCheckIntervalMs
- TopicPartition
- ProducerStateManager
- LogDirFailureChannel
Log initializes the internal properties.
While being created, Log creates the log directory unless it already exists.
Log loads segments.
Log updateLogEndOffset.
Log requests the LeaderEpochFileCache to remove epoch entries from the store with start offsets greater than or equal to the nextOffset (LeaderEpochFileCache.truncateFromEnd).
Log computes the new start offset to be the maximum of the logStartOffset and the baseOffset of the first LogSegment in the segments internal registry.
Log requests the LeaderEpochFileCache to remove epoch entries from the store with offsets less than or equal to the new start offset (LeaderEpochFileCache.truncateFromStart).
Log throws an IllegalStateException when the ProducerStateManager is not empty:
Producer state must be empty during log initialization
Log loadProducerState (with the logEndOffset and the reloadFromCleanShutdown based on hasCleanShutdownFile).
In the end, Log prints out the following INFO message to the logs:
Completed load of log with [size] segments, log start offset [logStartOffset] and log end offset [logEndOffset] in [time] ms
Creating Log Instance — apply Utility
apply(
dir: File,
config: LogConfig,
logStartOffset: Long,
recoveryPoint: Long,
scheduler: Scheduler,
brokerTopicStats: BrokerTopicStats,
time: Time = Time.SYSTEM,
maxProducerIdExpirationMs: Int,
producerIdExpirationCheckIntervalMs: Int,
logDirFailureChannel: LogDirFailureChannel): Log
apply parseTopicPartitionName from the log directory.
apply creates a new ProducerStateManager.
In the end, apply creates a Log.
Note: apply is used when LogManager is requested to loadLog and look up or create a new partition log.
roll Method
roll(
expectedNextOffset: Option[Long] = None): LogSegment
roll…FIXME
Note: roll is used when Log is requested to deleteSegments and maybeRoll.
closeHandlers Method
closeHandlers(): Unit
closeHandlers…FIXME
Note: closeHandlers is used when…FIXME
updateHighWatermark Method
updateHighWatermark(
hw: Long): Long
updateHighWatermark…FIXME
Note: updateHighWatermark is used when…FIXME
addAbortedTransactions Internal Method
addAbortedTransactions(
startOffset: Long,
segmentEntry: JEntry[JLong, LogSegment],
fetchInfo: FetchDataInfo): FetchDataInfo
addAbortedTransactions…FIXME
Note: addAbortedTransactions is used when Log is requested to read records.
collectAbortedTransactions Internal Method
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long): List[AbortedTxn]
collectAbortedTransactions(
startOffset: Long,
upperBoundOffset: Long,
startingSegmentEntry: JEntry[JLong, LogSegment],
accumulator: List[AbortedTxn] => Unit): Unit
collectAbortedTransactions…FIXME
Note: collectAbortedTransactions is used when Cleaner is requested to cleanSegments and buildOffsetMap.
maybeRoll Internal Method
maybeRoll(
messagesSize: Int,
appendInfo: LogAppendInfo): LogSegment
maybeRoll…FIXME
Note: maybeRoll is used exclusively when Log is requested to append records.
asyncDeleteSegment Internal Method
asyncDeleteSegment(segment: LogSegment): Unit
asyncDeleteSegment…FIXME
Note: asyncDeleteSegment is used when Log is requested to deleteSegment and replaceSegments.
Flushing Log Segments Out To Disk — flush Method
flush(): Unit (1)
flush(offset: Long): Unit
1. Uses logEndOffset for the offset (and so flushes all log segments)
flush prints out the following DEBUG message to the logs:
Flushing log up to offset [offset], last flushed: [lastFlushTime], current time: [time], unflushed: [unflushedMessages]
flush…FIXME
deleteSeg Internal Method
deleteSeg(): Unit
deleteSeg…FIXME
Note: deleteSeg is used exclusively for the delete-file Background Task.
appendAsLeader Method
appendAsLeader(
records: MemoryRecords,
leaderEpoch: Int,
isFromClient: Boolean = true): LogAppendInfo
appendAsLeader simply appends the records with the assignOffsets flag on.
Note: appendAsLeader is used exclusively when Partition is requested to appendRecordsToLeader.
appendAsFollower Method
appendAsFollower(records: MemoryRecords): LogAppendInfo
appendAsFollower simply appends the records (with the isFromClient and assignOffsets flags off, and the leaderEpoch being -1).
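Both methods are thin wrappers over append. A runnable sketch of the delegation, with hypothetical stand-ins shaped after the append signature below:

// Hypothetical stand-ins shaped after the real signatures.
final case class LogAppendInfo(lastOffset: Long)
def append(isFromClient: Boolean, assignOffsets: Boolean,
    leaderEpoch: Int): LogAppendInfo = LogAppendInfo(-1L)

// Leader path: the broker assigns offsets to the incoming records.
def appendAsLeader(leaderEpoch: Int, isFromClient: Boolean = true): LogAppendInfo =
  append(isFromClient = isFromClient, assignOffsets = true, leaderEpoch = leaderEpoch)

// Follower path: offsets come from the leader; the epoch is undefined (-1).
def appendAsFollower(): LogAppendInfo =
  append(isFromClient = false, assignOffsets = false, leaderEpoch = -1)

assert(appendAsFollower() == LogAppendInfo(-1L))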
Note: appendAsFollower is used exclusively when Partition is requested to doAppendRecordsToFollowerOrFutureReplica.
Appending Records — append Internal Method
append(
records: MemoryRecords,
isFromClient: Boolean,
interBrokerProtocolVersion: ApiVersion,
assignOffsets: Boolean,
leaderEpoch: Int): LogAppendInfo
append…FIXME
Note: append is used when Log is requested to appendAsLeader (with assignOffsets enabled) and appendAsFollower (with assignOffsets and isFromClient disabled).
analyzeAndValidateRecords Internal Method
analyzeAndValidateRecords(
records: MemoryRecords,
isFromClient: Boolean): LogAppendInfo
analyzeAndValidateRecords…FIXME
Note: analyzeAndValidateRecords is used exclusively when Log is requested to append.
deleteSegment Internal Method
deleteSegment(segment: LogSegment): Unit
deleteSegment…FIXME
Note: deleteSegment is used when Log is requested to recoverLog, deleteSegments, roll, truncateTo, and truncateFullyAndStartAt.
replaceSegments Internal Method
replaceSegments(
newSegments: Seq[LogSegment],
oldSegments: Seq[LogSegment],
isRecoveredSwapFile: Boolean = false): Unit
replaceSegments…FIXME
Checking Whether .kafka_cleanshutdown Is In Parent Directory of Log Directory — hasCleanShutdownFile Internal Method
hasCleanShutdownFile: Boolean
hasCleanShutdownFile is true when the .kafka_cleanshutdown file is in the parent directory of the log directory, and false otherwise.
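The check is a plain file-existence test against the parent directory. A runnable sketch (the directory path is illustrative):

import java.io.File

// Illustrative partition log directory.
val dir = new File("/tmp/kafka-logs/topic-0")

// hasCleanShutdownFile-style check: the marker file lives in the
// parent directory of the partition log directory.
def hasCleanShutdownFile(dir: File): Boolean =
  new File(dir.getParentFile, ".kafka_cleanshutdown").exists

println(hasCleanShutdownFile(dir)) // false unless a clean shutdown left the marker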
Note: hasCleanShutdownFile is used exclusively when Log is created (to loadProducerState) and requested to recoverLog.
maybeIncrementLogStartOffset Method
maybeIncrementLogStartOffset(
newLogStartOffset: Long): Unit
maybeIncrementLogStartOffset…FIXME
Note: maybeIncrementLogStartOffset is used when…FIXME
truncateTo Internal Method
truncateTo(targetOffset: Long): Boolean
truncateTo…FIXME
Note: truncateTo is used when LogManager is requested to truncateTo.
truncateFullyAndStartAt Internal Method
truncateFullyAndStartAt(newOffset: Long): Unit
truncateFullyAndStartAt…FIXME
Scheduling Deletion Of Old Segments (Log Retention) — deleteOldSegments Method
deleteOldSegments(): Long
deleteOldSegments uses the delete flag (of the given LogConfig) to determine the scope of log deletion and returns the number of segments deleted.
Note: The delete flag is enabled (true) when the delete cleanup policy is part of the cleanup.policy configuration property.
With the delete flag enabled (true), deleteOldSegments deleteRetentionMsBreachedSegments, deleteRetentionSizeBreachedSegments and deleteLogStartOffsetBreachedSegments.
With the delete flag disabled (false), deleteOldSegments merely deleteLogStartOffsetBreachedSegments.
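A runnable model of that dispatch (the three strategy functions are hypothetical stand-ins for the Log methods described in the following sections):

// Hypothetical stand-ins for the three deletion strategies.
def deleteRetentionMsBreachedSegments(): Int = 0
def deleteRetentionSizeBreachedSegments(): Int = 0
def deleteLogStartOffsetBreachedSegments(): Int = 0

// With the delete cleanup policy enabled all three strategies run;
// otherwise only the log-start-offset check applies.
def deleteOldSegments(deleteEnabled: Boolean): Int =
  if (deleteEnabled)
    deleteRetentionMsBreachedSegments() +
      deleteRetentionSizeBreachedSegments() +
      deleteLogStartOffsetBreachedSegments()
  else
    deleteLogStartOffsetBreachedSegments()

assert(deleteOldSegments(deleteEnabled = true) == 0)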
Scheduling Deletion Of Old Log Segments (Per Predicate) — deleteOldSegments Internal Method
deleteOldSegments(
predicate: (LogSegment, Option[LogSegment]) => Boolean,
reason: String): Int
deleteOldSegments finds deletable segments for the given predicate and schedules their deletion.
If any are found, deleteOldSegments prints out the following INFO message to the logs:
Found deletable segments with base offsets [[baseOffsets]] due to [reason]
Note: deleteOldSegments is used when Log is requested to deleteRetentionMsBreachedSegments, deleteRetentionSizeBreachedSegments, and deleteLogStartOffsetBreachedSegments.
Finding Deletable Segments (Per Predicate) — deletableSegments Internal Method
deletableSegments(
predicate: (LogSegment, Option[LogSegment]) => Boolean): Iterable[LogSegment]
deletableSegments…FIXME
Note: deletableSegments is used exclusively when Log is requested to schedule deletion of old segments (per predicate).
deleteSegments Internal Method
deleteSegments(
deletable: Iterable[LogSegment]): Int
deleteSegments first rolls the active segment over when all the segments in the log are deletable (so that the log always keeps at least one segment).
For every deletable log segment, deleteSegments simply schedules it for deletion and maybeIncrementLogStartOffset (based on…FIXME).
Note: deleteSegments is used exclusively when Log is requested to schedule deletion of old segments (per predicate).
deleteRetentionMsBreachedSegments Internal Method
deleteRetentionMsBreachedSegments(): Int
deleteRetentionMsBreachedSegments uses the retentionMs threshold (of the given LogConfig) to determine the scope of log retention.
deleteRetentionMsBreachedSegments schedules deletion of segments with their largestTimestamp below the retentionMs threshold.
deleteRetentionMsBreachedSegments uses the following reason:
retention time [retentionMs]ms breach
deleteRetentionMsBreachedSegments simply returns 0 for a negative retention.ms threshold.
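A runnable model of the time-based predicate (Segment is a hypothetical stand-in):

// Hypothetical stand-in: a segment's newest (largest) timestamp.
final case class Segment(largestTimestamp: Long)

val retentionMs = 7L * 24 * 60 * 60 * 1000 // e.g. retention.ms of 7 days
val now = System.currentTimeMillis()

// A segment breaches time-based retention when its newest record
// is older than the retention window.
def breached(segment: Segment): Boolean =
  now - segment.largestTimestamp > retentionMs

assert(breached(Segment(now - retentionMs - 1)))
assert(!breached(Segment(now)))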
Note: deleteRetentionMsBreachedSegments is used exclusively when Log is requested to schedule deletion of old segments (log retention).
deleteRetentionSizeBreachedSegments Internal Method
deleteRetentionSizeBreachedSegments(): Int
deleteRetentionSizeBreachedSegments uses the retentionSize threshold (of the given LogConfig) to determine the scope of log retention.
Note: The retentionSize threshold is the value of the retention.bytes configuration for a topic or the log.retention.bytes configuration for the cluster.
With the retentionSize threshold negative (the default is -1L) or the size of the log below it, deleteRetentionSizeBreachedSegments simply exits with the return value of 0.
deleteRetentionSizeBreachedSegments schedules deletion of the oldest log segments for as long as the log stays at or above the retentionSize threshold after their removal (whole segments only, so the log may end up slightly above the threshold).
deleteRetentionSizeBreachedSegments uses the following reason:
retention size in bytes [retentionSize] breach
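A runnable model of the size-based selection (Segment is a hypothetical stand-in): a running diff tracks how many bytes the log is over the threshold, and a segment is deletable only while removing it keeps the log at or above the threshold:

// Hypothetical stand-in: a segment's size in bytes.
final case class Segment(size: Long)

val retentionSize = 100L
val segs = Seq(Segment(60), Segment(60), Segment(60)) // log size = 180

var diff = segs.map(_.size).sum - retentionSize // bytes over the threshold
val deletable = segs.takeWhile { s =>
  if (diff - s.size >= 0) { diff -= s.size; true } else false
}

// Removing the first segment leaves 120 (>= 100); removing the second
// would drop the log below the threshold, so it survives.
assert(deletable == Seq(Segment(60)))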
Note: deleteRetentionSizeBreachedSegments is used exclusively when Log is requested to schedule deletion of old segments (log retention) (for delete cleanup policy).
deleteLogStartOffsetBreachedSegments Internal Method
deleteLogStartOffsetBreachedSegments(): Int
deleteLogStartOffsetBreachedSegments…FIXME
Note: deleteLogStartOffsetBreachedSegments is used when…FIXME
splitOverflowedSegment Internal Method
splitOverflowedSegment(
segment: LogSegment): List[LogSegment]
splitOverflowedSegment…FIXME
onHighWatermarkIncremented Method
onHighWatermarkIncremented(highWatermark: Long): Unit
onHighWatermarkIncremented…FIXME
Note: onHighWatermarkIncremented is used when Replica is created and when its highWatermark_= setter is executed.
parseTopicPartitionName Object Method
parseTopicPartitionName(dir: File): TopicPartition
parseTopicPartitionName parses the name of the given directory and creates a TopicPartition.
parseTopicPartitionName assumes that the name is of the form topic-partition or topic-partition.uniqueId-delete (if marked for deletion).
parseTopicPartitionName uses all characters up to the last - for the topic name and the rest as the partition ID.
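A runnable sketch of that parsing rule (simplified: no validation and no -delete or -future handling):

import org.apache.kafka.common.TopicPartition

// Simplified parsing rule: topic is everything before the last '-',
// partition is what follows.
def parse(dirName: String): TopicPartition = {
  val i = dirName.lastIndexOf('-')
  new TopicPartition(dirName.take(i), dirName.drop(i + 1).toInt)
}

assert(parse("web-events-42") == new TopicPartition("web-events", 42))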
offsetFromFileName Object Method
offsetFromFileName(filename: String): Long
offsetFromFileName…FIXME
Note: offsetFromFileName is used when Log is requested to removeTempFilesAndCollectSwapFiles (right when created) and offsetFromFile.
offsetFromFile Object Method
offsetFromFile(file: File): Long
offsetFromFile…FIXME
Note: offsetFromFile is used when…FIXME
Reading Records — read Method
read(
startOffset: Long,
maxLength: Int,
maxOffset: Option[Long],
minOneMessage: Boolean,
includeAbortedTxns: Boolean): FetchDataInfo
read…FIXME
convertToOffsetMetadata Method
convertToOffsetMetadata(
offset: Long): Option[LogOffsetMetadata]
convertToOffsetMetadata…FIXME
Note: convertToOffsetMetadata is used exclusively when Replica is requested to convertHWToLocalOffsetMetadata.
logEndOffset Method
logEndOffset: Long
logEndOffset is the offset of the next message that will be appended to the log (based on the nextOffsetMetadata internal registry).
Note: logEndOffset is used when…FIXME
addSegment Method
addSegment(
segment: LogSegment): LogSegment
addSegment simply associates the given LogSegment with the baseOffset in the segments internal registry.
Note: addSegment is used when Log is requested to replaceSegments, loadSegmentFiles, loadSegments, recoverLog, roll, and truncateFullyAndStartAt.
updateLogEndOffset Internal Method
updateLogEndOffset(messageOffset: Long): Unit
updateLogEndOffset simply creates a new LogOffsetMetadata (with the messageOffset and the active segment) and makes it the new value of the nextOffsetMetadata internal registry.
Note: updateLogEndOffset is used when Log is requested to append records, roll log segment, truncateTo, and truncateFullyAndStartAt.
activeSegment Method
activeSegment: LogSegment
activeSegment gives the active LogSegment that is currently taking appends (the LogSegment registered under the greatest key in the segments internal registry).
Note: activeSegment is used exclusively when Log is created (to create a LogOffsetMetadata).
Updating Config — updateConfig Method
updateConfig(
updatedKeys: Set[String],
newConfig: LogConfig): Unit
If message.format.version is among the updated keys, updateConfig…FIXME
renameDir Method
renameDir(
name: String): Unit
renameDir…FIXME
Note: renameDir is used when LogManager is requested to replaceCurrentWithFutureLog and asyncDelete.
Creating Log File — logFile Utility
logFile(
dir: File,
offset: Long,
suffix: String = ""): File
logFile creates a File for the log file of a log segment in the given dir directory. The file name is the filenamePrefixFromOffset of the given offset followed by the .log suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.logFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.log.suffix")
offsetIndexFile Method
offsetIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
offsetIndexFile creates a File for the offset index file of a log segment in the given dir directory. The file name is the filenamePrefixFromOffset of the given offset followed by the .index suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.offsetIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.index.suffix")
timeIndexFile Method
timeIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
timeIndexFile creates a File for the time index file of a log segment in the given dir directory. The file name is the filenamePrefixFromOffset of the given offset followed by the .timeindex suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.timeIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.timeindex.suffix")
transactionIndexFile Method
transactionIndexFile(
dir: File,
offset: Long,
suffix: String = ""): File
transactionIndexFile creates a File for the transaction index file of a log segment in the given dir directory. The file name is the filenamePrefixFromOffset of the given offset followed by the .txnindex suffix and the optional suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.transactionIndexFile(dir, offset = 10, suffix = ".suffix")
assert(log_file.getName == "00000000000000000010.txnindex.suffix")
producerSnapshotFile Method
producerSnapshotFile(
dir: File,
offset: Long): File
producerSnapshotFile creates a File for the producer state snapshot file of a log segment in the given dir directory. The file name is the filenamePrefixFromOffset of the given offset followed by the .snapshot suffix.
import java.nio.file.{Files, Path}
import java.io.File
val tmp = "/tmp/kafka-internals"
val p = Path.of(tmp)
val dir = if (Files.exists(p)) {
new File(tmp)
} else {
Files.createDirectory(p).toFile()
}
import kafka.log.Log
val log_file = Log.producerSnapshotFile(dir, offset = 10)
assert(log_file.getName == "00000000000000000010.snapshot")
Note: producerSnapshotFile is used exclusively when ProducerStateManager is requested to takeSnapshot.
Creating File Name Of Log Segment (From Offset) — filenamePrefixFromOffset Utility
filenamePrefixFromOffset(
offset: Long): String
filenamePrefixFromOffset uses java.text.NumberFormat to format the given offset:
- Minimum number of digits: 20
- Maximum number of digits in the fraction portion of a number: 0
- No grouping used
import kafka.log.Log
val filenamePrefix = Log.filenamePrefixFromOffset(offset = 10)
assert(filenamePrefix == "00000000000000000010")
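The same formatting can be reproduced with java.text.NumberFormat directly (or, equivalently, with a zero-padded format string):

import java.text.NumberFormat

val nf = NumberFormat.getInstance
nf.setMinimumIntegerDigits(20) // zero-pad to 20 digits
nf.setMaximumFractionDigits(0) // no fraction part
nf.setGroupingUsed(false)      // no thousands separators

assert(nf.format(10L) == "00000000000000000010")
assert("%020d".format(10L) == "00000000000000000010") // equivalent formatting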
Note: filenamePrefixFromOffset is used when Log utility is used to create file names for logFile, offsetIndexFile, timeIndexFile, producerSnapshotFile, and transactionIndexFile.
maybeAssignEpochStartOffset Method
maybeAssignEpochStartOffset(
leaderEpoch: Int,
startOffset: Long): Unit
maybeAssignEpochStartOffset…FIXME
Note: maybeAssignEpochStartOffset is used when…FIXME
sizeInBytes Utility
sizeInBytes(
segments: Iterable[LogSegment]): Long
sizeInBytes sums up the sizes of the given log segments.
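In effect (Segment is a hypothetical stand-in; note the widening to Long before summing to avoid Int overflow):

// Hypothetical stand-in: a segment with an Int size (like FileRecords).
final case class Segment(size: Int)

// Widen every size to Long before summing to avoid Int overflow.
def sizeInBytes(segments: Iterable[Segment]): Long =
  segments.map(_.size.toLong).sum

assert(sizeInBytes(Seq(Segment(3), Segment(4))) == 7L)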
Note: sizeInBytes is used when Log is requested for the size.
Loading Segments — loadSegments Internal Method
loadSegments(): Long
loadSegments removeTempFilesAndCollectSwapFiles.
loadSegments loadSegmentFiles (with retries when there are log segments with offset overflow).
loadSegments completeSwapOperations.
loadSegments branches off per whether the log directory is scheduled to be deleted or not.
Note: loadSegments is used when Log is created (to create a LogOffsetMetadata).
loadSegments Internal Method and Log Directory Not Scheduled For Deletion
For the log directory that is not scheduled to be deleted, loadSegments recoverLog.
loadSegments requests the active segment to resizeIndexes (to the value of segment.index.bytes configuration property).
In the end, loadSegments returns the next offset after recovery.
loadSegments Internal Method and Log Directory Scheduled For Deletion
For the log directory that is scheduled to be deleted, loadSegments adds a new log segment (with base offset 0 and initFileSize).
In the end, loadSegments returns 0.
recoverLog Internal Method
recoverLog(): Long
recoverLog…FIXME
Note: recoverLog is used when Log is requested to loadSegments.
removeTempFilesAndCollectSwapFiles Internal Method
removeTempFilesAndCollectSwapFiles(): Set[File]
removeTempFilesAndCollectSwapFiles…FIXME
Note: removeTempFilesAndCollectSwapFiles is used exclusively when Log is requested to loadSegments (right when created).
Loading Segment Files — loadSegmentFiles Internal Method
loadSegmentFiles(): Unit
loadSegmentFiles processes index and log files in the log directory (by name in ascending order).
Internally, loadSegmentFiles finds all the files (sorted by name) in the log directory and branches off per whether a file is an index or a log file.
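A runnable sketch of the listing-and-branching skeleton (the suffix predicates are simplified and segment recovery is elided):

import java.io.File

def isLogFile(f: File): Boolean = f.getName.endsWith(".log")
def isIndexFile(f: File): Boolean =
  Seq(".index", ".timeindex", ".txnindex").exists(f.getName.endsWith)

// Walk the files sorted by name (the fixed-width 20-digit prefix makes
// lexicographic order match base-offset order) and branch per file kind.
val dir = new File("/tmp/kafka-logs/topic-0")
for (file <- Option(dir.listFiles).getOrElse(Array.empty).sortBy(_.getName) if file.isFile) {
  if (isIndexFile(file)) {
    // ensure a corresponding .log file exists, else delete the orphan
  } else if (isLogFile(file)) {
    // open the LogSegment, sanity-check its indexes, addSegment
  }
}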
Note: loadSegmentFiles is used when Log is created (and in turn loadSegments).
Loading Log Files
For a log file, loadSegmentFiles opens the corresponding LogSegment and requests it to sanityCheck its index files.
In case of NoSuchFileException, loadSegmentFiles prints out the following ERROR to the logs and recovers the segment.
Could not find offset index file corresponding to log file [path], recovering segment and rebuilding index files...
In case of CorruptIndexException, loadSegmentFiles prints out the following ERROR to the logs and recovers the segment.
Found a corrupted index file corresponding to log file [path] due to [message], recovering segment and rebuilding index files...
In the end, loadSegmentFiles addSegment.
Loading Index Files
For index files, loadSegmentFiles simply makes sure that they have corresponding .log files (in the same log directory).
If an orphaned index file is found, loadSegmentFiles simply prints out the following WARN message and deletes the file:
Found an orphaned index file [path], with no corresponding log file.
isIndexFile Internal Object Method
isIndexFile(file: File): Boolean
isIndexFile is positive (true) when the given file has one of the following file suffixes:

- .index (offset index)
- .timeindex (time index)
- .txnindex (transaction index)

Otherwise, isIndexFile is false.
Note: isIndexFile is used when Log is requested to removeTempFilesAndCollectSwapFiles and loadSegmentFiles.
isLogFile Internal Object Method
isLogFile(file: File): Boolean
isLogFile returns true when the given file has the .log file suffix. Otherwise, isLogFile is false.
Note: isLogFile is used when Log is requested to removeTempFilesAndCollectSwapFiles, loadSegmentFiles, and splitOverflowedSegment.
Recovering Log Segment — recoverSegment Internal Method
recoverSegment(
segment: LogSegment,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
recoverSegment creates a new ProducerStateManager (for the TopicPartition, log directory and maxProducerIdExpirationMs).
Note: Why does recoverSegment create a new ProducerStateManager rather than using the ProducerStateManager?
recoverSegment then rebuildProducerState (with the baseOffset of the LogSegment, the reloadFromCleanShutdown flag off, and the new ProducerStateManager).
recoverSegment requests the given LogSegment to recover (with the new ProducerStateManager and the optional LeaderEpochFileCache).
recoverSegment requests the ProducerStateManager to takeSnapshot.
recoverSegment returns the number of bytes truncated from the log (while doing segment recovery).
Note: recoverSegment is used when Log is requested to loadSegmentFiles, completeSwapOperations, and recoverLog.
loadProducerState Internal Method
loadProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean): Unit
loadProducerState rebuildProducerState (with the lastOffset, reloadFromCleanShutdown and the ProducerStateManager).
In the end, loadProducerState updateFirstUnstableOffset.
Note: loadProducerState is used when Log is created and requested to truncateTo.
rebuildProducerState Internal Method
rebuildProducerState(
lastOffset: Long,
reloadFromCleanShutdown: Boolean,
producerStateManager: ProducerStateManager): Unit
rebuildProducerState…FIXME
Note: rebuildProducerState is used when Log is requested to recoverSegment and loadProducerState.
updateFirstUnstableOffset Internal Method
updateFirstUnstableOffset(): Unit
updateFirstUnstableOffset…FIXME
Note: updateFirstUnstableOffset is used when Log is requested to loadProducerState, append, onHighWatermarkIncremented, maybeIncrementLogStartOffset, and truncateFullyAndStartAt.
completeSwapOperations Internal Method
completeSwapOperations(
swapFiles: Set[File]): Unit
completeSwapOperations…FIXME
Note: completeSwapOperations is used when Log is created (and in turn loadSegments).
retryOnOffsetOverflow Internal Method
retryOnOffsetOverflow[T](fn: => T): T
retryOnOffsetOverflow executes the fn block and returns the result.
In case of LogSegmentOffsetOverflowException, retryOnOffsetOverflow prints out the following INFO message to the logs, splitOverflowedSegment and retries execution of the fn block.
Caught segment overflow error: [message]. Split segment and retry.
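A runnable model of the retry loop (OffsetOverflow is a hypothetical stand-in for LogSegmentOffsetOverflowException; the INFO message and the split step are elided):

// Hypothetical stand-in for LogSegmentOffsetOverflowException.
final class OffsetOverflow extends Exception("offset overflow")

// Run fn, and on overflow log/split (elided) and try again.
def retryOnOffsetOverflow[T](fn: => T): T =
  try fn
  catch {
    case _: OffsetOverflow =>
      // the real code prints the INFO message and splitOverflowedSegment here
      retryOnOffsetOverflow(fn)
  }

var attempts = 0
val result = retryOnOffsetOverflow {
  attempts += 1
  if (attempts < 2) throw new OffsetOverflow else "ok"
}
assert(result == "ok" && attempts == 2)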
Note: retryOnOffsetOverflow is used exclusively when Log is requested to loadSegments.
initializeLeaderEpochCache Internal Method
initializeLeaderEpochCache(): Unit
initializeLeaderEpochCache…FIXME
Note: initializeLeaderEpochCache is used when Log is created and later requested to updateConfig and renameDir.
Internal Properties
| Name | Description |
|---|---|
| nextOffsetMetadata | LogOffsetMetadata of the next message to be appended to the log. Used when: Log is requested for the logEndOffset and to updateLogEndOffset. |