Skip to content

Commit 5b60540

Browse files
workload/schemachange: add expected error handling
Previously, there was no mechanism for checking for expected errors vs unexpected errors when operations fail due to intentionally produced errors. This commit adds logic for screening the types of errors that are generated and state variables to track which errors are expected. Release note: None
1 parent 7a7e48a commit 5b60540

4 files changed

Lines changed: 165 additions & 13 deletions

File tree

pkg/workload/schemachange/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -4,6 +4,7 @@ go_library(
44
name = "schemachange",
55
srcs = [
66
"deck.go",
7+
"error_code_set.go",
78
"operation_generator.go",
89
"optype_string.go",
910
"schemachange.go",
Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
// Copyright 2020 The Cockroach Authors.
2+
//
3+
// Use of this software is governed by the Business Source License
4+
// included in the file licenses/BSL.txt.
5+
//
6+
// As of the Change Date specified in that file, in accordance with
7+
// the Business Source License, use of this software will be governed
8+
// by the Apache License, Version 2.0, included in the file
9+
// licenses/APL.txt.
10+
11+
package schemachange
12+
13+
import (
14+
"sort"
15+
"strings"
16+
17+
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
18+
)
19+
20+
type errorCodeSet map[pgcode.Code]bool
21+
22+
func makeExpectedErrorSet() errorCodeSet {
23+
return errorCodeSet(map[pgcode.Code]bool{})
24+
}
25+
26+
func (set errorCodeSet) add(code pgcode.Code) {
27+
set[code] = true
28+
}
29+
30+
func (set errorCodeSet) reset() {
31+
for k := range set {
32+
delete(set, k)
33+
}
34+
}
35+
36+
func (set errorCodeSet) contains(code pgcode.Code) bool {
37+
_, ok := set[code]
38+
return ok
39+
}
40+
41+
func (set errorCodeSet) string() string {
42+
var codes []string
43+
for code := range set {
44+
codes = append(codes, code.String())
45+
}
46+
sort.Strings(codes)
47+
return strings.Join(codes, ",")
48+
}
49+
50+
func (set errorCodeSet) empty() bool {
51+
return len(set) == 0
52+
}
53+
54+
type codesWithConditions []struct {
55+
code pgcode.Code
56+
condition bool
57+
}
58+
59+
func (c codesWithConditions) add(s errorCodeSet) {
60+
for _, cc := range c {
61+
if cc.condition {
62+
s.add(cc.code)
63+
}
64+
}
65+
}

pkg/workload/schemachange/operation_generator.go

Lines changed: 33 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -39,18 +39,45 @@ type operationGeneratorParams struct {
3939

4040
// The OperationBuilder has the sole responsibility of generating ops
4141
type operationGenerator struct {
42-
params *operationGeneratorParams
42+
params *operationGeneratorParams
43+
screenForExecErrors bool
44+
expectedExecErrors errorCodeSet
45+
expectedCommitErrors errorCodeSet
4346
}
4447

4548
func makeOperationGenerator(params *operationGeneratorParams) *operationGenerator {
4649
return &operationGenerator{
47-
params: params,
50+
params: params,
51+
expectedExecErrors: makeExpectedErrorSet(),
52+
expectedCommitErrors: makeExpectedErrorSet(),
4853
}
4954
}
5055

56+
// Reset internal state used per operation within a transaction
57+
func (og *operationGenerator) resetOpState() {
58+
og.screenForExecErrors = false
59+
og.expectedExecErrors.reset()
60+
}
61+
62+
// Reset internal state used per transaction
63+
func (og *operationGenerator) resetTxnState() {
64+
og.expectedCommitErrors.reset()
65+
}
66+
5167
//go:generate stringer -type=opType
5268
type opType int
5369

70+
// opsWithErrorScreening stores ops which currently check for exec
71+
// errors and update expectedExecErrors in the op generator state
72+
var opsWithExecErrorScreening = map[opType]bool{}
73+
74+
func opScreensForExecErrors(op opType) bool {
75+
if _, exists := opsWithExecErrorScreening[op]; exists {
76+
return true
77+
}
78+
return false
79+
}
80+
5481
const (
5582
addColumn opType = iota // ALTER TABLE <table> ADD [COLUMN] <column> <type>
5683
addConstraint // ALTER TABLE <table> ADD CONSTRAINT <constraint> <def>
@@ -226,6 +253,10 @@ func (og *operationGenerator) randOp(tx *pgx.Tx) (string, string, error) {
226253
log.WriteString(fmt.Sprintf("NOOP: %s -> %v\n", op, err))
227254
continue
228255
}
256+
257+
if opScreensForExecErrors(op) {
258+
og.screenForExecErrors = true
259+
}
229260
return stmt, log.String(), err
230261
}
231262
}

pkg/workload/schemachange/schemachange.go

Lines changed: 66 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -17,7 +17,9 @@ import (
1717
"math/rand"
1818
"runtime"
1919
"strings"
20+
"time"
2021

22+
"github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
2123
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
2224
"github.com/cockroachdb/cockroach/pkg/workload"
2325
"github.com/cockroachdb/cockroach/pkg/workload/histogram"
@@ -216,11 +218,29 @@ var (
216218
errRunInTxnRbkSentinel = errors.New("txn needs to rollback")
217219
)
218220

221+
type histBin int
222+
223+
const (
224+
operationOk histBin = iota
225+
txnOk
226+
txnCommitError
227+
txnRollback
228+
)
229+
230+
func (d histBin) String() string {
231+
return [...]string{"opOk", "txnOk", "txnCmtErr", "txnRbk"}[d]
232+
}
233+
234+
func (w *schemaChangeWorker) recordInHist(elapsed time.Duration, bin histBin) {
235+
w.hists.Get(bin.String()).Record(elapsed)
236+
}
237+
219238
func (w *schemaChangeWorker) runInTxn(tx *pgx.Tx) (string, error) {
220239
var log strings.Builder
221240
opsNum := 1 + w.opGen.randIntn(w.maxOpsPerWorker)
222241

223242
for i := 0; i < opsNum; i++ {
243+
w.opGen.resetOpState()
224244
op, noops, err := w.opGen.randOp(tx)
225245
if err != nil {
226246
return noops, errors.Mark(
@@ -234,16 +254,48 @@ func (w *schemaChangeWorker) runInTxn(tx *pgx.Tx) (string, error) {
234254
}
235255
log.WriteString(fmt.Sprintf(" %s;\n", op))
236256
if !w.dryRun {
237-
histBin := "opOk"
238257
start := timeutil.Now()
258+
239259
if _, err = tx.Exec(op); err != nil {
240-
histBin = "txnRbk"
241-
log.WriteString(fmt.Sprintf("***FAIL: %v\n", err))
242-
log.WriteString("ROLLBACK;\n")
260+
if w.opGen.screenForExecErrors {
261+
if w.opGen.expectedExecErrors.empty() {
262+
log.WriteString(fmt.Sprintf("***FAIL; Expected no errors, but got %v\n", err))
263+
return log.String(), errors.Mark(
264+
errors.Wrap(err, "***UNEXPECTED ERROR"),
265+
errRunInTxnFatalSentinel,
266+
)
267+
} else if pgErr := (pgx.PgError{}); !errors.As(err, &pgErr) || errors.As(err, &pgErr) && !w.opGen.expectedExecErrors.empty() && !w.opGen.expectedExecErrors.contains(pgcode.MakeCode(pgErr.Code)) {
268+
log.WriteString(fmt.Sprintf("***FAIL; Expected one of SQLSTATES %s, but got %v\n", w.opGen.expectedExecErrors.string(), err))
269+
return log.String(), errors.Mark(
270+
errors.Wrap(err, "***UNEXPECTED ERROR"),
271+
errRunInTxnFatalSentinel,
272+
)
273+
}
274+
275+
log.WriteString(fmt.Sprintf("ROLLBACK; expected SQLSTATE(S) %s, and got %v\n", w.opGen.expectedExecErrors.string(), err))
276+
w.recordInHist(timeutil.Since(start), txnRollback)
277+
return log.String(), errors.Mark(
278+
err,
279+
errRunInTxnRbkSentinel,
280+
)
281+
}
282+
283+
// TODO(jayshrivastava): Once all operations support error screening, delete this default and remove w.opGen.screenForExecErrors state
284+
w.recordInHist(timeutil.Since(start), txnRollback)
285+
log.WriteString(fmt.Sprintf("ROLLBACK; %v\n", err))
243286
return log.String(), errors.Mark(err, errRunInTxnRbkSentinel)
287+
288+
}
289+
if w.opGen.screenForExecErrors {
290+
if !w.opGen.expectedExecErrors.empty() {
291+
log.WriteString(fmt.Sprintf("Expected SQLSTATE(S) %s, but got no errors\n", w.opGen.expectedExecErrors.string()))
292+
return log.String(), errors.Mark(
293+
errors.Errorf("***UNEXPECTED SUCCESS"),
294+
errRunInTxnFatalSentinel,
295+
)
296+
}
244297
}
245-
elapsed := timeutil.Since(start)
246-
w.hists.Get(histBin).Record(elapsed)
298+
w.recordInHist(timeutil.Since(start), operationOk)
247299
}
248300
}
249301
return log.String(), nil
@@ -257,6 +309,7 @@ func (w *schemaChangeWorker) run(_ context.Context) error {
257309

258310
// Run between 1 and maxOpsPerWorker schema change operations.
259311
start := timeutil.Now()
312+
w.opGen.resetTxnState()
260313
logs, err := w.runInTxn(tx)
261314
logs = "BEGIN\n" + logs
262315
defer func() {
@@ -268,29 +321,31 @@ func (w *schemaChangeWorker) run(_ context.Context) error {
268321
if err != nil {
269322
// Rollback in all cases to release the txn object and its conn pool.
270323
if rbkErr := tx.Rollback(); rbkErr != nil {
271-
return errors.Wrapf(err, "Could not rollback %v", rbkErr)
324+
return errors.Wrapf(err, "***UNEXPECTED ERROR IN ROLLBACK %v", rbkErr)
272325
}
273326
switch {
274327
case errors.Is(err, errRunInTxnFatalSentinel):
275328
return err
276329
case errors.Is(err, errRunInTxnRbkSentinel):
330+
// TODO(jayshrivastava): Once all operations support error screening, return nil
331+
// All unexpected or non pg errors will be fatal, and all rollbacks will be expected
277332
if seriousErr := handleOpError(err); seriousErr != nil {
278333
return seriousErr
279334
}
280335
return nil
281336
default:
282-
return errors.Wrapf(err, "Unexpected error")
337+
return errors.Wrapf(err, "***UNEXPECTED ERROR")
283338
}
284339
}
285340

286341
// If there were no errors commit the txn.
287-
histBin := "txnOk"
342+
histBin := txnOk
288343
cmtErrMsg := ""
289344
if err = tx.Commit(); err != nil {
290-
histBin = "txnCmtErr"
345+
histBin = txnCommitError
291346
cmtErrMsg = fmt.Sprintf("***FAIL: %v", err)
292347
}
293-
w.hists.Get(histBin).Record(timeutil.Since(start))
348+
w.recordInHist(timeutil.Since(start), histBin)
294349
logs = logs + fmt.Sprintf("COMMIT; %s\n", cmtErrMsg)
295350
return nil
296351
}

0 commit comments

Comments
 (0)