$ tree /tmp/kafka-logs/t1-1/
/tmp/kafka-logs/t1-1/
├── 00000000000000000000.index
├── 00000000000000000000.log
├── 00000000000000000000.timeindex
└── leader-epoch-checkpoint
LogSegment — Record Segment Of Partition Log
Tip
Use the DumpLogSegments tool to review the content of (the underlying files of) a log segment.
LogSegment is composed of two main file types: the log file itself (with records) and index files.
Note
When Log is created (and requested to loadSegmentFiles), orphaned index files are removed.
The files are all in the same directory (as specified when opening a segment).
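As a rough illustration of how these files relate, the sketch below derives the file names of one segment from its base offset, zero-padded to 20 digits as in the directory listing above. The SegmentFiles object and its method names are hypothetical helpers made up for this example; they are not part of Kafka's API.

```scala
import java.io.File

// Hypothetical helper (not Kafka's API): derives segment file names from a base offset.
object SegmentFiles {
  // Zero-pads the base offset to 20 digits, e.g. 0 becomes 00000000000000000000.
  def filenamePrefix(baseOffset: Long): String = f"$baseOffset%020d"

  // The log file (with records) followed by the offset and time index files.
  def segmentFiles(dir: File, baseOffset: Long): Seq[File] =
    Seq(".log", ".index", ".timeindex")
      .map(suffix => new File(dir, filenamePrefix(baseOffset) + suffix))
}
```

For base offset 0 and the directory /tmp/kafka-logs/t1-1, segmentFiles yields the three 00000000000000000000.* files shown in the listing.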
LogSegment is created (indirectly using the LogSegment.open utility) when:
- Log is requested to loadSegmentFiles (for every log file), completeSwapOperations, loadSegments, recoverLog, roll, and truncateFullyAndStartAt
- LogCleaner is requested to createNewCleanedSegment
// One of log.dirs directories
import java.nio.file.Path
val dir = Path.of("/tmp/logsegment")
import java.nio.file.Files
Files.createDirectories(dir)
import kafka.log.{LogConfig, LogSegment}
import org.apache.kafka.common.utils.Time
val config = LogConfig()
val segment = LogSegment.open(dir.toFile, baseOffset = 0, config, time = Time.SYSTEM)
scala> println(segment)
LogSegment(baseOffset=0, size=0, lastModifiedTime=1576677605692, largestTime=0)
LogSegment uses the following configuration properties (while opening a log segment):
Creating LogSegment Instance
LogSegment takes the following to be created:
- Lazy OffsetIndex with deferred loading (LazyIndex[OffsetIndex])
- Lazy TimeIndex with deferred loading (LazyIndex[TimeIndex])
LogSegment initializes the internal properties.
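Deferred loading, as used for the two lazy indexes, can be sketched with a plain Scala lazy val. The Deferred class below is a hypothetical stand-in written for this example; it is not Kafka's LazyIndex and only shows the general idea that nothing is loaded until the wrapped value is first requested.

```scala
// Hypothetical stand-in for a lazily loaded index wrapper; this is not Kafka's LazyIndex.
final class Deferred[T](load: () => T) {
  private var loads = 0

  // The loader runs on first access only; later calls reuse the cached value.
  lazy val get: T = { loads += 1; load() }

  // Number of times the loader has run (0 until first access, then 1).
  def loadCount: Int = loads
}

object DeferredDemo {
  def main(args: Array[String]): Unit = {
    // A Vector of (offset, position) pairs stands in for a real index.
    val index = new Deferred(() => Vector(0L -> 0, 100L -> 4096))
    println(index.loadCount)  // nothing loaded yet
    println(index.get.size)   // first access triggers loading
    println(index.loadCount)  // the loader ran exactly once
  }
}
```

Under this model, creating a segment stays cheap even when the index files are large, because the cost of loading is paid only by the first caller that actually needs the index.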
Opening Log Segment — open
Utility
open(
dir: File,
baseOffset: Long,
config: LogConfig,
time: Time,
fileAlreadyExists: Boolean = false,
initFileSize: Int = 0,
preallocate: Boolean = false,
fileSuffix: String = ""): LogSegment
open uses the following configuration properties (of the given LogConfig):
- segment.index.bytes for the maxIndexSize
open creates a new LogSegment with the following files in the given dir directory:
- Creates a log file (with the given baseOffset and fileSuffix) and opens it (using FileRecords.open)
LogSegment and TransactionIndex
txnIndex: TransactionIndex
When created, LogSegment is given a TransactionIndex.
LogSegment uses the TransactionIndex for the following:
txnIndex is also used when Cleaner is requested to clean a log (and in turn cleanSegments).
TransactionIndex is closed when LogSegment is requested to close and closeHandlers.
TransactionIndex is deleted (if it exists) when LogSegment is requested to deleteIfExists.
LogSegment and OffsetIndex
offsetIndex: OffsetIndex
When created, LogSegment is given an OffsetIndex with deferred loading (LazyIndex[OffsetIndex]).
offsetIndex simply gets (unwraps) the OffsetIndex.
offsetIndex is used for the following…FIXME
collectAbortedTxns
Method
collectAbortedTxns(
fetchOffset: Long,
upperBoundOffset: Long): TxnIndexSearchResult
collectAbortedTxns…FIXME
Note
collectAbortedTxns is used when Log is requested to collectAbortedTransactions.
Closing — close
Method
close(): Unit
close…FIXME
Note
close is used when Log is requested to load segments, close, and splitOverflowedSegment.
closeHandlers
Method
closeHandlers(): Unit
closeHandlers…FIXME
Note
closeHandlers is used when Log is requested to closeHandlers.
Recovering — recover
Method
recover(
producerStateManager: ProducerStateManager,
leaderEpochCache: Option[LeaderEpochFileCache] = None): Int
recover requests the OffsetIndex, the TimeIndex, and the TransactionIndex to reset.
Note
recover is used when Log is requested to recover a log segment.
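The text above only states that recover resets the three indexes. The toy model below sketches one plausible shape of a reset-then-rebuild recovery so the idea is easier to picture. Everything in it is an assumption made for illustration: Batch, ToySegment, the single in-memory index, and the choice to return the number of bytes dropped are hypothetical and are not Kafka's classes or documented behaviour.

```scala
// Toy model of segment recovery; all names and types are hypothetical, not Kafka's API.
final case class Batch(baseOffset: Long, sizeInBytes: Int, valid: Boolean)

final class ToySegment(var batches: Vector[Batch], indexIntervalBytes: Int) {
  // (offset, byte position) pairs, standing in for an offset index.
  var offsetIndex: Vector[(Long, Int)] = Vector.empty

  def sizeInBytes: Int = batches.map(_.sizeInBytes).sum

  // Resets the index, rebuilds it from the leading run of valid batches,
  // drops everything from the first invalid batch on, and returns the bytes dropped.
  def recover(): Int = {
    offsetIndex = Vector.empty // the "reset" step
    val validPrefix = batches.takeWhile(_.valid)
    var position = 0
    var lastIndexed = 0
    for (b <- validPrefix) {
      // Add a sparse index entry once enough bytes have passed since the last one.
      if (position - lastIndexed > indexIntervalBytes) {
        offsetIndex = offsetIndex :+ ((b.baseOffset, position))
        lastIndexed = position
      }
      position += b.sizeInBytes
    }
    val truncated = sizeInBytes - position
    batches = validPrefix
    truncated
  }
}
```

In this model the index is treated as derived data: it can always be thrown away and rebuilt from the batches, which is why resetting it first is safe.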
updateProducerState
Internal Method
updateProducerState(
producerStateManager: ProducerStateManager,
batch: RecordBatch): Unit
updateProducerState…FIXME
Note
updateProducerState is used when LogSegment is requested to recover.
sanityCheck
Method
sanityCheck(
timeIndexFileNewlyCreated: Boolean): Unit
sanityCheck…FIXME
Note
sanityCheck is used exclusively when Log is requested to loadSegments (when created).
updateDir
Method
updateDir(
dir: File): Unit
updateDir…FIXME
Note
updateDir is used exclusively when Log is requested to renameDir.
changeFileSuffixes
Method
changeFileSuffixes(
oldSuffix: String,
newSuffix: String): Unit
changeFileSuffixes…FIXME
Note
changeFileSuffixes is used when Log is requested to asyncDeleteSegment and replaceSegments.
flush
Method
flush(): Unit
flush…FIXME
deleteIfExists
Method
deleteIfExists(): Unit
deleteIfExists…FIXME
Note
deleteIfExists is used when…FIXME
deleteIfExists
Utility
deleteIfExists(
dir: File,
baseOffset: Long,
fileSuffix: String = ""): Unit
deleteIfExists…FIXME
Note
deleteIfExists is used when…FIXME
resizeIndexes
Method
resizeIndexes(
size: Int): Unit
resizeIndexes…FIXME
Note
resizeIndexes is used when…FIXME
largestTimestamp
Method
largestTimestamp: Long
largestTimestamp…FIXME
Note
largestTimestamp is used when…FIXME
shouldRoll
Method
shouldRoll(
rollParams: RollParams): Boolean
shouldRoll…FIXME
Note
shouldRoll is used exclusively when Log is requested to maybeRoll (while appending records).
timeWaitedForRoll
Method
timeWaitedForRoll(
now: Long,
messageTimestamp: Long): Long
timeWaitedForRoll…FIXME
Note
timeWaitedForRoll is used exclusively when LogSegment is requested to shouldRoll.
append
Method
append(
largestOffset: Long,
largestTimestamp: Long,
shallowOffsetOfMaxTimestamp: Long,
records: MemoryRecords): Unit
append…FIXME
appendFromFile
Method
appendFromFile(
records: FileRecords,
start: Int): Int
appendFromFile…FIXME
Note
appendFromFile is used exclusively when Log is requested to splitOverflowedSegment.
appendChunkFromFile
Internal Method
appendChunkFromFile(
records: FileRecords,
position: Int,
bufferSupplier: BufferSupplier): Int
appendChunkFromFile…FIXME
Note
appendChunkFromFile is used when LogSegment is requested to appendFromFile.
Truncating To Offset — truncateTo
Method
truncateTo(
offset: Long): Int
truncateTo…FIXME
Note
truncateTo is used when Log is created (and in turn requested to recoverLog) and when Log is requested to truncateTo.
updateTxnIndex
Method
updateTxnIndex(
completedTxn: CompletedTxn,
lastStableOffset: Long): Unit
updateTxnIndex…FIXME
size
Method
size: Int
size requests the FileRecords for the size (in bytes).
Internal Properties
Name | Description |
---|---|
 | Time(stamp) when this Used exclusively when |
 | Used when…FIXME |
 | Used when…FIXME |
 | Used when…FIXME |
 | Used when…FIXME |