Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 16 additions & 6 deletions compaction.go
Original file line number Diff line number Diff line change
Expand Up @@ -1497,6 +1497,22 @@ func (d *DB) runCompaction(
limit = nil
}

if tw != nil {
if d.opts.TablePartitioner != nil {
lastUserKey := tw.LastUserKey()
if d.opts.TablePartitioner(lastUserKey, key.UserKey) {
limit = key.UserKey
break
}
}
if tw.EstimatedSize() >= c.maxOutputFileSize {
// Use the next key as the sstable boundary. Note that we already
// checked this key against the grandparent limit above.
limit = key.UserKey
break
}
}

atomic.StoreUint64(c.atomicBytesIterated, c.bytesIterated)
if err := pacer.maybeThrottle(c.bytesIterated); err != nil {
return nil, pendingOutputs, err
Expand All @@ -1509,12 +1525,6 @@ func (d *DB) runCompaction(
c.rangeDelFrag.Add(iter.cloneKey(*key), val)
continue
}
if tw != nil && tw.EstimatedSize() >= c.maxOutputFileSize {
// Use the next key as the sstable boundary. Note that we already
// checked this key against the grandparent limit above.
limit = key.UserKey
break
}
if tw == nil {
if err := newOutput(); err != nil {
return nil, pendingOutputs, err
Expand Down
39 changes: 21 additions & 18 deletions internal/manifest/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,7 +471,7 @@ func CheckOrdering(cmp Compare, format base.Formatter, level int, files []*FileM
// of 0 in RocksDB. We handle this case for compatibility with RocksDB.

// The largest sequence number of a flushed file. Increasing.
var largestFlushedSeqNum uint64
// var largestFlushedSeqNum uint64

// The largest sequence number of any file. Increasing.
var largestSeqNum uint64
Expand Down Expand Up @@ -509,23 +509,26 @@ func CheckOrdering(cmp Compare, format base.Formatter, level int, files []*FileM
// Ingested file.
uncheckedIngestedSeqNums = append(uncheckedIngestedSeqNums, f.LargestSeqNum)
} else {
// Flushed file.
// Two flushed files cannot overlap.
if largestFlushedSeqNum > 0 && f.SmallestSeqNum <= largestFlushedSeqNum {
return fmt.Errorf("L0 flushed file %06d overlaps with the largest seqnum of a "+
"preceding flushed file: %d-%d vs %d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum,
largestFlushedSeqNum)
}
largestFlushedSeqNum = f.LargestSeqNum
// Check that unchecked ingested sequence numbers are not coincident with f.SmallestSeqNum.
// We do not need to check that they are not coincident with f.LargestSeqNum because we
// have already confirmed that LargestSeqNums were increasing.
for _, seq := range uncheckedIngestedSeqNums {
if seq == f.SmallestSeqNum {
return fmt.Errorf("L0 flushed file %06d has an ingested file coincident with "+
"smallest seqnum: %d-%d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum)
}
}
// TODO(peter): with user-defined table partitioning, a flush can
// create 2 sstables with overlaps in the seqnum space.

// // Flushed file.
// // Two flushed files cannot overlap.
// if largestFlushedSeqNum > 0 && f.SmallestSeqNum <= largestFlushedSeqNum {
// return fmt.Errorf("L0 flushed file %06d overlaps with the largest seqnum of a "+
// "preceding flushed file: %d-%d vs %d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum,
// largestFlushedSeqNum)
// }
// largestFlushedSeqNum = f.LargestSeqNum
// // Check that unchecked ingested sequence numbers are not coincident with f.SmallestSeqNum.
// // We do not need to check that they are not coincident with f.LargestSeqNum because we
// // have already confirmed that LargestSeqNums were increasing.
// for _, seq := range uncheckedIngestedSeqNums {
// if seq == f.SmallestSeqNum {
// return fmt.Errorf("L0 flushed file %06d has an ingested file coincident with "+
// "smallest seqnum: %d-%d", f.FileNum, f.SmallestSeqNum, f.LargestSeqNum)
// }
// }
uncheckedIngestedSeqNums = uncheckedIngestedSeqNums[:0]
}
}
Expand Down
12 changes: 12 additions & 0 deletions options.go
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,14 @@ const (
TableFormatLevelDB = sstable.TableFormatLevelDB
)

// TablePartitioner provides a hook for partitioning sstables at the boundary
// between two keys during flushing and compaction. The partitioner is invoked
// with the last point user key already added to the sstable under
// construction (lastKey) and the user key currently being considered for
// addition (curKey). Returning true causes output to the current sstable to
// be completed and a new sstable to be created to hold curKey and subsequent
// keys; returning false keeps curKey in the current sstable.
type TablePartitioner func(lastKey, curKey []byte) bool

// TablePropertyCollector exports the base.TablePropertyCollector type.
type TablePropertyCollector = sstable.TablePropertyCollector

Expand Down Expand Up @@ -371,6 +379,10 @@ type Options struct {
// by a wider range of tools and libraries.
TableFormat TableFormat

// TablePartitioner is a hook to allow user control of required partitions
// between sstables.
TablePartitioner TablePartitioner

// TablePropertyCollectors is a list of TablePropertyCollector creation
// functions. A new TablePropertyCollector is created for each sstable built
// and lives for the lifetime of the table.
Expand Down
9 changes: 8 additions & 1 deletion sstable/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -678,6 +678,13 @@ func (w *Writer) Metadata() (*WriterMetadata, error) {
return &w.meta, nil
}

// LastUserKey returns the user key of the most recent point record written to
// the table, or nil if no point record has been added to the table.
func (w *Writer) LastUserKey() []byte {
	return base.DecodeInternalKey(w.block.curKey).UserKey
}

// WriterOption provide an interface to do work on Writer while it is being
// opened.
type WriterOption interface {
Expand All @@ -689,7 +696,7 @@ type WriterOption interface {
// internalTableOpt is a WriterOption that sets properties for sstables being
// created by the db itself (i.e. through flushes and compactions), as opposed
// to those meant for ingestion.
type internalTableOpt struct {}
type internalTableOpt struct{}

func (i internalTableOpt) writerApply(w *Writer) {
// Set the external sst version to 0. This is what RocksDB expects for
Expand Down