@@ -17,7 +17,9 @@ import (
1717 "math/rand"
1818 "runtime"
1919 "strings"
20+ "time"
2021
22+ "github.com/cockroachdb/cockroach/pkg/sql/pgwire/pgcode"
2123 "github.com/cockroachdb/cockroach/pkg/util/timeutil"
2224 "github.com/cockroachdb/cockroach/pkg/workload"
2325 "github.com/cockroachdb/cockroach/pkg/workload/histogram"
@@ -216,11 +218,29 @@ var (
216218 errRunInTxnRbkSentinel = errors .New ("txn needs to rollback" )
217219)
218220
221+ type histBin int
222+
223+ const (
224+ operationOk histBin = iota
225+ txnOk
226+ txnCommitError
227+ txnRollback
228+ )
229+
230+ func (d histBin ) String () string {
231+ return [... ]string {"opOk" , "txnOk" , "txnCmtErr" , "txnRbk" }[d ]
232+ }
233+
234+ func (w * schemaChangeWorker ) recordInHist (elapsed time.Duration , bin histBin ) {
235+ w .hists .Get (bin .String ()).Record (elapsed )
236+ }
237+
219238func (w * schemaChangeWorker ) runInTxn (tx * pgx.Tx ) (string , error ) {
220239 var log strings.Builder
221240 opsNum := 1 + w .opGen .randIntn (w .maxOpsPerWorker )
222241
223242 for i := 0 ; i < opsNum ; i ++ {
243+ w .opGen .resetOpState ()
224244 op , noops , err := w .opGen .randOp (tx )
225245 if err != nil {
226246 return noops , errors .Mark (
@@ -234,16 +254,48 @@ func (w *schemaChangeWorker) runInTxn(tx *pgx.Tx) (string, error) {
234254 }
235255 log .WriteString (fmt .Sprintf (" %s;\n " , op ))
236256 if ! w .dryRun {
237- histBin := "opOk"
238257 start := timeutil .Now ()
258+
239259 if _ , err = tx .Exec (op ); err != nil {
240- histBin = "txnRbk"
241- log .WriteString (fmt .Sprintf ("***FAIL: %v\n " , err ))
242- log .WriteString ("ROLLBACK;\n " )
260+ if w .opGen .screenForExecErrors {
261+ if w .opGen .expectedExecErrors .empty () {
262+ log .WriteString (fmt .Sprintf ("***FAIL; Expected no errors, but got %v\n " , err ))
263+ return log .String (), errors .Mark (
264+ errors .Wrap (err , "***UNEXPECTED ERROR" ),
265+ errRunInTxnFatalSentinel ,
266+ )
267+ } else if pgErr := (pgx.PgError {}); ! errors .As (err , & pgErr ) || errors .As (err , & pgErr ) && ! w .opGen .expectedExecErrors .empty () && ! w .opGen .expectedExecErrors .contains (pgcode .MakeCode (pgErr .Code )) {
268+ log .WriteString (fmt .Sprintf ("***FAIL; Expected one of SQLSTATES %s, but got %v\n " , w .opGen .expectedExecErrors .string (), err ))
269+ return log .String (), errors .Mark (
270+ errors .Wrap (err , "***UNEXPECTED ERROR" ),
271+ errRunInTxnFatalSentinel ,
272+ )
273+ }
274+
275+ log .WriteString (fmt .Sprintf ("ROLLBACK; expected SQLSTATE(S) %s, and got %v\n " , w .opGen .expectedExecErrors .string (), err ))
276+ w .recordInHist (timeutil .Since (start ), txnRollback )
277+ return log .String (), errors .Mark (
278+ err ,
279+ errRunInTxnRbkSentinel ,
280+ )
281+ }
282+
283+ // TODO(jayshrivastava): Once all operations support error screening, delete this default and remove w.opGen.screenForExecErrors state
284+ w .recordInHist (timeutil .Since (start ), txnRollback )
285+ log .WriteString (fmt .Sprintf ("ROLLBACK; %v\n " , err ))
243286 return log .String (), errors .Mark (err , errRunInTxnRbkSentinel )
287+
288+ }
289+ if w .opGen .screenForExecErrors {
290+ if ! w .opGen .expectedExecErrors .empty () {
291+ log .WriteString (fmt .Sprintf ("Expected SQLSTATE(S) %s, but got no errors\n " , w .opGen .expectedExecErrors .string ()))
292+ return log .String (), errors .Mark (
293+ errors .Errorf ("***UNEXPECTED SUCCESS" ),
294+ errRunInTxnFatalSentinel ,
295+ )
296+ }
244297 }
245- elapsed := timeutil .Since (start )
246- w .hists .Get (histBin ).Record (elapsed )
298+ w .recordInHist (timeutil .Since (start ), operationOk )
247299 }
248300 }
249301 return log .String (), nil
@@ -257,6 +309,7 @@ func (w *schemaChangeWorker) run(_ context.Context) error {
257309
258310 // Run between 1 and maxOpsPerWorker schema change operations.
259311 start := timeutil .Now ()
312+ w .opGen .resetTxnState ()
260313 logs , err := w .runInTxn (tx )
261314 logs = "BEGIN\n " + logs
262315 defer func () {
@@ -268,29 +321,31 @@ func (w *schemaChangeWorker) run(_ context.Context) error {
268321 if err != nil {
269322 // Rollback in all cases to release the txn object and its conn pool.
270323 if rbkErr := tx .Rollback (); rbkErr != nil {
271- return errors .Wrapf (err , "Could not rollback %v" , rbkErr )
324+ return errors .Wrapf (err , "***UNEXPECTED ERROR IN ROLLBACK %v" , rbkErr )
272325 }
273326 switch {
274327 case errors .Is (err , errRunInTxnFatalSentinel ):
275328 return err
276329 case errors .Is (err , errRunInTxnRbkSentinel ):
330+ // TODO(jayshrivastava): Once all operations support error screening, return nil
331+ // All unexpected or non pg errors will be fatal, and all rollbacks will be expected
277332 if seriousErr := handleOpError (err ); seriousErr != nil {
278333 return seriousErr
279334 }
280335 return nil
281336 default :
282- return errors .Wrapf (err , "Unexpected error " )
337+ return errors .Wrapf (err , "***UNEXPECTED ERROR " )
283338 }
284339 }
285340
286341 // If there were no errors commit the txn.
287- histBin := " txnOk"
342+ histBin := txnOk
288343 cmtErrMsg := ""
289344 if err = tx .Commit (); err != nil {
290- histBin = "txnCmtErr"
345+ histBin = txnCommitError
291346 cmtErrMsg = fmt .Sprintf ("***FAIL: %v" , err )
292347 }
293- w .hists . Get ( histBin ). Record ( timeutil .Since (start ))
348+ w .recordInHist ( timeutil .Since (start ), histBin )
294349 logs = logs + fmt .Sprintf ("COMMIT; %s\n " , cmtErrMsg )
295350 return nil
296351}
0 commit comments