Skip to content

Commit c00ea84

Browse files
author
Lidor Carmel
committed
kv,kvcoord,sql: poison txnCoordSender after a retryable error
Previously kv users could lose parts of a transaction without getting an error. After Send() returned a retryable error the state of txn got reset which made it usable again. If the caller ignored the error they could continue applying more operations without realizing the first part of the transaction was discarded. See more details in the issue (#22615). The simple case example is where the retryable closure of DB.Txn() returns nil instead of returning the retryable error back to the retry loop - in this case the retry loop declares success without realizing we lost the first part of the transaction (all the operations before the retryable error). This PR leaves the txn in a "poisoned" state after encountering an error, so that all future operations fail fast. The caller is therefore expected to reset the txn handle back to a usable state intentionally, by calling Txn.PrepareForRetry(). In the simple case of DB.Txn() the retry loop will reset the handle and run the retry even if the callback returned nil. Closes #22615 Release note: None
1 parent de11fd7 commit c00ea84

13 files changed

Lines changed: 344 additions & 83 deletions

pkg/kv/db.go

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -851,6 +851,18 @@ func (db *DB) NewTxn(ctx context.Context, debugName string) *Txn {
851851
// from recoverable internal errors, and is automatically committed
852852
// otherwise. The retryable function should have no side effects which could
853853
// cause problems in the event it must be run more than once.
854+
// For example:
855+
// err := db.Txn(ctx, func(ctx context.Context, txn *kv.Txn) error {
856+
// if kv, err := txn.Get(ctx, key); err != nil {
857+
// return err
858+
// }
859+
// // ...
860+
// return nil
861+
// })
862+
// Note that once the transaction encounters a retryable error, the txn object
863+
// is marked as poisoned and all future ops fail fast until the retry. The
864+
// callback may return either nil or the retryable error. Txn is responsible for
865+
// resetting the transaction and retrying the callback.
854866
func (db *DB) Txn(ctx context.Context, retryable func(context.Context, *Txn) error) error {
855867
// TODO(radu): we should open a tracing Span here (we need to figure out how
856868
// to use the correct tracer).

pkg/kv/db_test.go

Lines changed: 82 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ package kv_test
1313
import (
1414
"bytes"
1515
"context"
16+
"fmt"
1617
"testing"
1718
"time"
1819

@@ -715,3 +716,84 @@ func TestGenerateForcedRetryableError(t *testing.T) {
715716
require.True(t, errors.As(err, &retryErr))
716717
require.Equal(t, 1, int(retryErr.Transaction.Epoch))
717718
}
719+
720+
// Get a retryable error within a db.Txn transaction and verify the retry
721+
// succeeds. We are verifying the behavior is the same whether the retryable
722+
// callback returns the retryable error or returns nil. Both implementations are
723+
// legal - returning early (with either nil or the error) after a retryable
724+
// error is optional.
725+
func TestDB_TxnRetry(t *testing.T) {
726+
defer leaktest.AfterTest(t)()
727+
defer log.Scope(t).Close(t)
728+
s, db := setup(t)
729+
defer s.Stopper().Stop(context.Background())
730+
731+
testutils.RunTrueAndFalse(t, "returnNil", func(t *testing.T, returnNil bool) {
732+
keyA := fmt.Sprintf("a_return_nil_%t", returnNil)
733+
keyB := fmt.Sprintf("b_return_nil_%t", returnNil)
734+
runNumber := 0
735+
err := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
736+
require.NoError(t, txn.Put(ctx, keyA, "1"))
737+
require.NoError(t, txn.Put(ctx, keyB, "1"))
738+
739+
{
740+
// High priority txn - will abort the other txn.
741+
hpTxn := kv.NewTxn(ctx, db, 0)
742+
require.NoError(t, hpTxn.SetUserPriority(roachpb.MaxUserPriority))
743+
// Only write if we have not written before, because otherwise we will keep aborting
744+
// the other txn forever.
745+
r, err := hpTxn.Get(ctx, keyA)
746+
require.NoError(t, err)
747+
if !r.Exists() {
748+
require.Zero(t, runNumber)
749+
require.NoError(t, hpTxn.Put(ctx, keyA, "hp txn"))
750+
require.NoError(t, hpTxn.Commit(ctx))
751+
} else {
752+
// We already wrote to keyA, meaning this is a retry, no need to write again.
753+
require.Equal(t, 1, runNumber)
754+
require.NoError(t, hpTxn.Rollback(ctx))
755+
}
756+
}
757+
758+
// Read, so that we'll get a retryable error.
759+
r, err := txn.Get(ctx, keyA)
760+
if runNumber == 0 {
761+
// First run, we should get a retryable error.
762+
require.Zero(t, runNumber)
763+
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
764+
require.Equal(t, []byte(nil), r.ValueBytes())
765+
766+
// At this point txn is poisoned, and any op returns the same (poisoning) error.
767+
r, err = txn.Get(ctx, keyB)
768+
require.IsType(t, &roachpb.TransactionRetryWithProtoRefreshError{}, err)
769+
require.Equal(t, []byte(nil), r.ValueBytes())
770+
} else {
771+
// The retry should succeed.
772+
require.Equal(t, 1, runNumber)
773+
require.NoError(t, err)
774+
require.Equal(t, []byte("1"), r.ValueBytes())
775+
}
776+
runNumber++
777+
778+
if returnNil {
779+
return nil
780+
}
781+
// Return the retryable error.
782+
return err
783+
})
784+
require.NoError(t, err)
785+
require.Equal(t, 2, runNumber)
786+
787+
err1 := db.Txn(context.Background(), func(ctx context.Context, txn *kv.Txn) error {
788+
// The high priority txn was overwritten by the successful retry.
789+
kv, e1 := txn.Get(ctx, keyA)
790+
require.NoError(t, e1)
791+
require.Equal(t, []byte("1"), kv.ValueBytes())
792+
kv, e2 := txn.Get(ctx, keyB)
793+
require.NoError(t, e2)
794+
require.Equal(t, []byte("1"), kv.ValueBytes())
795+
return nil
796+
})
797+
require.NoError(t, err1)
798+
})
799+
}

pkg/kv/kvclient/kvcoord/testdata/savepoints

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -486,6 +486,11 @@ savepoint x
486486
abort
487487
----
488488
(*roachpb.TransactionRetryWithProtoRefreshError)
489+
txn id not changed
490+
491+
reset
492+
----
493+
txn error cleared
489494
txn id changed
490495

491496
release x

pkg/kv/kvclient/kvcoord/txn_coord_sender.go

Lines changed: 40 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,12 @@ const (
5050
// txnPending is the normal state for ongoing transactions.
5151
txnPending txnState = iota
5252

53+
// txnRetryableError means that the transaction encountered a
54+
// TransactionRetryWithProtoRefreshError, and calls to Send() fail in this
55+
// state. It is possible to move back to txnPending by calling
56+
// ClearTxnRetryableErr().
57+
txnRetryableError
58+
5359
// txnError means that a batch encountered a non-retriable error. Further
5460
// batches except EndTxn(commit=false) will be rejected.
5561
txnError
@@ -105,6 +111,11 @@ type TxnCoordSender struct {
105111
syncutil.Mutex
106112

107113
txnState txnState
114+
115+
// storedRetryableErr is set when txnState == txnRetryableError. This
116+
// storedRetryableErr is returned to clients on Send().
117+
storedRetryableErr *roachpb.TransactionRetryWithProtoRefreshError
118+
108119
// storedErr is set when txnState == txnError. This storedErr is returned to
109120
// clients on Send().
110121
storedErr *roachpb.Error
@@ -686,6 +697,8 @@ func (tc *TxnCoordSender) maybeRejectClientLocked(
686697
switch tc.mu.txnState {
687698
case txnPending:
688699
// All good.
700+
case txnRetryableError:
701+
return roachpb.NewError(tc.mu.storedRetryableErr)
689702
case txnError:
690703
return tc.mu.storedErr
691704
case txnFinalized:
@@ -813,6 +826,11 @@ func (tc *TxnCoordSender) handleRetryableErrLocked(
813826
errTxnID, // the id of the transaction that encountered the error
814827
newTxn)
815828

829+
// Move to a retryable error state, where all Send() calls fail until the
830+
// state is cleared.
831+
tc.mu.txnState = txnRetryableError
832+
tc.mu.storedRetryableErr = retErr
833+
816834
// If the ID changed, it means we had to start a new transaction and the
817835
// old one is toast. This TxnCoordSender cannot be used any more - future
818836
// Send() calls will be rejected; the client is supposed to create a new
@@ -1357,3 +1375,25 @@ func (tc *TxnCoordSender) DeferCommitWait(ctx context.Context) func(context.Cont
13571375
return tc.maybeCommitWait(ctx, true /* deferred */)
13581376
}
13591377
}
1378+
1379+
// GetTxnRetryableErr is part of the TxnSender interface.
1380+
func (tc *TxnCoordSender) GetTxnRetryableErr(
1381+
ctx context.Context,
1382+
) *roachpb.TransactionRetryWithProtoRefreshError {
1383+
tc.mu.Lock()
1384+
defer tc.mu.Unlock()
1385+
if tc.mu.txnState == txnRetryableError {
1386+
return tc.mu.storedRetryableErr
1387+
}
1388+
return nil
1389+
}
1390+
1391+
// ClearTxnRetryableErr is part of the TxnSender interface.
1392+
func (tc *TxnCoordSender) ClearTxnRetryableErr(ctx context.Context) {
1393+
tc.mu.Lock()
1394+
defer tc.mu.Unlock()
1395+
if tc.mu.txnState == txnRetryableError {
1396+
tc.mu.storedRetryableErr = nil
1397+
tc.mu.txnState = txnPending
1398+
}
1399+
}

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints.go

Lines changed: 2 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -126,16 +126,12 @@ func (tc *TxnCoordSender) RollbackToSavepoint(ctx context.Context, s kv.Savepoin
126126
err = roachpb.NewTransactionRetryWithProtoRefreshError(
127127
"cannot rollback to savepoint after a transaction restart",
128128
tc.mu.txn.ID,
129-
// The transaction inside this error doesn't matter.
130-
roachpb.Transaction{},
129+
tc.mu.txn,
131130
)
132131
}
133132
return err
134133
}
135134

136-
// Restore the transaction's state, in case we're rewiding after an error.
137-
tc.mu.txnState = txnPending
138-
139135
tc.mu.active = sp.active
140136

141137
for _, reqInt := range tc.interceptorStack {
@@ -173,8 +169,7 @@ func (tc *TxnCoordSender) ReleaseSavepoint(ctx context.Context, s kv.SavepointTo
173169
err = roachpb.NewTransactionRetryWithProtoRefreshError(
174170
"cannot release savepoint after a transaction restart",
175171
tc.mu.txn.ID,
176-
// The transaction inside this error doesn't matter.
177-
roachpb.Transaction{},
172+
tc.mu.txn,
178173
)
179174
}
180175
return err

pkg/kv/kvclient/kvcoord/txn_coord_sender_savepoints_test.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -124,6 +124,16 @@ func TestSavepoints(t *testing.T) {
124124
}
125125
fmt.Fprintf(&buf, "txn id %s\n", changed)
126126

127+
case "reset":
128+
prevID := txn.ID()
129+
txn.PrepareForRetry(ctx)
130+
changed := "changed"
131+
if prevID == txn.ID() {
132+
changed = "not changed"
133+
}
134+
fmt.Fprintf(&buf, "txn error cleared\n")
135+
fmt.Fprintf(&buf, "txn id %s\n", changed)
136+
127137
case "put":
128138
b := txn.NewBatch()
129139
b.Put(td.CmdArgs[0].Key, td.CmdArgs[1].Key)

0 commit comments

Comments
 (0)