Skip to content

Commit aed892a

Browse files
committed
roachpb: consider txn epochs in Transaction.Update
Previously, Transaction.Update was pretty loose about which fields it updated given a receiver and an argument with different epochs. This relied on a number of assumptions about how these fields could change between epochs. This commit tightens this up by making this all more explicit. It also adds in some log warnings around calls to Update that we don't expect to see in practice. Release note: None
1 parent 9e76362 commit aed892a

2 files changed

Lines changed: 135 additions & 32 deletions

File tree

pkg/roachpb/data.go

Lines changed: 59 additions & 25 deletions
Original file line numberDiff line numberDiff line change
@@ -934,8 +934,12 @@ func (t *Transaction) Restart(
934934
// - the conflicting transaction's upgradePriority
935935
t.UpgradePriority(MakePriority(userPriority))
936936
t.UpgradePriority(upgradePriority)
937-
t.WriteTooOld = false
937+
// Reset all epoch-scoped state.
938938
t.Sequence = 0
939+
t.WriteTooOld = false
940+
t.OrigTimestampWasObserved = false
941+
t.IntentSpans = nil
942+
t.InFlightWrites = nil
939943
}
940944

941945
// BumpEpoch increments the transaction's epoch, allowing for an in-place
@@ -978,30 +982,68 @@ func (t *Transaction) Update(o *Transaction) {
978982
if t.ID == (uuid.UUID{}) {
979983
*t = *o
980984
return
985+
} else if t.ID != o.ID {
986+
log.Fatalf(context.Background(), "updating txn %v with different txn %v", t, o)
987+
return
981988
}
982989
if len(t.Key) == 0 {
983990
t.Key = o.Key
984991
}
985-
if !t.Status.IsFinalized() {
986-
if (t.Epoch < o.Epoch) || (t.Epoch == o.Epoch && o.Status != PENDING) {
987-
t.Status = o.Status
988-
}
989-
}
990992

991-
// If the epoch or refreshed timestamp move forward, overwrite
992-
// WriteTooOld, otherwise the flags are cumulative.
993-
if t.Epoch < o.Epoch || t.RefreshedTimestamp.Less(o.RefreshedTimestamp) {
993+
// Update epoch-scoped state, depending on the two transactions' epochs.
994+
if t.Epoch < o.Epoch {
995+
// Replace all epoch-scoped state.
996+
t.Epoch = o.Epoch
997+
t.Status = o.Status
994998
t.WriteTooOld = o.WriteTooOld
995999
t.OrigTimestampWasObserved = o.OrigTimestampWasObserved
996-
} else {
997-
t.WriteTooOld = t.WriteTooOld || o.WriteTooOld
998-
t.OrigTimestampWasObserved = t.OrigTimestampWasObserved || o.OrigTimestampWasObserved
999-
}
1000+
t.Sequence = o.Sequence
1001+
t.IntentSpans = o.IntentSpans
1002+
t.InFlightWrites = o.InFlightWrites
1003+
} else if t.Epoch == o.Epoch {
1004+
// Forward all epoch-scoped state.
1005+
switch t.Status {
1006+
case PENDING:
1007+
t.Status = o.Status
1008+
case STAGING:
1009+
if o.Status != PENDING {
1010+
t.Status = o.Status
1011+
}
1012+
case ABORTED:
1013+
if o.Status == COMMITTED {
1014+
log.Warningf(context.Background(), "updating ABORTED txn %v with COMMITTED txn %v", t, o)
1015+
}
1016+
case COMMITTED:
1017+
// Nothing to do.
1018+
}
10001019

1001-
if t.Epoch < o.Epoch {
1002-
t.Epoch = o.Epoch
1020+
// If the refreshed timestamp move forward, overwrite
1021+
// WriteTooOld, otherwise the flags are cumulative.
1022+
if t.RefreshedTimestamp.Less(o.RefreshedTimestamp) {
1023+
t.WriteTooOld = o.WriteTooOld
1024+
t.OrigTimestampWasObserved = o.OrigTimestampWasObserved
1025+
} else {
1026+
t.WriteTooOld = t.WriteTooOld || o.WriteTooOld
1027+
t.OrigTimestampWasObserved = t.OrigTimestampWasObserved || o.OrigTimestampWasObserved
1028+
}
1029+
1030+
if t.Sequence < o.Sequence {
1031+
t.Sequence = o.Sequence
1032+
}
1033+
if len(o.IntentSpans) > 0 {
1034+
t.IntentSpans = o.IntentSpans
1035+
}
1036+
if len(o.InFlightWrites) > 0 {
1037+
t.InFlightWrites = o.InFlightWrites
1038+
}
1039+
} else /* t.Epoch > o.Epoch */ {
1040+
// Ignore epoch-specific state from previous epoch.
1041+
if o.Status == COMMITTED {
1042+
log.Warningf(context.Background(), "updating txn %v with COMMITTED txn at earlier epoch %v", t, o)
1043+
}
10031044
}
10041045

1046+
// Forward each of the transaction timestamps.
10051047
t.Timestamp.Forward(o.Timestamp)
10061048
t.LastHeartbeat.Forward(o.LastHeartbeat)
10071049
t.OrigTimestamp.Forward(o.OrigTimestamp)
@@ -1026,17 +1068,9 @@ func (t *Transaction) Update(o *Transaction) {
10261068
for _, v := range o.ObservedTimestamps {
10271069
t.UpdateObservedTimestamp(v.NodeID, v.Timestamp)
10281070
}
1029-
t.UpgradePriority(o.Priority)
10301071

1031-
if t.Sequence < o.Sequence {
1032-
t.Sequence = o.Sequence
1033-
}
1034-
if len(o.IntentSpans) > 0 {
1035-
t.IntentSpans = o.IntentSpans
1036-
}
1037-
if len(o.InFlightWrites) > 0 {
1038-
t.InFlightWrites = o.InFlightWrites
1039-
}
1072+
// Ratchet the transaction priority.
1073+
t.UpgradePriority(o.Priority)
10401074
}
10411075

10421076
// UpgradePriority sets transaction priority to the maximum of current

pkg/roachpb/data_test.go

Lines changed: 76 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,7 @@ import (
2929
"github.com/cockroachdb/cockroach/pkg/util/encoding"
3030
"github.com/cockroachdb/cockroach/pkg/util/hlc"
3131
"github.com/cockroachdb/cockroach/pkg/util/leaktest"
32+
"github.com/cockroachdb/cockroach/pkg/util/log"
3233
"github.com/cockroachdb/cockroach/pkg/util/protoutil"
3334
"github.com/cockroachdb/cockroach/pkg/util/randutil"
3435
"github.com/cockroachdb/cockroach/pkg/util/uuid"
@@ -502,22 +503,74 @@ func TestTransactionUpdate(t *testing.T) {
502503
t.Fatal(err)
503504
}
504505

506+
// Updating an empty Transaction copies all fields.
505507
var txn2 Transaction
506508
txn2.Update(&txn)
507509

508-
if err := zerofields.NoZeroField(txn2); err != nil {
509-
t.Fatal(err)
510-
}
510+
expTxn2 := txn
511+
require.Equal(t, expTxn2, txn2)
511512

513+
// Updating a Transaction at an earlier epoch replaces all epoch-scoped fields.
512514
var txn3 Transaction
513-
txn3.ID = uuid.MakeV4()
515+
txn3.ID = txn.ID
516+
txn3.Epoch = txn.Epoch - 1
517+
txn3.Status = STAGING
514518
txn3.Name = "carl"
515519
txn3.Priority = 123
516520
txn3.Update(&txn)
517521

518-
if err := zerofields.NoZeroField(txn3); err != nil {
519-
t.Fatal(err)
520-
}
522+
expTxn3 := txn
523+
expTxn3.Name = "carl"
524+
require.Equal(t, expTxn3, txn3)
525+
526+
// Updating a Transaction at the same epoch forwards all epoch-scoped fields.
527+
var txn4 Transaction
528+
txn4.ID = txn.ID
529+
txn4.Epoch = txn.Epoch
530+
txn4.Status = STAGING
531+
txn4.Sequence = txn.Sequence + 10
532+
txn4.Name = "carl"
533+
txn4.Priority = 123
534+
txn4.Update(&txn)
535+
536+
expTxn4 := txn
537+
expTxn4.Name = "carl"
538+
expTxn4.Sequence = txn.Sequence + 10
539+
require.Equal(t, expTxn4, txn4)
540+
541+
// Updating a Transaction at a future epoch ignores all epoch-scoped fields.
542+
var txn5 Transaction
543+
txn5.ID = txn.ID
544+
txn5.Epoch = txn.Epoch + 1
545+
txn5.Status = PENDING
546+
txn5.Sequence = txn.Sequence - 10
547+
txn5.Name = "carl"
548+
txn5.Priority = 123
549+
txn5.Update(&txn)
550+
551+
expTxn5 := txn
552+
expTxn5.Name = "carl"
553+
expTxn5.Epoch = txn.Epoch + 1
554+
expTxn5.Status = PENDING
555+
expTxn5.Sequence = txn.Sequence - 10
556+
expTxn5.IntentSpans = nil
557+
expTxn5.InFlightWrites = nil
558+
expTxn5.WriteTooOld = false
559+
expTxn5.OrigTimestampWasObserved = false
560+
require.Equal(t, expTxn5, txn5)
561+
562+
// Updating a different transaction fatals.
563+
var exited bool
564+
log.SetExitFunc(true /* hideStack */, func(int) { exited = true })
565+
defer log.ResetExitFunc()
566+
567+
var txn6 Transaction
568+
txn6.ID = uuid.MakeV4()
569+
origTxn6 := txn6
570+
txn6.Update(&txn)
571+
572+
require.Equal(t, origTxn6, txn6)
573+
require.True(t, exited)
521574
}
522575

523576
func TestTransactionUpdateMinTimestamp(t *testing.T) {
@@ -611,6 +664,22 @@ func TestTransactionClone(t *testing.T) {
611664
}
612665
}
613666

667+
func TestTransactionRestart(t *testing.T) {
668+
txn := nonZeroTxn
669+
txn.Restart(1, 1, makeTS(25, 1))
670+
671+
expTxn := nonZeroTxn
672+
expTxn.Epoch++
673+
expTxn.Sequence = 0
674+
expTxn.Timestamp = makeTS(25, 1)
675+
expTxn.OrigTimestamp = makeTS(25, 1)
676+
expTxn.WriteTooOld = false
677+
expTxn.OrigTimestampWasObserved = false
678+
expTxn.IntentSpans = nil
679+
expTxn.InFlightWrites = nil
680+
require.Equal(t, expTxn, txn)
681+
}
682+
614683
// TestTransactionRecordRoundtrips tests a few properties about Transaction
615684
// and TransactionRecord protos. Remember that the latter is wire compatible
616685
// with the former and contains a subset of its protos.

0 commit comments

Comments
 (0)