Skip to content

Commit 5bd37e8

Browse files
Andrew Wernernvb
authored andcommitted
storage: ensure that spans of reproposals of applied commands are not used
This commit deals with the case where more than one reproposal of a command exists at the same MaxAppliedIndex. Code that already exist prevents the command from being finished more than once but leaves open the possibility of writing events into a potentially closed span. Release note: None
1 parent 61651a2 commit 5bd37e8

2 files changed

Lines changed: 108 additions & 0 deletions

File tree

pkg/storage/replica_application.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -889,6 +889,11 @@ func (r *Replica) applyCmdAppBatch(
889889
}
890890
for ok := it.init(&b.cmdBuf); ok; ok = it.next() {
891891
cmd := it.cur()
892+
// Reset the context for already applied commands to ensure that
893+
// reproposals at the same MaxLeaseIndex do not record into closed spans.
894+
if cmd.proposedLocally() && cmd.proposal.applied {
895+
cmd.ctx = ctx
896+
}
892897
for _, sc := range cmd.replicatedResult().SuggestedCompactions {
893898
r.store.compactor.Suggest(cmd.ctx, sc)
894899
}

pkg/storage/replica_test.go

Lines changed: 103 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -58,6 +58,7 @@ import (
5858
"github.com/cockroachdb/logtags"
5959
"github.com/gogo/protobuf/proto"
6060
"github.com/kr/pretty"
61+
opentracing "github.com/opentracing/opentracing-go"
6162
"github.com/pkg/errors"
6263
"github.com/stretchr/testify/assert"
6364
"github.com/stretchr/testify/require"
@@ -11718,6 +11719,108 @@ func TestHighestMaxLeaseIndexReproposalFinishesCommand(t *testing.T) {
1171811719
}
1171911720
}
1172011721

11722+
// TestLaterReproposalsDoNotReuseContext ensures that when commands are
11723+
// reproposed more than once at the same MaxLeaseIndex and the first command
11724+
// applies that the later reproposals do not log into the proposal's context
11725+
// as its underlying trace span may already be finished.
11726+
func TestLaterReproposalsDoNotReuseContext(t *testing.T) {
11727+
defer leaktest.AfterTest(t)()
11728+
11729+
// Set the trace infrastructure to log if a span is used after being finished.
11730+
defer enableTraceDebugUseAfterFree()()
11731+
11732+
tc := testContext{}
11733+
ctx := context.Background()
11734+
11735+
// Set logging up to a test specific directory.
11736+
scope := log.Scope(t)
11737+
defer scope.Close(t)
11738+
11739+
stopper := stop.NewStopper()
11740+
defer stopper.Stop(ctx)
11741+
cfg := TestStoreConfig(hlc.NewClock(hlc.UnixNano, time.Nanosecond))
11742+
// Set up tracing.
11743+
tracer := tracing.NewTracer()
11744+
tracer.Configure(&cfg.Settings.SV)
11745+
tracer.AlwaysTrace()
11746+
cfg.AmbientCtx.Tracer = tracer
11747+
tc.StartWithStoreConfig(t, stopper, cfg)
11748+
key := roachpb.Key("a")
11749+
lease, _ := tc.repl.GetLease()
11750+
txn := newTransaction("test", key, roachpb.NormalUserPriority, tc.Clock())
11751+
ba := roachpb.BatchRequest{
11752+
Header: roachpb.Header{
11753+
RangeID: tc.repl.RangeID,
11754+
Txn: txn,
11755+
},
11756+
}
11757+
ba.Timestamp = txn.OrigTimestamp
11758+
ba.Add(&roachpb.PutRequest{
11759+
RequestHeader: roachpb.RequestHeader{
11760+
Key: key,
11761+
},
11762+
Value: roachpb.MakeValueFromBytes([]byte("val")),
11763+
})
11764+
11765+
// Hold the RaftLock to encourage the reproposals to occur in the same batch.
11766+
tc.repl.RaftLock()
11767+
sp := tracer.StartRootSpan("replica send", logtags.FromContext(ctx), tracing.RecordableSpan)
11768+
tracedCtx := opentracing.ContextWithSpan(ctx, sp)
11769+
// Go out of our way to enable recording so that expensive logging is enabled
11770+
// for this context.
11771+
tracing.StartRecording(sp, tracing.SingleNodeRecording)
11772+
ch, _, _, pErr := tc.repl.evalAndPropose(tracedCtx, lease, &ba, &allSpans, endCmds{})
11773+
if pErr != nil {
11774+
t.Fatal(pErr)
11775+
}
11776+
// Launch a goroutine to finish the span as soon as a result has been sent.
11777+
errCh := make(chan *roachpb.Error)
11778+
go func() {
11779+
res := <-ch
11780+
sp.Finish()
11781+
errCh <- res.Err
11782+
}()
11783+
11784+
// Flush the proposal and then repropose it twice.
11785+
// This test verifies that these later reproposals don't record into the
11786+
// tracedCtx after its span has been finished.
11787+
func() {
11788+
tc.repl.mu.Lock()
11789+
defer tc.repl.mu.Unlock()
11790+
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
11791+
t.Fatal(err)
11792+
}
11793+
tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange)
11794+
if err := tc.repl.mu.proposalBuf.flushLocked(); err != nil {
11795+
t.Fatal(err)
11796+
}
11797+
tc.repl.refreshProposalsLocked(0, reasonNewLeaderOrConfigChange)
11798+
}()
11799+
tc.repl.RaftUnlock()
11800+
11801+
if pErr = <-errCh; pErr != nil {
11802+
t.Fatal(pErr)
11803+
}
11804+
// Round trip another proposal through the replica to ensure that previously
11805+
// committed entries have been applied.
11806+
_, pErr = tc.repl.sendWithRangeID(ctx, tc.repl.RangeID, ba)
11807+
if pErr != nil {
11808+
t.Fatal(pErr)
11809+
}
11810+
11811+
stopper.Quiesce(ctx)
11812+
// Check and see if the trace package logged an error.
11813+
log.Flush()
11814+
entries, err := log.FetchEntriesFromFiles(0, math.MaxInt64, 1,
11815+
regexp.MustCompile("net/trace"))
11816+
if err != nil {
11817+
t.Fatal(err)
11818+
}
11819+
if len(entries) > 0 {
11820+
t.Fatalf("reused span after free: %v", entries)
11821+
}
11822+
}
11823+
1172111824
func enableTraceDebugUseAfterFree() (restore func()) {
1172211825
prev := trace.DebugUseAfterFinish
1172311826
trace.DebugUseAfterFinish = true

0 commit comments

Comments
 (0)