$ tree /tmp/kafka-logs/t1-1/
/tmp/kafka-logs/t1-1/
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
└── leader-epoch-checkpoint
LogSegment — Record Segment Of Partition Log
Tip: Use the DumpLogSegments tool to review the content of (the underlying files of) a log segment.
LogSegment is composed of two main file types: the log file itself (with records) and the index files.
Note: When Log is created (and requested to loadSegmentFiles), orphaned index files are removed.
The files are all in the same directory (as specified when opening a segment).
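The file names in the directory listing follow from the segment's base offset. The following is a minimal sketch, assuming the names are the base offset zero-padded to 20 digits plus a per-file extension, as the listing suggests. `SegmentFileNames` and its methods are hypothetical helpers for illustration, not Kafka's API.

```scala
import java.io.File

// Hypothetical helper (not part of Kafka) that mirrors the naming seen in the listing
object SegmentFileNames {
  // Base offset zero-padded to 20 digits, e.g. 0 -> 00000000000000000000
  def prefix(baseOffset: Long): String = f"$baseOffset%020d"

  // File with records
  def logFile(dir: File, baseOffset: Long): File =
    new File(dir, prefix(baseOffset) + ".log")

  // Offset index file
  def offsetIndexFile(dir: File, baseOffset: Long): File =
    new File(dir, prefix(baseOffset) + ".index")

  // Time index file
  def timeIndexFile(dir: File, baseOffset: Long): File =
    new File(dir, prefix(baseOffset) + ".timeindex")
}
```

For example, `SegmentFileNames.logFile(new File("/tmp/kafka-logs/t1-1"), 0L)` points at the `00000000000000000000.log` file shown in the listing.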
LogSegment is created (indirectly using LogSegment.open utility) when:
- Log is requested to loadSegmentFiles (for every log file), completeSwapOperations, loadSegments, recoverLog, roll, and truncateFullyAndStartAt
- LogCleaner is requested to createNewCleanedSegment
import java.nio.file.{Files, Path}
import kafka.log.{LogConfig, LogSegment}
import org.apache.kafka.common.utils.Time

// One of log.dirs directories
val dir = Path.of("/tmp/logsegment")
Files.createDirectories(dir)

// Default log configuration
val config = LogConfig()
// Open a log segment with base offset 0
val segment = LogSegment.open(dir.toFile, baseOffset = 0, config, time = Time.SYSTEM)

scala> println(segment)
LogSegment(baseOffset=0, size=0, lastModifiedTime=1576677605692, largestTime=0)
LogSegment uses the following configuration properties (while opening a log segment):
Creating LogSegment Instance
LogSegment takes the following to be created:
- Lazy OffsetIndex with deferred loading (LazyIndex[OffsetIndex])
- Lazy TimeIndex with deferred loading (LazyIndex[TimeIndex])
LogSegment initializes the internal properties.
Opening Log Segment — open Utility
open(
dir: File,
baseOffset: Long,
config: LogConfig,
time: Time,
fileAlreadyExists: Boolean = false,
initFileSize: Int = 0,
preallocate: Boolean = false,
fileSuffix: String = ""): LogSegment
open uses the following configuration properties (of the given LogConfig):
- segment.index.bytes for the maxIndexSize
open creates a new LogSegment with the following files in the given dir directory:
- Creates a log file (with the given baseOffset and fileSuffix) and opens it (using FileRecords.open)
LogSegment and TransactionIndex
txnIndex: TransactionIndex
When created, LogSegment is given a TransactionIndex.
LogSegment uses the TransactionIndex for the following:
txnIndex is also used when Cleaner is requested to clean a log (and in turn cleanSegments).
TransactionIndex is closed when LogSegment is requested to close and closeHandlers.
TransactionIndex is deleted (if exists) when LogSegment is requested to deleteIfExists.
LogSegment and OffsetIndex
offsetIndex: OffsetIndex
When created, LogSegment is given an OffsetIndex with deferred loading (LazyIndex[OffsetIndex]).
offsetIndex simply gets (unwraps) the OffsetIndex.
offsetIndex is used for the following…FIXME
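Deferred loading of an index can be pictured with a small wrapper that builds the wrapped value only on first access. This is a sketch under assumptions: `Deferred`, `DemoOffsetIndex`, and `DeferredDemo` are hypothetical names, and Kafka's own LazyIndex is not reproduced here.

```scala
// Hypothetical stand-in for an index; not Kafka's OffsetIndex
final case class DemoOffsetIndex(fileName: String)

// Wraps a loader and runs it at most once, on the first call to get
final class Deferred[T](load: () => T) {
  lazy val get: T = load()
}

object DeferredDemo {
  // Counts how many times the loader actually ran
  var loadCount = 0

  // Creating the wrapper does not run the loader
  val lazyIndex = new Deferred[DemoOffsetIndex](() => {
    loadCount += 1
    DemoOffsetIndex("00000000000000000000.index")
  })

  // Unwraps the index, loading it on first use
  def offsetIndex: DemoOffsetIndex = lazyIndex.get
}
```

With this shape, creating a segment stays cheap, and the cost of loading an index is paid only by the first caller that actually needs it.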
collectAbortedTxns Method
collectAbortedTxns(
fetchOffset: Long,
upperBoundOffset: Long): TxnIndexSearchResult
collectAbortedTxns…FIXME
Note: collectAbortedTxns is used when Log is requested to collectAbortedTransactions.
Closing — close Method
close(): Unit
close…FIXME
Note: close is used when Log is requested to load segments, close, and splitOverflowedSegment.
closeHandlers Method
closeHandlers(): Unit
closeHandlers…FIXME
Note: closeHandlers is used when Log is requested to closeHandlers.
Recovering — recover Method
recover(
producerStateManager: ProducerStateManager,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
recover requests the OffsetIndex, the TimeIndex, and the TransactionIndex to reset.
Note: recover is used when Log is requested to recover a log segment.
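A reset index is empty, so it would need to be repopulated from the records in the log file. The sketch below illustrates one way a sparse offset index could be rebuilt by scanning batches in order. It is a simplification under stated assumptions, not the body of recover: `Batch`, `RecoverSketch`, `rebuildOffsetIndex`, and the `indexIntervalBytes` parameter are hypothetical names introduced for illustration.

```scala
// Hypothetical, simplified record batch: its last offset and its size in bytes
final case class Batch(lastOffset: Long, sizeInBytes: Int)

object RecoverSketch {
  // Scans batches in order and returns sparse (offset, bytePosition) entries.
  // An entry is added only once more than indexIntervalBytes bytes have been
  // scanned since the previous entry (assumed policy for this sketch).
  def rebuildOffsetIndex(
      batches: Seq[Batch],
      indexIntervalBytes: Int): Vector[(Long, Int)] = {
    var validBytes = 0      // bytes scanned so far
    var lastIndexEntry = 0  // byte position of the most recent entry
    val entries = Vector.newBuilder[(Long, Int)]
    for (batch <- batches) {
      if (validBytes - lastIndexEntry > indexIntervalBytes) {
        entries += ((batch.lastOffset, validBytes))
        lastIndexEntry = validBytes
      }
      validBytes += batch.sizeInBytes
    }
    entries.result()
  }
}
```

The point of the sketch is only that the index is sparse: it maps a subset of offsets to byte positions rather than indexing every batch.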
updateProducerState Internal Method
updateProducerState(
producerStateManager: ProducerStateManager,
batch: RecordBatch): Unit
updateProducerState…FIXME
Note: updateProducerState is used when LogSegment is requested to recover.
sanityCheck Method
sanityCheck(
timeIndexFileNewlyCreated: Boolean): Unit
sanityCheck…FIXME
Note: sanityCheck is used exclusively when Log is requested to loadSegments (when created).
updateDir Method
updateDir(
dir: File): Unit
updateDir…FIXME
Note: updateDir is used exclusively when Log is requested to renameDir.
changeFileSuffixes Method
changeFileSuffixes(
oldSuffix: String,
newSuffix: String): Unit
changeFileSuffixes…FIXME
Note: changeFileSuffixes is used when Log is requested to asyncDeleteSegment and replaceSegments.
flush Method
flush(): Unit
flush…FIXME
deleteIfExists Method
deleteIfExists(): Unit
deleteIfExists…FIXME
Note: deleteIfExists is used when…FIXME
deleteIfExists Utility
deleteIfExists(
dir: File,
baseOffset: Long,
fileSuffix: String = ""): Unit
deleteIfExists…FIXME
Note: deleteIfExists is used when…FIXME
resizeIndexes Method
resizeIndexes(
size: Int): Unit
resizeIndexes…FIXME
Note: resizeIndexes is used when…FIXME
largestTimestamp Method
largestTimestamp: Long
largestTimestamp…FIXME
Note: largestTimestamp is used when…FIXME
shouldRoll Method
shouldRoll(
rollParams: RollParams): Boolean
shouldRoll…FIXME
Note: shouldRoll is used exclusively when Log is requested to maybeRoll (while appending records).
timeWaitedForRoll Method
timeWaitedForRoll(
now: Long,
messageTimestamp: Long): Long
timeWaitedForRoll…FIXME
Note: timeWaitedForRoll is used exclusively when LogSegment is requested to shouldRoll.
append Method
append(
largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit
append…FIXME
appendFromFile Method
appendFromFile(
records: FileRecords,
start: Int): Int
appendFromFile…FIXME
Note: appendFromFile is used exclusively when Log is requested to splitOverflowedSegment.
appendChunkFromFile Internal Method
appendChunkFromFile(
records: FileRecords,
position: Int,
bufferSupplier: BufferSupplier): Int
appendChunkFromFile…FIXME
Note: appendChunkFromFile is used when LogSegment is requested to appendFromFile.
Truncating To Offset — truncateTo Method
truncateTo(
offset: Long): Int
truncateTo…FIXME
Note: truncateTo is used when Log is created (and in turn requested to recoverLog) and when Log is requested to truncateTo.
updateTxnIndex Method
updateTxnIndex(
completedTxn: CompletedTxn,
lastStableOffset: Long): Unit
updateTxnIndex…FIXME
size Method
size: Int
size requests the FileRecords for the size (in bytes).
Internal Properties
| Name | Description |
|---|---|
| | Time(stamp) when this… Used exclusively when… |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |
| | Used when…FIXME |