Skip to content

Commit ae99f4f

Browse files
committed
*: start reading range keys from sstables
The previous commit, which enabled flushing and compactions of range keys, now makes the in-memory range key arena obsolete for reading range keys. This change sets up iterators to read range keys from sstable range key blocks, and makes many (far too many) minor fixes in other range key code to make that happen correctly.
1 parent d72083d commit ae99f4f

File tree

18 files changed

+280
-202
lines changed

18 files changed

+280
-202
lines changed

compaction.go

Lines changed: 36 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -1009,7 +1009,7 @@ func (c *compaction) elideRangeKey(start, end []byte) bool {
10091009

10101010
// newInputIter returns an iterator over all the input tables in a compaction.
10111011
func (c *compaction) newInputIter(
1012-
newIters tableNewIters, newSpanIter keyspan.TableNewSpanIter, snapshots []uint64,
1012+
newIters tableNewIters, newRangeKeyIter keyspan.TableNewSpanIter, snapshots []uint64,
10131013
) (_ internalIterator, retErr error) {
10141014
var rangeDelIters []keyspan.FragmentIterator
10151015
var rangeKeyIters []keyspan.FragmentIterator
@@ -1198,7 +1198,28 @@ func (c *compaction) newInputIter(
11981198
}
11991199
if hasRangeKeys {
12001200
li := &keyspan.LevelIter{}
1201-
li.Init(keyspan.SpanIterOptions{}, c.cmp, newSpanIter, level.files.Iter(), l, c.logger, manifest.KeyTypeRange)
1201+
newRangeKeyIterWrapper := func(file *manifest.FileMetadata, iterOptions *keyspan.SpanIterOptions) (keyspan.FragmentIterator, error) {
1202+
iter, err := newRangeKeyIter(file, iterOptions)
1203+
if iter != nil {
1204+
// Ensure that the range key iter is not closed until the compaction is
1205+
// finished. This is necessary because range key processing
1206+
// requires the range keys to be held in memory for up to the
1207+
// lifetime of the compaction.
1208+
c.closers = append(c.closers, iter)
1209+
iter = noCloseIter{iter}
1210+
1211+
// We do not need to truncate range keys to sstable boundaries, or
1212+
// only read within the file's atomic compaction units, unlike with
1213+
// range tombstones. This is because range keys were added after we
1214+
// stopped splitting user keys across sstables, so all the range keys
1215+
// in this sstable must wholly lie within the file's bounds.
1216+
}
1217+
if iter == nil {
1218+
iter = emptyKeyspanIter
1219+
}
1220+
return iter, err
1221+
}
1222+
li.Init(keyspan.SpanIterOptions{}, c.cmp, newRangeKeyIterWrapper, level.files.Iter(), l, c.logger, manifest.KeyTypeRange)
12021223
rangeKeyIters = append(rangeKeyIters, li)
12031224
}
12041225
return nil
@@ -1645,11 +1666,7 @@ func (d *DB) flush1() (bytesFlushed uint64, err error) {
16451666
if err == nil {
16461667
flushed = d.mu.mem.queue[:n]
16471668
d.mu.mem.queue = d.mu.mem.queue[n:]
1648-
d.updateReadStateLocked(d.opts.DebugCheck, func() {
1649-
// TODO(jackson): Remove this, plus this updateReadStateLocked
1650-
// parameter when range keys are persisted to sstables.
1651-
err = d.applyFlushedRangeKeys(flushed)
1652-
})
1669+
d.updateReadStateLocked(d.opts.DebugCheck)
16531670
d.updateTableStatsLocked(ve.NewFiles)
16541671
}
16551672
// Signal FlushEnd after installing the new readState. This helps for unit
@@ -1974,6 +1991,11 @@ func checkDeleteCompactionHints(
19741991
if m.Compacting || !h.canDelete(cmp, m, snapshots) || files[m] {
19751992
continue
19761993
}
1994+
if m.HasRangeKeys {
1995+
// TODO(bilal): Remove this conditional when deletion hints work well
1996+
// with sstables containing range keys.
1997+
continue
1998+
}
19771999

19782000
if files == nil {
19792001
// Construct files lazily, assuming most calls will not
@@ -2080,7 +2102,7 @@ func (d *DB) compact1(c *compaction, errChannel chan error) (err error) {
20802102
// there are no references obsolete tables will be added to the obsolete
20812103
// table list.
20822104
if err == nil {
2083-
d.updateReadStateLocked(d.opts.DebugCheck, nil)
2105+
d.updateReadStateLocked(d.opts.DebugCheck)
20842106
d.updateTableStatsLocked(ve.NewFiles)
20852107
}
20862108
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
@@ -2329,6 +2351,12 @@ func (d *DB) runCompaction(
23292351
if len(iter.tombstones) > 0 {
23302352
startKey = iter.tombstones[0].Start
23312353
}
2354+
if startKey == nil {
2355+
startKey = c.rangeKeyFrag.Start()
2356+
if len(iter.rangeKeys) > 0 {
2357+
startKey = iter.rangeKeys[0].Start
2358+
}
2359+
}
23322360
if splitKey != nil && d.cmp(startKey, splitKey) == 0 {
23332361
return nil
23342362
}

compaction_iter.go

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -506,6 +506,10 @@ func (i *compactionIter) nextInStripe() stripeChangeType {
506506
return sameStripeNonSkippable
507507
}
508508
return newStripe
509+
case InternalKeyKindRangeKeySet, InternalKeyKindRangeKeyUnset, InternalKeyKindRangeKeyDelete:
510+
// Range keys are interleaved at the max sequence number for a given user
511+
// key, so we should not see any more range keys in this stripe.
512+
panic("unreachable")
509513
case InternalKeyKindInvalid:
510514
if i.curSnapshotIdx == origSnapshotIdx {
511515
return sameStripeNonSkippable

data_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -756,7 +756,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
756756
entry := d.newFlushableEntry(d.mu.mem.mutable, 0, 0)
757757
entry.readerRefs++
758758
d.mu.mem.queue = append(d.mu.mem.queue, entry)
759-
d.updateReadStateLocked(nil, nil)
759+
d.updateReadStateLocked(nil)
760760
}
761761
mem = d.mu.mem.mutable
762762
start, end = nil, nil
@@ -845,7 +845,7 @@ func runDBDefineCmd(td *datadriven.TestData, opts *Options) (*DB, error) {
845845
}); err != nil {
846846
return nil, err
847847
}
848-
d.updateReadStateLocked(nil, nil)
848+
d.updateReadStateLocked(nil)
849849
d.updateTableStatsLocked(ve.NewFiles)
850850
}
851851

db.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1983,7 +1983,7 @@ func (d *DB) makeRoomForWrite(b *Batch) error {
19831983
var entry *flushableEntry
19841984
d.mu.mem.mutable, entry = d.newMemTable(newLogNum, logSeqNum)
19851985
d.mu.mem.queue = append(d.mu.mem.queue, entry)
1986-
d.updateReadStateLocked(nil, nil)
1986+
d.updateReadStateLocked(nil)
19871987
if immMem.writerUnref() {
19881988
d.maybeScheduleFlush()
19891989
}

flush_external.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -85,7 +85,7 @@ func flushExternalTable(untypedDB interface{}, path string, originalMeta *fileMe
8585
}
8686
return err
8787
}
88-
d.updateReadStateLocked(d.opts.DebugCheck, nil)
88+
d.updateReadStateLocked(d.opts.DebugCheck)
8989
d.updateTableStatsLocked(ve.NewFiles)
9090
d.deleteObsoleteFiles(jobID, true /* waitForOngoing */)
9191
d.maybeScheduleCompaction()

ingest.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -849,7 +849,7 @@ func (d *DB) ingestApply(
849849
}); err != nil {
850850
return nil, err
851851
}
852-
d.updateReadStateLocked(d.opts.DebugCheck, nil)
852+
d.updateReadStateLocked(d.opts.DebugCheck)
853853
d.updateTableStatsLocked(ve.NewFiles)
854854
d.deleteObsoleteFiles(jobID, false /* waitForOngoing */)
855855
// The ingestion may have pushed a level over the threshold for compaction,

internal/keyspan/level_iter.go

Lines changed: 81 additions & 51 deletions
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur
178178
return noFileLoaded
179179
}
180180
if indicator != fileAlreadyLoaded {
181-
l.iter, l.err = l.newIter(l.files.Current(), &l.tableOpts)
181+
l.iter, l.err = l.newIter(file, &l.tableOpts)
182182
indicator = newFileLoaded
183183
}
184184
if l.err != nil {
@@ -190,33 +190,42 @@ func (l *LevelIter) loadFile(file *manifest.FileMetadata, dir int) loadFileRetur
190190
// SeekGE implements keyspan.FragmentIterator.
191191
func (l *LevelIter) SeekGE(key []byte) *Span {
192192
l.dir = +1
193+
l.straddle = Span{}
194+
l.straddleDir = 0
193195
l.err = nil // clear cached iteration error
194196

195197
f := l.findFileGE(key)
196198
if f != nil && l.keyType == manifest.KeyTypeRange && l.cmp(key, f.SmallestRangeKey.UserKey) < 0 {
197-
// Return a straddling key instead of loading the file.
198-
l.iterFile = f
199-
l.iter = nil
200-
l.straddleDir = +1
201-
// The synthetic span that we are creating starts at the seeked key. This
202-
// is an optimization as it prevents us from loading the adjacent file's
203-
// bounds, at the expense of this iterator appearing "inconsistent" to its
204-
// callers i.e.:
205-
//
206-
// SeekGE(bb) -> {bb-c, empty}
207-
// Next() -> {c-d, RANGEKEYSET}
208-
// Prev() -> {a-c, empty}
209-
//
210-
// Seeing as the inconsistency will only be around empty spans, which are
211-
// expected to be elided by one of the higher-level iterators (either
212-
// top-level Iterator or the defragmenting iter), the entire iterator should
213-
// still appear consistent to the user.
214-
l.straddle = Span{
215-
Start: key,
216-
End: f.SmallestRangeKey.UserKey,
217-
Keys: nil,
199+
prevFile := l.files.Prev()
200+
if prevFile != nil {
201+
// We could unconditionally return an empty span between the seek key and
202+
// f.SmallestRangeKey, however if this span is to the left of all range
203+
// keys on this level, it could lead to inconsistent behaviour in relative
204+
// positioning operations. Consider this example, with a b-c range key:
205+
//
206+
// SeekGE(a) -> a-b:{}
207+
// Next() -> b-c{(#5,RANGEKEYSET,@4,foo)}
208+
// Prev() -> nil
209+
//
210+
// Iterators higher up in the iterator stack rely on this sort of relative
211+
// positioning consistency.
212+
//
213+
// TODO(bilal): Investigate ways to be able to return straddle spans in
214+
// cases similar to the above, while still retaining correctness.
215+
l.files.Next()
216+
// Return a straddling key instead of loading the file.
217+
l.iterFile = f
218+
if err := l.Close(); err != nil {
219+
return nil
220+
}
221+
l.straddleDir = +1
222+
l.straddle = Span{
223+
Start: prevFile.LargestRangeKey.UserKey,
224+
End: f.SmallestRangeKey.UserKey,
225+
Keys: nil,
226+
}
227+
return &l.straddle
218228
}
219-
return &l.straddle
220229
}
221230
loadFileIndicator := l.loadFile(f, +1)
222231
if loadFileIndicator == noFileLoaded {
@@ -231,33 +240,42 @@ func (l *LevelIter) SeekGE(key []byte) *Span {
231240
// SeekLT implements keyspan.FragmentIterator.
232241
func (l *LevelIter) SeekLT(key []byte) *Span {
233242
l.dir = -1
243+
l.straddle = Span{}
244+
l.straddleDir = 0
234245
l.err = nil // clear cached iteration error
235246

236247
f := l.findFileLT(key)
237248
if f != nil && l.keyType == manifest.KeyTypeRange && l.cmp(f.LargestRangeKey.UserKey, key) < 0 {
238-
// Return a straddling key instead of loading the file.
239-
l.iterFile = f
240-
l.iter = nil
241-
l.straddleDir = -1
242-
// The synthetic span that we are creating ends at the seeked key. This
243-
// is an optimization as it prevents us from loading the adjacent file's
244-
// bounds, at the expense of this iterator appearing "inconsistent" to its
245-
// callers i.e.:
246-
//
247-
// SeekLT(dd) -> {d-dd, empty}
248-
// Prev() -> {c-d, RANGEKEYSET}
249-
// Next() -> {d-e, empty}
250-
//
251-
// Seeing as the inconsistency will only be around empty spans, which are
252-
// expected to be elided by one of the higher-level iterators (either
253-
// top-level Iterator or the defragmenting iter), the entire iterator should
254-
// still appear consistent to the user.
255-
l.straddle = Span{
256-
Start: f.LargestRangeKey.UserKey,
257-
End: key,
258-
Keys: nil,
249+
nextFile := l.files.Next()
250+
if nextFile != nil {
251+
// We could unconditionally return an empty span between f.LargestRangeKey
252+
// and the seek key, however if this span is to the right of all range keys
253+
// on this level, it could lead to inconsistent behaviour in relative
254+
// positioning operations. Consider this example, with a b-c range key:
255+
//
256+
// SeekLT(d) -> c-d:{}
257+
// Prev() -> b-c{(#5,RANGEKEYSET,@4,foo)}
258+
// Next() -> nil
259+
//
260+
// Iterators higher up in the iterator stack rely on this sort of relative
261+
// positioning consistency.
262+
//
263+
// TODO(bilal): Investigate ways to be able to return straddle spans in
264+
// cases similar to the above, while still retaining correctness.
265+
l.files.Prev()
266+
// Return a straddling key instead of loading the file.
267+
l.iterFile = f
268+
if err := l.Close(); err != nil {
269+
return nil
270+
}
271+
l.straddleDir = -1
272+
l.straddle = Span{
273+
Start: f.LargestRangeKey.UserKey,
274+
End: nextFile.SmallestRangeKey.UserKey,
275+
Keys: nil,
276+
}
277+
return &l.straddle
259278
}
260-
return &l.straddle
261279
}
262280
if l.loadFile(l.findFileLT(key), -1) == noFileLoaded {
263281
return nil
@@ -271,6 +289,8 @@ func (l *LevelIter) SeekLT(key []byte) *Span {
271289
// First implements keyspan.FragmentIterator.
272290
func (l *LevelIter) First() *Span {
273291
l.dir = +1
292+
l.straddle = Span{}
293+
l.straddleDir = 0
274294
l.err = nil // clear cached iteration error
275295

276296
if l.loadFile(l.files.First(), +1) == noFileLoaded {
@@ -285,6 +305,8 @@ func (l *LevelIter) First() *Span {
285305
// Last implements keyspan.FragmentIterator.
286306
func (l *LevelIter) Last() *Span {
287307
l.dir = -1
308+
l.straddle = Span{}
309+
l.straddleDir = 0
288310
l.err = nil // clear cached iteration error
289311

290312
if l.loadFile(l.files.Last(), -1) == noFileLoaded {
@@ -298,10 +320,14 @@ func (l *LevelIter) Last() *Span {
298320

299321
// Next implements keyspan.FragmentIterator.
300322
func (l *LevelIter) Next() *Span {
301-
l.dir = +1
302-
if l.err != nil || (l.iter == nil && l.iterFile == nil) {
323+
if l.err != nil || (l.iter == nil && l.iterFile == nil && l.dir > 0) {
303324
return nil
304325
}
326+
if l.iter == nil && l.iterFile == nil {
327+
// l.dir <= 0
328+
return l.First()
329+
}
330+
l.dir = +1
305331

306332
if l.iter != nil {
307333
if span := l.iter.Next(); span != nil {
@@ -313,10 +339,14 @@ func (l *LevelIter) Next() *Span {
313339

314340
// Prev implements keyspan.FragmentIterator.
315341
func (l *LevelIter) Prev() *Span {
316-
l.dir = -1
317-
if l.err != nil || (l.iter == nil && l.iterFile == nil) {
342+
if l.err != nil || (l.iter == nil && l.iterFile == nil && l.dir < 0) {
318343
return nil
319344
}
345+
if l.iter == nil && l.iterFile == nil {
346+
// l.dir >= 0
347+
return l.Last()
348+
}
349+
l.dir = -1
320350

321351
if l.iter != nil {
322352
if span := l.iter.Prev(); span != nil {
@@ -355,7 +385,7 @@ func (l *LevelIter) skipEmptyFileForward() *Span {
355385
Start: startKey,
356386
End: endKey,
357387
}
358-
l.straddleDir = l.dir
388+
l.straddleDir = +1
359389
return &l.straddle
360390
}
361391
} else if l.straddleDir < 0 {
@@ -416,7 +446,7 @@ func (l *LevelIter) skipEmptyFileBackward() *Span {
416446
Start: startKey,
417447
End: endKey,
418448
}
419-
l.straddleDir = l.dir
449+
l.straddleDir = -1
420450
return &l.straddle
421451
}
422452
} else if l.straddleDir > 0 {

internal/keyspan/merging_iter.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -811,7 +811,7 @@ func (m *MergingIter) synthesizeKeys(dir int8) (bool, *Span) {
811811
}
812812
sort.Sort(&m.keys)
813813

814-
// Apply the configured transform. See VisibleTransform.
814+
// Apply the configured transform. See visibleTransform.
815815
s := Span{
816816
Start: m.start,
817817
End: m.end,

0 commit comments

Comments
 (0)