Skip to content

Commit d064059

Browse files
committed
kvserver/apply: use a better ctx for cmd.AckSuccess
Before this patch, CheckedCommand.AckSuccess() was called with a Raft worker context. That's wasn't great because each command captures a better context to use - one that derives from the proposal's ctx in the case of local proposals. This patch switches to using that by exposing the captured context through the Command interface. Taking advantage of the new ctx, we also log a message now about early acks, as it seems like a notable hint to see in a trace. This patch also cleans up most existing uses of that captured context to use the new interface method; before, various code paths were type asserting the implementation of the Command, and getting the internal context that way. This patch moves the resposibility of deciding what context to use upwards, to callers. Release note: None
1 parent f16c6a2 commit d064059

6 files changed

Lines changed: 44 additions & 24 deletions

File tree

pkg/kv/kvserver/apply/cmd.go

Lines changed: 14 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,14 @@ type Command interface {
2424
// that were locally proposed typically have a client waiting on a
2525
// response, so there is additional urgency to apply them quickly.
2626
IsLocal() bool
27+
// Ctx returns the Context in which operations on this Command should be
28+
// performed.
29+
//
30+
// A Command does the unusual thing of capturing a Context because commands
31+
// are generally processed in batches, but different commands might want their
32+
// events going to different places. In particular, commands that have been
33+
// proposed locally get a tracing span tied to the local proposal.
34+
Ctx() context.Context
2735
// AckErrAndFinish signals that the application of the command has been
2836
// rejected due to the provided error. It also relays this rejection of
2937
// the command to its client if it was proposed locally. An error will
@@ -167,12 +175,13 @@ func takeWhileCmdIter(iter CommandIterator, pred func(Command) bool) CommandIter
167175
// responsible for converting Commands into CheckedCommand. The function
168176
// closes the provided iterator.
169177
func mapCmdIter(
170-
iter CommandIterator, fn func(Command) (CheckedCommand, error),
178+
iter CommandIterator, fn func(context.Context, Command) (CheckedCommand, error),
171179
) (CheckedCommandIterator, error) {
172180
defer iter.Close()
173181
ret := iter.NewCheckedList()
174182
for iter.Valid() {
175-
checked, err := fn(iter.Cur())
183+
cur := iter.Cur()
184+
checked, err := fn(cur.Ctx(), cur)
176185
if err != nil {
177186
ret.Close()
178187
return nil, err
@@ -188,12 +197,13 @@ func mapCmdIter(
188197
// is responsible for converting CheckedCommand into AppliedCommand. The
189198
// function closes the provided iterator.
190199
func mapCheckedCmdIter(
191-
iter CheckedCommandIterator, fn func(CheckedCommand) (AppliedCommand, error),
200+
iter CheckedCommandIterator, fn func(context.Context, CheckedCommand) (AppliedCommand, error),
192201
) (AppliedCommandIterator, error) {
193202
defer iter.Close()
194203
ret := iter.NewAppliedList()
195204
for iter.Valid() {
196-
applied, err := fn(iter.CurChecked())
205+
curChecked := iter.CurChecked()
206+
applied, err := fn(curChecked.Ctx(), curChecked)
197207
if err != nil {
198208
ret.Close()
199209
return nil, err

pkg/kv/kvserver/apply/task.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -52,7 +52,7 @@ type StateMachine interface {
5252
// an untimely crash. This means that applying these side-effects will
5353
// typically update the in-memory representation of the state machine
5454
// to the same state that it would be in if the process restarted.
55-
ApplySideEffects(CheckedCommand) (AppliedCommand, error)
55+
ApplySideEffects(context.Context, CheckedCommand) (AppliedCommand, error)
5656
}
5757

5858
// ErrRemoved can be returned from ApplySideEffects which will stop the task
@@ -67,7 +67,7 @@ var ErrRemoved = errors.New("replica removed")
6767
type Batch interface {
6868
// Stage inserts a Command into the Batch. In doing so, the Command is
6969
// checked for rejection and a CheckedCommand is returned.
70-
Stage(Command) (CheckedCommand, error)
70+
Stage(context.Context, Command) (CheckedCommand, error)
7171
// ApplyToStateMachine applies the persistent state transitions staged
7272
// in the Batch to the StateMachine, atomically.
7373
ApplyToStateMachine(context.Context) error
@@ -225,7 +225,7 @@ func (t *Task) AckCommittedEntriesBeforeApplication(ctx context.Context, maxInde
225225
// want to retry the command instead of returning the error to the client.
226226
return forEachCheckedCmdIter(ctx, stagedIter, func(cmd CheckedCommand, ctx context.Context) error {
227227
if !cmd.Rejected() && cmd.IsLocal() && cmd.CanAckBeforeApplication() {
228-
return cmd.AckSuccess(ctx)
228+
return cmd.AckSuccess(cmd.Ctx())
229229
}
230230
return nil
231231
})

pkg/kv/kvserver/apply/task_test.go

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -52,9 +52,10 @@ type appliedCmd struct {
5252
*checkedCmd
5353
}
5454

55-
func (c *cmd) Index() uint64 { return c.index }
56-
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
57-
func (c *cmd) IsLocal() bool { return !c.nonLocal }
55+
func (c *cmd) Index() uint64 { return c.index }
56+
func (c *cmd) IsTrivial() bool { return !c.nonTrivial }
57+
func (c *cmd) IsLocal() bool { return !c.nonLocal }
58+
func (c *cmd) Ctx() context.Context { return context.Background() }
5859
func (c *cmd) AckErrAndFinish(_ context.Context, err error) error {
5960
c.acked = true
6061
c.finished = true
@@ -138,7 +139,7 @@ func (sm *testStateMachine) NewBatch(ephemeral bool) apply.Batch {
138139
return &testBatch{sm: sm, ephemeral: ephemeral}
139140
}
140141
func (sm *testStateMachine) ApplySideEffects(
141-
cmdI apply.CheckedCommand,
142+
_ context.Context, cmdI apply.CheckedCommand,
142143
) (apply.AppliedCommand, error) {
143144
cmd := cmdI.(*checkedCmd)
144145
sm.appliedSideEffects = append(sm.appliedSideEffects, cmd.index)
@@ -160,7 +161,7 @@ type testBatch struct {
160161
staged []uint64
161162
}
162163

163-
func (b *testBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
164+
func (b *testBatch) Stage(_ context.Context, cmdI apply.Command) (apply.CheckedCommand, error) {
164165
cmd := cmdI.(*cmd)
165166
b.staged = append(b.staged, cmd.index)
166167
ccmd := checkedCmd{cmd: cmd, rejected: cmd.shouldReject}

pkg/kv/kvserver/replica_application_cmd.go

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,6 +114,11 @@ func (c *replicatedCmd) IsLocal() bool {
114114
return c.proposal != nil
115115
}
116116

117+
// Ctx implements the apply.Command interface.
118+
func (c *replicatedCmd) Ctx() context.Context {
119+
return c.ctx
120+
}
121+
117122
// AckErrAndFinish implements the apply.Command interface.
118123
func (c *replicatedCmd) AckErrAndFinish(ctx context.Context, err error) error {
119124
if c.IsLocal() {
@@ -143,7 +148,7 @@ func (c *replicatedCmd) CanAckBeforeApplication() bool {
143148
}
144149

145150
// AckSuccess implements the apply.CheckedCommand interface.
146-
func (c *replicatedCmd) AckSuccess(_ context.Context) error {
151+
func (c *replicatedCmd) AckSuccess(ctx context.Context) error {
147152
if !c.IsLocal() {
148153
return nil
149154
}
@@ -158,6 +163,7 @@ func (c *replicatedCmd) AckSuccess(_ context.Context) error {
158163
resp.Reply = &reply
159164
resp.EncounteredIntents = c.proposal.Local.DetachEncounteredIntents()
160165
resp.EndTxns = c.proposal.Local.DetachEndTxns(false /* alwaysOnly */)
166+
log.Event(ctx, "ack-ing replication success to the client; application will continue async w.r.t. the client")
161167
c.proposal.signalProposalResult(resp)
162168
return nil
163169
}

pkg/kv/kvserver/replica_application_state_machine.go

Lines changed: 12 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -428,9 +428,10 @@ type replicaAppBatch struct {
428428
// the batch. This allows the batch to make an accurate determination about
429429
// whether to accept or reject the next command that is staged without needing
430430
// to actually update the replica state machine in between.
431-
func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
431+
func (b *replicaAppBatch) Stage(
432+
ctx context.Context, cmdI apply.Command,
433+
) (apply.CheckedCommand, error) {
432434
cmd := cmdI.(*replicatedCmd)
433-
ctx := cmd.ctx
434435
if cmd.ent.Index == 0 {
435436
return nil, makeNonDeterministicFailure("processRaftCommand requires a non-zero index")
436437
}
@@ -457,7 +458,7 @@ func (b *replicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error
457458
cmd.raftCmd.LogicalOpLog = nil
458459
cmd.raftCmd.ClosedTimestamp = nil
459460
} else {
460-
if err := b.assertNoCmdClosedTimestampRegression(cmd); err != nil {
461+
if err := b.assertNoCmdClosedTimestampRegression(ctx, cmd); err != nil {
461462
return nil, err
462463
}
463464
if err := b.assertNoWriteBelowClosedTimestamp(cmd); err != nil {
@@ -992,7 +993,9 @@ func (b *replicaAppBatch) assertNoWriteBelowClosedTimestamp(cmd *replicatedCmd)
992993

993994
// Assert that the closed timestamp carried by the command is not below one from
994995
// previous commands.
995-
func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCmd) error {
996+
func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(
997+
ctx context.Context, cmd *replicatedCmd,
998+
) error {
996999
if !raftClosedTimestampAssertionsEnabled {
9971000
return nil
9981001
}
@@ -1012,7 +1015,7 @@ func (b *replicaAppBatch) assertNoCmdClosedTimestampRegression(cmd *replicatedCm
10121015
prevReq.SafeString("<unknown; not leaseholder or not lease request>")
10131016
}
10141017

1015-
logTail, err := b.r.printRaftTail(cmd.ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */)
1018+
logTail, err := b.r.printRaftTail(ctx, 100 /* maxEntries */, 2000 /* maxCharsPerEntry */)
10161019
if err != nil {
10171020
if logTail != "" {
10181021
logTail = logTail + "\n; error printing log: " + err.Error()
@@ -1043,9 +1046,10 @@ type ephemeralReplicaAppBatch struct {
10431046
}
10441047

10451048
// Stage implements the apply.Batch interface.
1046-
func (mb *ephemeralReplicaAppBatch) Stage(cmdI apply.Command) (apply.CheckedCommand, error) {
1049+
func (mb *ephemeralReplicaAppBatch) Stage(
1050+
ctx context.Context, cmdI apply.Command,
1051+
) (apply.CheckedCommand, error) {
10471052
cmd := cmdI.(*replicatedCmd)
1048-
ctx := cmd.ctx
10491053

10501054
mb.r.shouldApplyCommand(ctx, cmd, &mb.state)
10511055
mb.state.LeaseAppliedIndex = cmd.leaseIndex
@@ -1071,10 +1075,9 @@ func (mb *ephemeralReplicaAppBatch) Close() {
10711075
// side effects of commands, such as finalizing splits/merges and informing
10721076
// raft about applied config changes.
10731077
func (sm *replicaStateMachine) ApplySideEffects(
1074-
cmdI apply.CheckedCommand,
1078+
ctx context.Context, cmdI apply.CheckedCommand,
10751079
) (apply.AppliedCommand, error) {
10761080
cmd := cmdI.(*replicatedCmd)
1077-
ctx := cmd.ctx
10781081

10791082
// Deal with locking during side-effect handling, which is sometimes
10801083
// associated with complex commands such as splits and merged.

pkg/kv/kvserver/replica_application_state_machine_test.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -110,7 +110,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
110110
},
111111
}
112112

113-
checkedCmd, err := b.Stage(cmd)
113+
checkedCmd, err := b.Stage(cmd.ctx, cmd)
114114
require.NoError(t, err)
115115
require.Equal(t, !add, b.changeRemovesReplica)
116116
require.Equal(t, b.state.RaftAppliedIndex, cmd.ent.Index)
@@ -129,7 +129,7 @@ func TestReplicaStateMachineChangeReplicas(t *testing.T) {
129129
require.NoError(t, err)
130130

131131
// Apply the side effects of the command to the StateMachine.
132-
_, err = sm.ApplySideEffects(checkedCmd)
132+
_, err = sm.ApplySideEffects(checkedCmd.Ctx(), checkedCmd)
133133
if add {
134134
require.NoError(t, err)
135135
} else {

0 commit comments

Comments
 (0)