Skip to content

Commit 574e805

Browse files
committed
sql: rank the errors received by the DistSQLReceiver
A DistSQL flow can potentially return many errors; different sub-flows from different nodes, and different processors within a flow, can all generate different errors. Before this patch, the first one to make it to the receiver was the one presented to the client. This patch adds more smarts by choosing the "best" error. The ranking is as follows, from high precedence to low: - non-retriable error - TxnAbortedError - other retriable errors Release note: None
1 parent d55a15c commit 574e805

8 files changed

Lines changed: 163 additions & 48 deletions

File tree

pkg/internal/client/txn.go

Lines changed: 5 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -893,17 +893,17 @@ func (txn *Txn) replaceSenderIfTxnAbortedLocked(
893893
// transaction attempt.
894894
newTxn := &retryErr.Transaction
895895

896-
if txn.mu.ID == newTxn.ID {
897-
// We don't need a new transaction as a result of this error. Nothing more
898-
// to do.
899-
return
900-
}
901896
if txn.mu.ID != origTxnID {
902897
// The transaction has changed since the request that generated the error
903898
// was sent. Nothing more to do.
904899
log.VEventf(ctx, 2, "retriable error for old incarnation of the transaction")
905900
return
906901
}
902+
if !retryErr.PrevTxnAborted() {
903+
// We don't need a new transaction as a result of this error. Nothing more
904+
// to do.
905+
return
906+
}
907907

908908
// The ID changed, which means that the cause was a TransactionAbortedError;
909909
// we've created a new Transaction that we're about to start using, so we save

pkg/roachpb/errors.go

Lines changed: 13 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -349,7 +349,19 @@ func NewTransactionAbortedError(reason TransactionAbortedReason) *TransactionAbo
349349
func NewTransactionRetryWithProtoRefreshError(
350350
msg string, txnID uuid.UUID, txn Transaction,
351351
) *TransactionRetryWithProtoRefreshError {
352-
return &TransactionRetryWithProtoRefreshError{Msg: msg, TxnID: txnID, Transaction: txn}
352+
return &TransactionRetryWithProtoRefreshError{
353+
Msg: msg,
354+
TxnID: txnID,
355+
Transaction: txn,
356+
}
357+
}
358+
359+
// PrevTxnAborted returns true if this error originated from a
360+
// TransactionAbortedError. If true, the client will need to create a new
361+
// transaction, as opposed to continuing with the existing one at a bumped
362+
// epoch.
363+
func (e *TransactionRetryWithProtoRefreshError) PrevTxnAborted() bool {
364+
return !e.TxnID.Equal(e.Transaction.ID)
353365
}
354366

355367
// NewTransactionPushError initializes a new TransactionPushError.

pkg/roachpb/errors.proto

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -327,10 +327,10 @@ message RaftGroupDeletedError {
327327
message ReplicaCorruptionError {
328328
option (gogoproto.equal) = true;
329329

330-
optional string error_msg = 1 [(gogoproto.nullable) = false];;
330+
optional string error_msg = 1 [(gogoproto.nullable) = false];
331331
// processed indicates that the error has been taken into account and
332332
// necessary steps will be taken. For now, required for testing.
333-
optional bool processed = 2 [(gogoproto.nullable) = false];;
333+
optional bool processed = 2 [(gogoproto.nullable) = false];
334334
}
335335

336336
// ReplicaTooOldError is sent in response to a raft message when the

pkg/sql/conn_executor_exec.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -154,9 +154,9 @@ func (ex *connExecutor) execStmtInOpenState(
154154
// nicer to look at for the client.
155155
if ctx.Err() != nil && res.Err() != nil {
156156
if queryTimedOut {
157-
res.OverwriteError(sqlbase.QueryTimeoutError)
157+
res.SetError(sqlbase.QueryTimeoutError)
158158
} else {
159-
res.OverwriteError(sqlbase.QueryCanceledError)
159+
res.SetError(sqlbase.QueryCanceledError)
160160
}
161161
}
162162
}

pkg/sql/conn_io.go

Lines changed: 4 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -627,19 +627,14 @@ type CommandResult interface {
627627
// query execution error.
628628
type CommandResultErrBase interface {
629629
// SetError accumulates an execution error that needs to be reported to the
630-
// client. No further calls other than OverwriteError(), Close() and Discard()
631-
// are allowed. In particular, CloseWithErr() is not allowed.
630+
// client. No further calls other than SetError(), Close()/CloseWithErr()
631+
// and Discard() are allowed.
632+
//
633+
// Calling SetError() a second time overwrites the previously set error.
632634
SetError(error)
633635

634636
// Err returns the error previously set with SetError(), if any.
635637
Err() error
636-
637-
// OverwriteError is like SetError(), except it can be called after SetError()
638-
// has already been called and it will overwrite the error. Used by high-level
639-
// code when it has a strong opinion about what the error that should be
640-
// returned to the client is and doesn't much care about whether an error has
641-
// already been set on the result.
642-
OverwriteError(error)
643638
}
644639

645640
// ResultBase is the common interface implemented by all the different command
@@ -884,11 +879,6 @@ func (r *bufferedCommandResult) SetError(err error) {
884879
r.err = err
885880
}
886881

887-
// OverwriteError is part of the RestrictedCommandResult interface.
888-
func (r *bufferedCommandResult) OverwriteError(err error) {
889-
r.err = err
890-
}
891-
892882
// Err is part of the RestrictedCommandResult interface.
893883
func (r *bufferedCommandResult) Err() error {
894884
return r.err

pkg/sql/distsql_running.go

Lines changed: 53 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -262,6 +262,17 @@ func (dsp *DistSQLPlanner) Run(
262262
flow.Cleanup(ctx)
263263
}
264264

265+
// errorPriority is used to rank errors such that the "best" one is chosen to be
266+
// presented as the query result.
267+
type errorPriority int
268+
269+
const (
270+
scoreNoError errorPriority = iota
271+
scoreTxnRestart
272+
scoreTxnAbort
273+
scoreNonRetriable
274+
)
275+
265276
// DistSQLReceiver is a RowReceiver that writes results to a rowResultWriter.
266277
// This is where the DistSQL execution meets the SQL Session - the RowContainer
267278
// comes from a client Session.
@@ -467,23 +478,27 @@ func (r *DistSQLReceiver) Push(
467478
errors.Errorf("received a leaf TxnCoordMeta (%s); but have no root", meta.TxnCoordMeta))
468479
}
469480
}
470-
if meta.Err != nil && r.resultWriter.Err() == nil {
471-
if r.txn != nil {
472-
if retryErr, ok := meta.Err.(*roachpb.UnhandledRetryableError); ok {
473-
// Update the txn in response to remote errors. In the non-DistSQL
474-
// world, the TxnCoordSender handles "unhandled" retryable errors,
475-
// but this one is coming from a distributed SQL node, which has
476-
// left the handling up to the root transaction.
477-
meta.Err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr)
478-
// Update the clock with information from the error. On non-DistSQL
479-
// code paths, the DistSender does this.
480-
// TODO(andrei): We don't propagate clock signals on success cases
481-
// through DistSQL; we should. We also don't propagate them through
482-
// non-retryable errors; we also should.
483-
r.updateClock(retryErr.PErr.Now)
481+
if meta.Err != nil {
482+
// Check if the error we just received should take precedence over a
483+
// previous error (if any).
484+
if errPriority(meta.Err) > errPriority(r.resultWriter.Err()) {
485+
if r.txn != nil {
486+
if retryErr, ok := meta.Err.(*roachpb.UnhandledRetryableError); ok {
487+
// Update the txn in response to remote errors. In the non-DistSQL
488+
// world, the TxnCoordSender handles "unhandled" retryable errors,
489+
// but this one is coming from a distributed SQL node, which has
490+
// left the handling up to the root transaction.
491+
meta.Err = r.txn.UpdateStateOnRemoteRetryableErr(r.ctx, &retryErr.PErr)
492+
// Update the clock with information from the error. On non-DistSQL
493+
// code paths, the DistSender does this.
494+
// TODO(andrei): We don't propagate clock signals on success cases
495+
// through DistSQL; we should. We also don't propagate them through
496+
// non-retryable errors; we also should.
497+
r.updateClock(retryErr.PErr.Now)
498+
}
484499
}
500+
r.resultWriter.SetError(meta.Err)
485501
}
486-
r.resultWriter.SetError(meta.Err)
487502
}
488503
if len(meta.Ranges) > 0 {
489504
if err := r.updateCaches(r.ctx, meta.Ranges); err != nil && r.resultWriter.Err() == nil {
@@ -562,6 +577,29 @@ func (r *DistSQLReceiver) Push(
562577
return r.status
563578
}
564579

580+
// errPriority computes the priority of err.
581+
func errPriority(err error) errorPriority {
582+
if err == nil {
583+
return scoreNoError
584+
}
585+
if retryErr, ok := err.(*roachpb.UnhandledRetryableError); ok {
586+
pErr := retryErr.PErr
587+
switch pErr.GetDetail().(type) {
588+
case *roachpb.TransactionAbortedError:
589+
return scoreTxnAbort
590+
default:
591+
return scoreTxnRestart
592+
}
593+
}
594+
if retryErr, ok := err.(*roachpb.TransactionRetryWithProtoRefreshError); ok {
595+
if retryErr.PrevTxnAborted() {
596+
return scoreTxnAbort
597+
}
598+
return scoreTxnRestart
599+
}
600+
return scoreNonRetriable
601+
}
602+
565603
// ProducerDone is part of the RowReceiver interface.
566604
func (r *DistSQLReceiver) ProducerDone() {
567605
if r.txn != nil {

pkg/sql/distsql_running_test.go

Lines changed: 83 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,6 +25,7 @@ import (
2525
"github.com/cockroachdb/cockroach/pkg/kv"
2626
"github.com/cockroachdb/cockroach/pkg/roachpb"
2727
"github.com/cockroachdb/cockroach/pkg/security"
28+
"github.com/cockroachdb/cockroach/pkg/sql/distsqlrun"
2829
"github.com/cockroachdb/cockroach/pkg/sql/parser"
2930
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
3031
"github.com/cockroachdb/cockroach/pkg/testutils"
@@ -180,3 +181,85 @@ func TestDistSQLRunningInAbortedTxn(t *testing.T) {
180181
t.Fatalf("didn't find expected message in trace: %s", clientRejectedMsg)
181182
}
182183
}
184+
185+
// Test that the DistSQLReceiver overwrites previous errors as "better" errors
186+
// come along.
187+
func TestDistSQLReceiverErrorRanking(t *testing.T) {
188+
defer leaktest.AfterTest(t)()
189+
190+
// This test goes through the trouble of creating a server because it wants to
191+
// create a txn. It creates the txn because it wants to test an interaction
192+
// between the DistSQLReceiver and the TxnCoordSender: the DistSQLReceiver
193+
// will feed retriable errors to the TxnCoordSender which will change those
194+
// errors to TransactionRetryWithProtoRefreshError.
195+
ctx := context.Background()
196+
s, _, db := serverutils.StartServer(t, base.TestServerArgs{})
197+
defer s.Stopper().Stop(ctx)
198+
199+
txn := client.NewTxn(ctx, db, s.NodeID(), client.RootTxn)
200+
201+
// We're going to use a rowResultWriter to which only errors will be passed.
202+
rw := newCallbackResultWriter(nil /* fn */)
203+
recv := MakeDistSQLReceiver(
204+
ctx,
205+
rw,
206+
tree.Rows, /* StatementType */
207+
nil, /* rangeCache */
208+
nil, /* leaseCache */
209+
txn,
210+
func(hlc.Timestamp) {}, /* updateClock */
211+
&SessionTracing{},
212+
)
213+
214+
retryErr := roachpb.NewErrorWithTxn(
215+
roachpb.NewTransactionRetryError(
216+
roachpb.RETRY_SERIALIZABLE),
217+
txn.Serialize()).GoError()
218+
219+
abortErr := roachpb.NewErrorWithTxn(
220+
roachpb.NewTransactionAbortedError(
221+
roachpb.ABORT_REASON_ABORTED_RECORD_FOUND),
222+
txn.Serialize()).GoError()
223+
224+
errs := []struct {
225+
err error
226+
expErr string
227+
}{
228+
{
229+
// Initial error, retriable.
230+
err: retryErr,
231+
expErr: "TransactionRetryWithProtoRefreshError: TransactionRetryError",
232+
},
233+
{
234+
// A TransactionAbortedError overwrites another retriable one.
235+
err: abortErr,
236+
expErr: "TransactionRetryWithProtoRefreshError: TransactionAbortedError",
237+
},
238+
{
239+
// A non-aborted retriable error does not override the
240+
// TransactionAbortedError.
241+
err: retryErr,
242+
expErr: "TransactionRetryWithProtoRefreshError: TransactionAbortedError",
243+
},
244+
{
245+
// A non-retriable error overwrites a retriable one.
246+
err: fmt.Errorf("err1"),
247+
expErr: "err1",
248+
},
249+
{
250+
// Another non-retriable error doesn't overwrite the previous one.
251+
err: fmt.Errorf("err2"),
252+
expErr: "err1",
253+
},
254+
}
255+
256+
for i, tc := range errs {
257+
recv.Push(nil, /* row */
258+
&distsqlrun.ProducerMetadata{
259+
Err: tc.err,
260+
})
261+
if !testutils.IsError(rw.Err(), tc.expErr) {
262+
t.Fatalf("%d: expected %s, got %s", i, tc.expErr, rw.Err())
263+
}
264+
}
265+
}

pkg/sql/pgwire/command_result.go

Lines changed: 1 addition & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -189,16 +189,8 @@ func (r *commandResult) Err() error {
189189
// SetError is part of the CommandResult interface.
190190
//
191191
// We're not going to write any bytes to the buffer in order to support future
192-
// OverwriteError() calls. The error will only be serialized at Close() time.
192+
// SetError() calls. The error will only be serialized at Close() time.
193193
func (r *commandResult) SetError(err error) {
194-
if r.err != nil {
195-
panic(fmt.Sprintf("can't overwrite err: %s with err: %s", r.err, err))
196-
}
197-
r.err = err
198-
}
199-
200-
// OverwriteError is part of the CommandResult interface.
201-
func (r *commandResult) OverwriteError(err error) {
202194
r.err = err
203195
}
204196

0 commit comments

Comments
 (0)