Commit 8cdcf77

roachtest: port tpcc/mixed-headroom to the new framework
Note that the new framework randomizes how many version upgrades occur. This
removes the need for separate single-upgrade and multiple-upgrade tests, so
the two were merged.

Release note: None

Fixes: #110537
1 parent 6cc1224 commit 8cdcf77
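
For orientation, the ported test drives the upgrade through the mixedversion framework's hook-registration API rather than an explicit list of version steps. A minimal sketch of that flow, using only calls that appear in the diff below (helper bodies elided; the exact scheduling of each hook is the framework's, not spelled out here):

    // All hooks share the signature
    //   func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error
    mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)
    mvt.OnStartup("load TPCC dataset", importTPCC)                      // runs before any upgrade starts
    mvt.OnStartup("load bank dataset", importLargeBank)
    mvt.InMixedVersion("TPCC workload", runTPCCWorkload)                // runs while node versions are mixed
    mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload) // runs after each upgrade finalizes
    mvt.Run() // picks a random predecessor history and performs the upgrades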

2 files changed

Lines changed: 66 additions & 221 deletions

pkg/cmd/roachtest/tests/tpcc.go

Lines changed: 66 additions & 172 deletions
@@ -24,16 +24,15 @@ import (
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/cluster"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/option"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/registry"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/clusterupgrade"
+	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/roachtestutil/mixedversion"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/spec"
 	"github.com/cockroachdb/cockroach/pkg/cmd/roachtest/test"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/install"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/logger"
 	"github.com/cockroachdb/cockroach/pkg/roachprod/prometheus"
-	"github.com/cockroachdb/cockroach/pkg/roachprod/vm"
 	"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
-	"github.com/cockroachdb/cockroach/pkg/testutils/release"
-	"github.com/cockroachdb/cockroach/pkg/util/randutil"
 	"github.com/cockroachdb/cockroach/pkg/util/search"
 	"github.com/cockroachdb/cockroach/pkg/util/timeutil"
 	"github.com/cockroachdb/cockroach/pkg/util/version"
@@ -354,63 +353,12 @@ func maxSupportedTPCCWarehouses(
 	return warehouses
 }
 
-type backgroundFn func(ctx context.Context, u *versionUpgradeTest) error
-
-// A backgroundStepper is a tool to run long-lived commands while a cluster is
-// going through a sequence of version upgrade operations.
-// It exposes a `launch` step that launches the method carrying out long-running
-// work (in the background) and a `stop` step collecting any errors.
-type backgroundStepper struct {
-	// This is the operation that will be launched in the background. When the
-	// context gets canceled, it should shut down and return without an error.
-	// The way to typically get this is:
-	//
-	//  err := doSomething(ctx)
-	//  ctx.Err() != nil {
-	//    return nil
-	//  }
-	//  return err
-	run backgroundFn
-	// When not nil, called with the error within `.stop()`. The interceptor
-	// gets a chance to ignore the error or produce a different one (via t.Fatal).
-	onStop func(context.Context, test.Test, *versionUpgradeTest, error)
-	nodes  option.NodeListOption // nodes to monitor, defaults to c.All()
-
-	// Internal.
-	m cluster.Monitor
-}
-
-// launch spawns the function the background step was initialized with.
-func (s *backgroundStepper) launch(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-	nodes := s.nodes
-	if nodes == nil {
-		nodes = u.c.All()
-	}
-	s.m = u.c.NewMonitor(ctx, nodes)
-	s.m.Go(func(ctx context.Context) error {
-		return s.run(ctx, u)
-	})
-}
-
-func (s *backgroundStepper) wait(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-	// We don't care about the workload failing since we only use it to produce a
-	// few `RESTORE` jobs. And indeed workload will fail because it does not
-	// tolerate pausing of its jobs.
-	err := s.m.WaitE()
-	if s.onStop != nil {
-		s.onStop(ctx, t, u, err)
-	} else if err != nil {
-		t.Fatal(err)
-	}
-}
-
 // runTPCCMixedHeadroom runs a mixed-version test that imports a large
-// `bank` dataset, and runs one or multiple database upgrades while a
-// TPCC workload is running. The number of database upgrades is
-// controlled by the `versionsToUpgrade` parameter.
-func runTPCCMixedHeadroom(
-	ctx context.Context, t test.Test, c cluster.Cluster, versionsToUpgrade int,
-) {
+// `bank` dataset, and runs multiple database upgrades while a TPCC
+// workload is running. The number of database upgrades is randomized
+// by the mixed-version framework which chooses a random predecessor version
+// and upgrades until it reaches the current version.
+func runTPCCMixedHeadroom(ctx context.Context, t test.Test, c cluster.Cluster) {
 	crdbNodes := c.Range(1, c.Spec().NodeCount-1)
 	workloadNode := c.Node(c.Spec().NodeCount)
 
@@ -420,26 +368,6 @@ func runTPCCMixedHeadroom(
 		headroomWarehouses = 10
 	}
 
-	// We'll need this below.
-	tpccBackgroundStepper := func(duration time.Duration) backgroundStepper {
-		return backgroundStepper{
-			nodes: crdbNodes,
-			run: func(ctx context.Context, u *versionUpgradeTest) error {
-				t.L().Printf("running background TPCC workload for %s", duration)
-				runTPCC(ctx, t, c, tpccOptions{
-					Warehouses: headroomWarehouses,
-					Duration:   duration,
-					SetupType:  usingExistingData,
-					Start: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-						// Noop - we don't let tpcc upload or start binaries in this test.
-					},
-				})
-				return nil
-			}}
-	}
-
-	randomCRDBNode := func() int { return crdbNodes.RandNode()[0] }
-
 	// NB: this results in ~100GB of (actual) disk usage per node once things
 	// have settled down, and ~7.5k ranges. The import takes ~40 minutes.
 	// The full 6.5m import ran into out of disk errors (on 250gb machines),
@@ -449,91 +377,74 @@ func runTPCCMixedHeadroom(
 		bankRows = 1000
 	}
 
-	rng, seed := randutil.NewLockedPseudoRand()
-	t.L().Printf("using random seed %d", seed)
-	history, err := release.RandomPredecessorHistory(rng, t.BuildVersion(), versionsToUpgrade)
-	if err != nil {
-		t.Fatal(err)
-	}
-	sep := " -> "
-	t.L().Printf("testing upgrade: %s%scurrent", strings.Join(history, sep), sep)
-	releases := make([]*clusterupgrade.Version, 0, len(history))
-	for _, v := range history {
-		releases = append(releases, clusterupgrade.MustParseVersion(v))
-	}
-	releases = append(releases, clusterupgrade.CurrentVersion())
+	mvt := mixedversion.NewTest(ctx, t, t.L(), c, crdbNodes)
 
-	waitForWorkloadToRampUp := sleepStep(rampDuration(c.IsLocal()))
-	logStep := func(format string, args ...interface{}) versionStep {
-		return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-			t.L().Printf(format, args...)
-		}
+	importTPCC := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		randomNode := c.Node(h.RandomNode(rng, crdbNodes))
+		cmd := tpccImportCmdWithCockroachBinary(test.DefaultCockroachPath, headroomWarehouses, fmt.Sprintf("{pgurl%s}", randomNode))
+		return c.RunE(ctx, randomNode, cmd)
 	}
 
-	oldestVersion := releases[0]
-	setupSteps := []versionStep{
-		logStep("starting from fixture at version %s", oldestVersion),
-		uploadAndStartFromCheckpointFixture(crdbNodes, oldestVersion),
-		waitForUpgradeStep(crdbNodes), // let oldest version settle (gossip etc)
-		uploadVersionStep(workloadNode, clusterupgrade.CurrentVersion()), // for tpccBackgroundStepper's workload
-
-		// Load TPCC dataset, don't run TPCC yet. We do this while in the
-		// version we are starting with to load some data and hopefully
-		// create some state that will need work by long-running
-		// migrations.
-		importTPCCStep(oldestVersion, headroomWarehouses, crdbNodes),
-		// Add a lot of cold data to this cluster. This further stresses the version
-		// upgrade machinery, in which a) all ranges are touched and b) work proportional
-		// to the amount data may be carried out.
-		importLargeBankStep(oldestVersion, bankRows, crdbNodes),
+	// Add a lot of cold data to this cluster. This further stresses the version
+	// upgrade machinery, in which a) all ranges are touched and b) work proportional
+	// to the amount data may be carried out.
+	importLargeBank := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		randomNode := c.Node(h.RandomNode(rng, crdbNodes))
+		cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload fixtures import bank", test.DefaultCockroachPath)).
+			Arg("{pgurl%s}", randomNode).
+			Flag("payload-bytes", 10240).
+			Flag("rows", bankRows).
+			Flag("seed", 4).
+			Flag("db", "bigbank").
+			String()
+		return c.RunE(ctx, randomNode, cmd)
 	}
 
-	// upgradeToVersionSteps returns the list of steps to be performed
-	// when upgrading to the given version.
-	upgradeToVersionSteps := func(crdbVersion *clusterupgrade.Version) []versionStep {
-		duration := 10 * time.Minute
-		if crdbVersion.IsCurrent() {
-			duration = 100 * time.Minute
-		}
-		tpccWorkload := tpccBackgroundStepper(duration)
-
-		return []versionStep{
-			logStep("upgrading to version %q", crdbVersion.String()),
-			preventAutoUpgradeStep(randomCRDBNode()),
-			// Upload and restart cluster into the new
-			// binary (stays at previous cluster version).
-			binaryUpgradeStep(crdbNodes, crdbVersion),
-			// Now start running TPCC in the background.
-			tpccWorkload.launch,
-			// Wait for the workload to ramp up before attemping to
-			// upgrade the cluster version. If we start the migrations
-			// immediately after launching the tpcc workload above, they
-			// could finish "too quickly", before the workload had a
-			// chance to pick up the pace (starting all the workers, range
-			// merge/splits, compactions, etc). By waiting here, we
-			// increase the concurrency exposed to the upgrade migrations,
-			// and increase the chances of exposing bugs (such as #83079).
-			waitForWorkloadToRampUp,
-			// While tpcc is running in the background, bump the cluster
-			// version manually. We do this over allowing automatic upgrades
-			// to get a better idea of what errors come back here, if any.
-			// This will block until the long-running migrations have run.
-			allowAutoUpgradeStep(randomCRDBNode()),
-			waitForUpgradeStep(crdbNodes),
-			// Wait until TPCC background run terminates
-			// and fail if it reports an error.
-			tpccWorkload.wait,
+	// We don't run this in the background using the Workload() wrapper. We want
+	// it to block and wait for the workload to ramp up before attempting to upgrade
+	// the cluster version. If we start the migrations immediately after launching
+	// the tpcc workload, they could finish "too quickly", before the workload had
+	// a chance to pick up the pace (starting all the workers, range merge/splits,
+	// compactions, etc). By waiting here, we increase the concurrency exposed to
+	// the upgrade migrations, and increase the chances of exposing bugs (such as #83079).
+	runTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		workloadDur := 10 * time.Minute
+		rampDur := rampDuration(c.IsLocal())
+		// If migrations are running we want to ramp up the workload faster in order
+		// to expose them to more concurrent load. In a similar goal, we also let the
+		// TPCC workload run longer.
+		if h.Context().Finalizing && !c.IsLocal() {
+			rampDur = 1 * time.Minute
+			if h.Context().ToVersion.IsCurrent() {
+				workloadDur = 100 * time.Minute
+			}
 		}
+		cmd := roachtestutil.NewCommand("./cockroach workload run tpcc").
+			Arg("{pgurl%s}", crdbNodes).
+			Flag("duration", workloadDur).
+			Flag("warehouses", headroomWarehouses).
+			Flag("histograms", t.PerfArtifactsDir()+"/stats.json").
+			Flag("ramp", rampDur).
+			Flag("prometheus-port", 2112).
+			Flag("pprofport", workloadPProfStartPort).
+			String()
+		return c.RunE(ctx, workloadNode, cmd)
 	}
 
-	// Test steps consist of the setup steps + the upgrade steps for
-	// each upgrade being carried out here.
-	testSteps := append([]versionStep{}, setupSteps...)
-	for _, nextVersion := range releases[1:] {
-		testSteps = append(testSteps, upgradeToVersionSteps(nextVersion)...)
+	checkTPCCWorkload := func(ctx context.Context, l *logger.Logger, rng *rand.Rand, h *mixedversion.Helper) error {
+		cmd := roachtestutil.NewCommand(fmt.Sprintf("%s workload check tpcc", test.DefaultCockroachPath)).
+			Arg("{pgurl:1}").
+			Flag("warehouses", headroomWarehouses).
+			String()
+		return c.RunE(ctx, workloadNode, cmd)
 	}
 
-	newVersionUpgradeTest(c, testSteps...).run(ctx, t)
+	uploadVersion(ctx, t, c, workloadNode, clusterupgrade.CurrentVersion())
+	mvt.OnStartup("load TPCC dataset", importTPCC)
+	mvt.OnStartup("load bank dataset", importLargeBank)
+	mvt.InMixedVersion("TPCC workload", runTPCCWorkload)
+	mvt.AfterUpgradeFinalized("check TPCC workload", checkTPCCWorkload)
+	mvt.Run()
 }
 
 func registerTPCC(r registry.Registry) {
@@ -579,27 +490,10 @@ func registerTPCC(r registry.Registry) {
 		Cluster:           mixedHeadroomSpec,
 		EncryptionSupport: registry.EncryptionMetamorphic,
 		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runTPCCMixedHeadroom(ctx, t, c, 1)
+			runTPCCMixedHeadroom(ctx, t, c)
 		},
 	})
 
-	// N.B. Multiple upgrades may require a released version < 22.2.x, which wasn't built for ARM64.
-	mixedHeadroomMultiUpgradesSpec := r.MakeClusterSpec(5, spec.CPU(16), spec.RandomlyUseZfs(), spec.Arch(vm.ArchAMD64))
-
-	r.Add(registry.TestSpec{
-		// run the same mixed-headroom test, but going back two versions
-		Name:              "tpcc/mixed-headroom/multiple-upgrades/" + mixedHeadroomMultiUpgradesSpec.String(),
-		Timeout:           5 * time.Hour,
-		Owner:             registry.OwnerTestEng,
-		CompatibleClouds:  registry.AllExceptAWS,
-		Suites:            registry.Suites(registry.Nightly),
-		Tags:              registry.Tags(`default`),
-		Cluster:           mixedHeadroomMultiUpgradesSpec,
-		EncryptionSupport: registry.EncryptionMetamorphic,
-		Run: func(ctx context.Context, t test.Test, c cluster.Cluster) {
-			runTPCCMixedHeadroom(ctx, t, c, 2)
-		},
-	})
 	r.Add(registry.TestSpec{
 		Name:  "tpcc-nowait/nodes=3/w=1",
 		Owner: registry.OwnerTestEng,

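For reference, roachtestutil.NewCommand used above is a builder that assembles a command string via Arg/Flag/String. Assuming flags render as --name=value (the same shape the hand-rolled import in the removed versionupgrade.go helpers used), the bank import hook would produce roughly:

    <cockroach> workload fixtures import bank '{pgurl:<node>}' --payload-bytes=10240 --rows=<bankRows> --seed=4 --db=bigbank

where <cockroach> stands in for test.DefaultCockroachPath and <node> for the randomly chosen node; the exact rendering is the builder's and is not guaranteed here.
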
pkg/cmd/roachtest/tests/versionupgrade.go

Lines changed: 0 additions & 49 deletions
@@ -433,55 +433,6 @@ done
 	}).run(ctx, t)
 }
 
-// importTPCCStep runs a TPCC import import on the first crdbNode (monitoring them all for
-// crashes during the import). If oldV is nil, this runs the import using the specified
-// version (for example "19.2.1", as provided by LatestPredecessor()) using the location
-// used by c.Stage(). An empty oldV uses the main cockroach binary.
-func importTPCCStep(
-	oldV *clusterupgrade.Version, headroomWarehouses int, crdbNodes option.NodeListOption,
-) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		// We need to use the predecessor binary to load into the
-		// predecessor cluster to avoid random breakage. For example, you
-		// can't use 21.1 to import into 20.2 due to some flag changes.
-		//
-		// TODO(tbg): also import a large dataset (for example 2TB bank)
-		// that will provide cold data that may need to be migrated.
-		var cmd string
-		if oldV.IsCurrent() {
-			cmd = tpccImportCmd(headroomWarehouses)
-		} else {
-			cmd = tpccImportCmdWithCockroachBinary(clusterupgrade.BinaryPathForVersion(t, oldV), headroomWarehouses, "--checks=false")
-		}
-		// Use a monitor so that we fail cleanly if the cluster crashes
-		// during import.
-		m := u.c.NewMonitor(ctx, crdbNodes)
-		m.Go(func(ctx context.Context) error {
-			return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), cmd)
-		})
-		m.Wait()
-	}
-}
-
-func importLargeBankStep(
-	oldV *clusterupgrade.Version, rows int, crdbNodes option.NodeListOption,
-) versionStep {
-	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
-		// Use the predecessor binary to load into the predecessor
-		// cluster to avoid random breakage due to flag changes, etc.
-		binary := clusterupgrade.BinaryPathForVersion(t, oldV)
-
-		// Use a monitor so that we fail cleanly if the cluster crashes
-		// during import.
-		m := u.c.NewMonitor(ctx, crdbNodes)
-		m.Go(func(ctx context.Context) error {
-			return u.c.RunE(ctx, u.c.Node(crdbNodes[0]), binary, "workload", "fixtures", "import", "bank",
-				"--payload-bytes=10240", "--rows="+fmt.Sprint(rows), "--seed=4", "--db=bigbank")
-		})
-		m.Wait()
-	}
-}
-
 func sleepStep(d time.Duration) versionStep {
 	return func(ctx context.Context, t test.Test, u *versionUpgradeTest) {
 		time.Sleep(d)
