Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/log/AbstractIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -36,8 +36,8 @@ import org.apache.kafka.common.utils.{ByteBufferUnmapper, OperatingSystem, Utils
* @param baseOffset the base offset of the segment that this index is corresponding to.
* @param maxIndexSize The maximum index size in bytes.
*/
abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Long,
val maxIndexSize: Int = -1, val writable: Boolean) extends Closeable {
abstract class AbstractIndex(@volatile var file: File, val baseOffset: Long, val maxIndexSize: Int = -1,
val writable: Boolean) extends Closeable {
import AbstractIndex._

// Length of the index file
Expand Down Expand Up @@ -242,7 +242,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
/**
* The number of bytes actually used by this index
*/
def sizeInBytes = entrySize * _entries
def sizeInBytes: Int = entrySize * _entries

/** Close the index */
def close(): Unit = {
Expand Down Expand Up @@ -425,7 +425,7 @@ abstract class AbstractIndex[K, V](@volatile var file: File, val baseOffset: Lon
}

object AbstractIndex extends Logging {
override val loggerName: String = classOf[AbstractIndex[_, _]].getName
override val loggerName: String = classOf[AbstractIndex].getName
}

object IndexSearchType extends Enumeration {
Expand Down
93 changes: 0 additions & 93 deletions core/src/main/scala/kafka/log/AbstractLazyIndex.scala

This file was deleted.

85 changes: 85 additions & 0 deletions core/src/main/scala/kafka/log/LazyIndex.scala
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package kafka.log

import java.io.File
import java.util.concurrent.locks.ReentrantLock

import LazyIndex._
import kafka.utils.CoreUtils.inLock

/**
 * A wrapper over an `AbstractIndex` instance that provides a mechanism to defer loading (i.e. memory mapping) the
 * underlying index until it is accessed for the first time via the `get` method.
 *
 * This is an important optimization with regard to broker start-up time if it has a large number of segments.
 *
 * The current state is kept in `indexWrapper`: an `IndexFile` while the index has not been loaded yet, and an
 * `IndexValue` once it has. The field is volatile so that `get` can read it without taking `lock` when the index
 * is already loaded; loading the index and updating `file` both run under `lock`.
 *
 * @tparam T the concrete index type produced by `loadIndex`
 * @param indexWrapper the initial state of this lazy index
 * @param loadIndex A function that takes a `File` pointing to an index and returns a loaded `AbstractIndex` instance.
 */
class LazyIndex[T <: AbstractIndex] private (@volatile private var indexWrapper: IndexWrapper, loadIndex: File => T) {

  private val lock = new ReentrantLock()

  /** The file backing the index, read from whichever wrapper is current (loaded or not). */
  def file: File = indexWrapper.file

  /**
   * Updates the file backing the index. The update is applied to the current wrapper while holding `lock`, so it
   * cannot interleave with the load performed by `get`.
   */
  def file_=(f: File): Unit = {
    inLock(lock) {
      indexWrapper.file = f
    }
  }

  /**
   * Returns the underlying index, loading it through `loadIndex` on first access.
   *
   * The wrapper is first inspected without the lock. If the index is not loaded yet, the wrapper is inspected again
   * while holding `lock`, so `loadIndex` is invoked at most once even if several threads call `get` concurrently.
   */
  def get: T = {
    indexWrapper match {
      // `IndexWrapper` is not generic, so a `IndexValue[T]` type pattern would be unchecked because of erasure.
      // Match on the wildcard type and cast explicitly: the only `IndexValue` ever stored in `indexWrapper` is the
      // one created below from `loadIndex`, whose result type is `T`.
      case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T]
      case _: IndexFile =>
        inLock(lock) {
          indexWrapper match {
            // Another thread may have loaded the index between the unlocked check and acquiring the lock
            case indexValue: IndexValue[_] => indexValue.index.asInstanceOf[T]
            case indexFile: IndexFile =>
              val indexValue = new IndexValue(loadIndex(indexFile.file))
              indexWrapper = indexValue
              indexValue.index
          }
        }
    }
  }

}

/**
 * Factory methods for `LazyIndex`, plus the private wrapper types used to represent its "not loaded yet" and
 * "loaded" states.
 */
object LazyIndex {

  /**
   * Creates a `LazyIndex` that builds an `OffsetIndex` from the given arguments on first access.
   *
   * @param file the offset index file
   * @param baseOffset passed through to the `OffsetIndex` constructor
   * @param maxIndexSize passed through to the `OffsetIndex` constructor
   * @param writable passed through to the `OffsetIndex` constructor
   */
  def forOffset(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true): LazyIndex[OffsetIndex] =
    // The loader receives the wrapper's file at load time, which may differ from `file` if it was updated meanwhile
    new LazyIndex(new IndexFile(file), indexFile => new OffsetIndex(indexFile, baseOffset, maxIndexSize, writable))

  /**
   * Creates a `LazyIndex` that builds a `TimeIndex` from the given arguments on first access.
   *
   * @param file the time index file
   * @param baseOffset passed through to the `TimeIndex` constructor
   * @param maxIndexSize passed through to the `TimeIndex` constructor
   * @param writable passed through to the `TimeIndex` constructor
   */
  def forTime(file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true): LazyIndex[TimeIndex] =
    new LazyIndex(new IndexFile(file), indexFile => new TimeIndex(indexFile, baseOffset, maxIndexSize, writable))

  /** Common view over the two states of a lazy index: both expose a readable and writable `file`. */
  private sealed trait IndexWrapper {
    def file: File
    // Explicit result type: procedure syntax is deprecated and the override in `IndexValue` already declares `Unit`
    def file_=(f: File): Unit
  }

  /** State before the index is loaded: only the file is tracked. */
  private class IndexFile(@volatile var file: File) extends IndexWrapper

  /** State after the index is loaded: `file` reads and writes are forwarded to the loaded index. */
  private class IndexValue[T <: AbstractIndex](val index: T) extends IndexWrapper {
    override def file: File = index.file
    override def file_=(f: File): Unit = index.file = f
  }

}

8 changes: 4 additions & 4 deletions core/src/main/scala/kafka/log/LogSegment.scala
Original file line number Diff line number Diff line change
Expand Up @@ -54,8 +54,8 @@ import scala.math._
*/
@nonthreadsafe
class LogSegment private[log] (val log: FileRecords,
val lazyOffsetIndex: LazyOffsetIndex,
val lazyTimeIndex: LazyTimeIndex,
val lazyOffsetIndex: LazyIndex[OffsetIndex],
val lazyTimeIndex: LazyIndex[TimeIndex],
val txnIndex: TransactionIndex,
val baseOffset: Long,
val indexIntervalBytes: Int,
Expand Down Expand Up @@ -655,8 +655,8 @@ object LogSegment {
val maxIndexSize = config.maxIndexSize
new LogSegment(
FileRecords.open(Log.logFile(dir, baseOffset, fileSuffix), fileAlreadyExists, initFileSize, preallocate),
new LazyOffsetIndex(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
new LazyTimeIndex(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
LazyIndex.forOffset(Log.offsetIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
LazyIndex.forTime(Log.timeIndexFile(dir, baseOffset, fileSuffix), baseOffset = baseOffset, maxIndexSize = maxIndexSize),
new TransactionIndex(baseOffset, Log.transactionIndexFile(dir, baseOffset, fileSuffix)),
baseOffset,
indexIntervalBytes = config.indexInterval,
Expand Down
21 changes: 1 addition & 20 deletions core/src/main/scala/kafka/log/OffsetIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import org.apache.kafka.common.errors.InvalidOffsetException
*/
// Avoid shadowing mutable `file` in AbstractIndex
class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
extends AbstractIndex[Long, Int](_file, baseOffset, maxIndexSize, writable) {
extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
import OffsetIndex._

override def entrySize = 8
Expand Down Expand Up @@ -205,22 +205,3 @@ class OffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writabl
// Companion object of `OffsetIndex`. It mixes in `Logging` and overrides `loggerName` with the class name of
// `OffsetIndex` (as returned by `Class.getName`); presumably `Logging` uses this name to obtain its logger.
object OffsetIndex extends Logging {
override val loggerName: String = classOf[OffsetIndex].getName
}



/**
 * A thin wrapper around `OffsetIndex` that does not create the index on construction. The `OffsetIndex` is only
 * built when `loadIndex` is invoked, so the cost of the heavy memory-mapped operation is amortized over time.
 *
 * Combined with skipping the sanity check for safely flushed segments, this can reduce the start-up time of a
 * broker, especially one with a lot of log segments.
 */
class LazyOffsetIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
  extends AbstractLazyIndex[OffsetIndex](_file) {

  /** Builds the `OffsetIndex` for `indexFile` using the settings this wrapper was constructed with. */
  override protected def loadIndex(indexFile: File): OffsetIndex =
    new OffsetIndex(indexFile, baseOffset = baseOffset, maxIndexSize = maxIndexSize, writable = writable)

}
21 changes: 1 addition & 20 deletions core/src/main/scala/kafka/log/TimeIndex.scala
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ import org.apache.kafka.common.record.RecordBatch
*/
// Avoid shadowing mutable file in AbstractIndex
class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
extends AbstractIndex[Long, Long](_file, baseOffset, maxIndexSize, writable) {
extends AbstractIndex(_file, baseOffset, maxIndexSize, writable) {
import TimeIndex._

@volatile private var _lastEntry = lastEntryFromIndexFile
Expand Down Expand Up @@ -227,22 +227,3 @@ class TimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable:
// Companion object of `TimeIndex`. It mixes in `Logging` and overrides `loggerName` with the class name of
// `TimeIndex` (as returned by `Class.getName`); presumably `Logging` uses this name to obtain its logger.
object TimeIndex extends Logging {
override val loggerName: String = classOf[TimeIndex].getName
}



/**
 * A thin wrapper around `TimeIndex` that does not create the index on construction. The `TimeIndex` is only
 * built when `loadIndex` is invoked, so the cost of the heavy memory-mapped operation is amortized over time.
 *
 * Combined with skipping the sanity check for safely flushed segments, this can reduce the start-up time of a
 * broker, especially one with a lot of log segments.
 */
class LazyTimeIndex(_file: File, baseOffset: Long, maxIndexSize: Int = -1, writable: Boolean = true)
  extends AbstractLazyIndex[TimeIndex](_file) {

  /** Builds the `TimeIndex` for `indexFile` using the settings this wrapper was constructed with. */
  override protected def loadIndex(indexFile: File): TimeIndex =
    new TimeIndex(indexFile, baseOffset = baseOffset, maxIndexSize = maxIndexSize, writable = writable)

}
4 changes: 2 additions & 2 deletions core/src/test/scala/unit/kafka/log/LogUtils.scala
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,8 @@ object LogUtils {
indexIntervalBytes: Int = 10,
time: Time = Time.SYSTEM): LogSegment = {
val ms = FileRecords.open(Log.logFile(logDir, offset))
val idx = new LazyOffsetIndex(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
val timeIdx = new LazyTimeIndex(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
val idx = LazyIndex.forOffset(Log.offsetIndexFile(logDir, offset), offset, maxIndexSize = 1000)
val timeIdx = LazyIndex.forTime(Log.timeIndexFile(logDir, offset), offset, maxIndexSize = 1500)
val txnIndex = new TransactionIndex(offset, Log.transactionIndexFile(logDir, offset))

new LogSegment(ms, idx, timeIdx, txnIndex, offset, indexIntervalBytes, 0, time)
Expand Down