replay: add replay with pacing#2152
Conversation
0c6b2ec to
b619cd9
Compare
jbowens
left a comment
There was a problem hiding this comment.
Reviewed 1 of 8 files at r1, 1 of 10 files at r2, all commit messages.
Reviewable status: 2 of 12 files reviewed, 7 unresolved discussions (waiting on @coolcom200)
replay/replay.go line 43 at r2 (raw file):
// computeReadAmp calculates the read amplification from a manifest.Version func computeReadAmp(v *manifest.Version) int { refRAmp := v.L0Sublevels.ReadAmplification()
nit: s/refRamp/rAmp/
replay/replay.go line 55 at r2 (raw file):
// PaceByFixedReadAmp to wait on the dbMetricsNotifier condition variable if the // read amplification observed is greater than the specified target (refRAmp). func waitAndCompare(r *Runner, refRAmp int) {
nit: s/waitAndCompare/waitForReadAmpLE?
replay/replay.go line 85 at r2 (raw file):
// PaceByFixedReadAmp implements Pacer by applying each new write following a // fixed read amplification. type PaceByFixedReadAmp struct {
nit: can make this type PaceByFixedReadAmp int
replay/replay.go line 106 at r2 (raw file):
// Internal state. cancel func() workloadExhausted chan struct{}
can you add a comment explaining the meaning of 'workload exhausted' (we've applied all of the workload's modifications but might not have finished compacting). And also that only a single item is sent to the channel when exhausted, so it should only be read from the refreshMetrics goroutine
replay/replay.go line 325 at r2 (raw file):
} r.Pacer.pace(r, step)
mind leaving a TODO to sum all the paced durations to report as a statistic at the end?
replay/replay.go line 455 at r2 (raw file):
// database at this point when the workload was collected. This // can be useful for pacing. pv := s.v // save the previous version
s.v should be nil, because it's a local variable, no? I think we need save the current version to a global variable with a scope outside this step's iteration
replay/testdata/corpus/findManifestStart line 26 at r2 (raw file):
0188001729f6a822d807280901b42e00000000008a5cb45d0605d8fff39b 0601670014c4ff84011ef089fd0b53825747ac000188001729f6a826a109 900901c12e00000000001ef089fd0b5382580a
we generally try to have binary data be generated by the test itself, so it's programmatically able to be reconstructed. it also helps with understanding the test itself
53d6a9a to
085d270
Compare
d1ae213 to
af8357f
Compare
af8357f to
3b107a1
Compare
jbowens-alt
left a comment
There was a problem hiding this comment.
Reviewed 1 of 11 files at r3, 1 of 10 files at r7, all commit messages.
Reviewable status: 2 of 13 files reviewed, 18 unresolved discussions (waiting on @coolcom200 and @jbowens)
internal/datatest/datatest.go line 96 at r7 (raw file):
} // CompactionTracker is a listener that tracks the number of compactions
nit: period
internal/datatest/datatest.go line 103 at r7 (raw file):
} // NewCompactionTracker setups the necessary to keep track of the
nit: necessary .. "options"?
internal/datatest/datatest.go line 104 at r7 (raw file):
// NewCompactionTracker setups the necessary to keep track of the // compactions that are in flight
nit: period
internal/datatest/datatest.go line 106 at r7 (raw file):
// compactions that are in flight func NewCompactionTracker(options *pebble.Options) *CompactionTracker { cql := CompactionTracker{}
nit: s/cql/ct/?
internal/datatest/datatest.go line 131 at r7 (raw file):
} else { options.EventListener = &el }
let's copy over Options.AddEventListener from here and use that
internal/datatest/datatest.go line 135 at r7 (raw file):
} // WaitForInflightCompactionsToEqual waits until compactions meet the specified target
nit: missing period
replay/replay.go line 108 at r7 (raw file):
// have been applied. However once this channel is closed the replayer could // still not be finished as there could still be compactions running in the // background.
this comment needs to be updated now that this channel is no longer closed; it's sent to when the workload is finished. can you add that workloadExhausted is a buffered channel of size 1, so only refreshMetrics can read from it.
replay/replay.go line 188 at r7 (raw file):
l = r.eventListener() } r.Opts.EventListener = &l
we can use the new Options.AddEventListener here too
replay/replay.go line 223 at r7 (raw file):
for !done { select { case <-r.workloadExhausted:
can you add a comment here explaining that r.workloadExhausted does not get closed; it just gets an empty value sent to it, so this case will trigger at most once.
replay/replay.go line 373 at r7 (raw file):
var v *manifest.Version var previousVersion *manifest.Version var bve manifest.BulkVersionEdit
I forgot to set bve.AddedByFileNum. We should initialize it to make(map[base.FileNum]*FileMetadata)
This map will keep track of all the file metadata so that when a file is deleted, it can look up the deleted file's metadata from the map. i'm a little surprised our unit tests haven't caught this yet.
replay/replay.go line 387 at r7 (raw file):
r.Opts.FlushSplitBytes, r.Opts.Experimental.ReadCompactionRate) bve = manifest.BulkVersionEdit{}
and then we should AddedByFileNum here, eg:
bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
3b107a1 to
aa287af
Compare
coolcom200
left a comment
There was a problem hiding this comment.
Reviewed 1 of 8 files at r1, 1 of 11 files at r3, 1 of 11 files at r6, 10 of 10 files at r7, 1 of 4 files at r8.
Reviewable status: 11 of 14 files reviewed, 10 unresolved discussions (waiting on @jbowens and @jbowens-alt)
internal/datatest/datatest.go line 131 at r7 (raw file):
Previously, jbowens-alt (Jackson Owens (v2)) wrote…
let's copy over
Options.AddEventListenerfrom here and use that
Done.
replay/replay.go line 106 at r2 (raw file):
Previously, jbowens (Jackson Owens) wrote…
can you add a comment explaining the meaning of 'workload exhausted' (we've applied all of the workload's modifications but might not have finished compacting). And also that only a single item is sent to the channel when exhausted, so it should only be read from the
refreshMetricsgoroutine
Done.
replay/replay.go line 325 at r2 (raw file):
Previously, jbowens (Jackson Owens) wrote…
mind leaving a TODO to sum all the paced durations to report as a statistic at the end?
Done.
replay/replay.go line 455 at r2 (raw file):
Previously, jbowens (Jackson Owens) wrote…
s.v should be nil, because it's a local variable, no? I think we need save the current version to a global variable with a scope outside this step's iteration
Done.
replay/replay.go line 108 at r7 (raw file):
Previously, jbowens-alt (Jackson Owens (v2)) wrote…
this comment needs to be updated now that this channel is no longer closed; it's sent to when the workload is finished. can you add that workloadExhausted is a buffered channel of size 1, so only
refreshMetricscan read from it.
The comment I think is still accurate as the channel is closed. I have changed the channel to not be a initialized as buffered channel.
replay/replay.go line 188 at r7 (raw file):
Previously, jbowens-alt (Jackson Owens (v2)) wrote…
we can use the new
Options.AddEventListenerhere too
Done.
replay/replay.go line 223 at r7 (raw file):
Previously, jbowens-alt (Jackson Owens (v2)) wrote…
can you add a comment here explaining that
r.workloadExhausteddoes not get closed; it just gets an empty value sent to it, so this case will trigger at most once.
This channel does get closed.
replay/replay.go line 373 at r7 (raw file):
I think we already have the metadata saved above (line 367) I think we just don't use it
metas := make(map[base.FileNum]*manifest.FileMetadata)
I think that we can just do this. Please correct me if I misunderstood.
@@ -384,7 +378,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error {
r.Opts.Comparer.FormatKey,
r.Opts.FlushSplitBytes,
r.Opts.Experimental.ReadCompactionRate)
- bve = manifest.BulkVersionEdit{}
+ bve = manifest.BulkVersionEdit{AddedByFileNum: metas}
return v, err
}replay/replay.go line 387 at r7 (raw file):
Previously, jbowens-alt (Jackson Owens (v2)) wrote…
and then we should
AddedByFileNumhere, eg:bve = manifest.BulkVersionEdit{AddedByFileNum: bve.AddedByFileNum}
See above comment.
jbowens-alt
left a comment
There was a problem hiding this comment.
Reviewable status: 11 of 14 files reviewed, 8 unresolved discussions (waiting on @coolcom200 and @jbowens)
replay/replay.go line 108 at r7 (raw file):
Previously, coolcom200 (Leon Fattakhov) wrote…
The comment I think is still accurate as the channel is closed. I have changed the channel to not be a initialized as buffered channel.
Hrm, I'm confused. I thought we deliberately used a buffered channel and a send so that the select in refreshMetrics doesn't always fall into the r.workloadExhausted case once the workload is complete.
replay/replay.go line 373 at r7 (raw file):
Previously, coolcom200 (Leon Fattakhov) wrote…
I think we already have the metadata saved above (line 367) I think we just don't use it
metas := make(map[base.FileNum]*manifest.FileMetadata)
I think that we can just do this. Please correct me if I misunderstood.
@@ -384,7 +378,7 @@ func (r *Runner) prepareWorkloadSteps(ctx context.Context) error { r.Opts.Comparer.FormatKey, r.Opts.FlushSplitBytes, r.Opts.Experimental.ReadCompactionRate) - bve = manifest.BulkVersionEdit{} + bve = manifest.BulkVersionEdit{AddedByFileNum: metas} return v, err }
I forgot about metas—I don't think we need to manually accumulate them in applyVE then. BulkVersionEdit.Accumulate will already do that for us.
27348fa to
b64a6d9
Compare
coolcom200
left a comment
There was a problem hiding this comment.
Reviewable status: 11 of 14 files reviewed, 8 unresolved discussions (waiting on @jbowens and @jbowens-alt)
replay/replay.go line 108 at r7 (raw file):
Previously, jbowens-alt (Jackson Owens (v2)) wrote…
Hrm, I'm confused. I thought we deliberately used a buffered channel and a send so that the select in
refreshMetricsdoesn't always fall into ther.workloadExhaustedcase once the workload is complete.
Updated comment and changed it to send a single message a message.
replay/replay.go line 373 at r7 (raw file):
Previously, jbowens-alt (Jackson Owens (v2)) wrote…
I forgot about
metas—I don't think we need to manually accumulate them inapplyVEthen.BulkVersionEdit.Accumulatewill already do that for us.
Sounds good. Added an issue to make a test for this #2200
replay/testdata/corpus/findManifestStart line 26 at r2 (raw file):
Previously, jbowens (Jackson Owens) wrote…
we generally try to have binary data be generated by the test itself, so it's programmatically able to be reconstructed. it also helps with understanding the test itself
Added an issue to change this test: #2201
jbowens
left a comment
There was a problem hiding this comment.
Reviewable status: 11 of 14 files reviewed, 9 unresolved discussions (waiting on @coolcom200 and @jbowens-alt)
replay/replay.go line 129 at r9 (raw file):
compactionEndedCount int compactionStartLock sync.Mutex
let's use atomics instead of a mutex for compactionStartCount
jbowens
left a comment
There was a problem hiding this comment.
modulo the nit and the use of atomics for the compaction start count
Reviewable status: 11 of 14 files reviewed, 6 unresolved discussions (waiting on @coolcom200 and @jbowens-alt)
replay/replay.go line 255 at r9 (raw file):
func (r *Runner) stopCompactionNotifier() { // Close the channel which will cause the compactionNotified to exit
compactionNotified goroutine
Add pacer interface, and three pacers `PaceByReferenceReadAmp` , `PaceByFixedReadAmp` and `Unpaced`. The pacer is designed to ensure that workloads are processed in a similar manner to the load when being collected. This allows collection to be performed, and replayed on machines with different performance characteristics. The pacer implementations checks the `pebble.Metrics` on the `Runner` in order to determine if an operation should be applied during replay. The metrics on the runner are updated by a separate goroutine that is signaled by the `compactionNotified` goroutine when a compaction ends. The `compactionNotified` goroutine ensures that we synchronizes the compaction end events and metric updates. That is we don’t miss updating metrics when compactions do occur. Additionally add a compaction tracker for testing purposes that keeps track of the number of compactions and allows for waiting till a compaction reaches a desired threshold.
b64a6d9 to
54c7177
Compare
coolcom200
left a comment
There was a problem hiding this comment.
Reviewable status: 11 of 14 files reviewed, 6 unresolved discussions (waiting on @jbowens and @jbowens-alt)
replay/replay.go line 129 at r9 (raw file):
Previously, jbowens (Jackson Owens) wrote…
let's use atomics instead of a mutex for
compactionStartCount
Done.
replay/replay.go line 255 at r9 (raw file):
Previously, jbowens (Jackson Owens) wrote…
compactionNotified goroutine
Done.
|
TFTRs!!! |
Add pacer interface, and three pacers
PaceByReferenceReadAmp,PaceByFixedReadAmpandUnpaced. The pacer is designed to ensure thatworkloads are processed in a similar manner to the load when being
collected. This allows collection to be performed, and replayed on
machines with different performance characteristics.
The pacer implementations checks the
pebble.Metricson theRunnerin order to determine if an operation should be applied during replay.
The metrics on the runner are updated by a separate goroutine that is
signaled by the
compactionNotifiedgoroutine when a compaction ends.The
compactionNotifiedgoroutine ensures that we synchronizes thecompaction end events and metric updates. That is we don’t miss updating
metrics when compactions do occur.
Additionally add a compaction tracker for testing purposes that keeps
track of the number of compactions and allows for waiting till a
compaction reaches a desired threshold.
Resolves: #2057