Skip to content

Commit bdefc85

Browse files
Move read queue to a more general "command" queue.
Over the course of testing distributed transactions, I noticed many unnecessary restarts on SERIALIZABLE transactions due to the fact that we previously were always updating the range's timestamp cache as soon as a read or a write command arrived. For reads or writes which conflict with an ongoing transaction, the update to the timestamp cache would happen regardless of whether the read or write succeeded. In many cases, the read or write would fail on the conflict, so the restart due to advancing timestamp cache ended up being unnecessary. This change blends both read/write and read-only commands into a single command queue, which operates similarly to the old read queue, only now commands always wait on preceding commands which overlap the same key(s). Read-only commands don't need to wait on other read-only commands, so we exempt this case for better read concurrency. This allows us to wait until after the command is executed to update the timestamp cache, so we can avoid updating the cache on failures.
1 parent bde7a87 commit bdefc85

8 files changed

Lines changed: 536 additions & 374 deletions

File tree

storage/command_queue.go

Lines changed: 131 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,131 @@
1+
// Copyright 2014 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License. See the AUTHORS file
14+
// for names of contributors.
15+
//
16+
// Author: Spencer Kimball (spencer.kimball@gmail.com)
17+
18+
package storage
19+
20+
import (
21+
"sync"
22+
23+
"github.com/cockroachdb/cockroach/storage/engine"
24+
"github.com/cockroachdb/cockroach/util"
25+
)
26+
27+
// A CommandQueue maintains an interval tree of keys or key ranges for
28+
// executing commands. New commands affecting keys or key ranges must
29+
// wait on already-executing commands which overlap their key range.
30+
//
31+
// Before executing, a command invokes GetWait() to initialize a
32+
// WaitGroup with the number of overlapping commands which are already
33+
// running. The wait group is waited on by the caller for confirmation
34+
// that all overlapping, pending commands have completed and the
35+
// pending command can proceed.
36+
//
37+
// After waiting, a command is added to the queue's already-executing
38+
// set via Add(). Add accepts a parameter indicating whether the
39+
// command is read-only. Read-only commands don't need to wait on
40+
// other read-only commands, so the wait group returned via GetWait()
41+
// doesn't include read-only on read-only overlapping commands as an
42+
// optimization.
43+
//
44+
// Once commands complete, Remove() is invoked to remove the executing
45+
// command and decrement the counts on any pending WaitGroups,
46+
// possibly signaling waiting commands who were gated by the executing
47+
// command's affected key(s).
48+
//
49+
// CommandQueue is not thread safe.
50+
type CommandQueue struct {
51+
cache *util.IntervalCache
52+
}
53+
54+
type cmd struct {
55+
readOnly bool
56+
pending []*sync.WaitGroup // Pending commands gated on cmd
57+
}
58+
59+
// NewCommandQueue returns a new command queue.
60+
func NewCommandQueue() *CommandQueue {
61+
cq := &CommandQueue{
62+
cache: util.NewIntervalCache(util.CacheConfig{Policy: util.CacheNone}),
63+
}
64+
cq.cache.OnEvicted = cq.onEvicted
65+
return cq
66+
}
67+
68+
// onEvicted is called when any entry is removed from the interval
69+
// tree. This happens on calls to Remove() and to Clear().
70+
func (cq *CommandQueue) onEvicted(key, value interface{}) {
71+
c := value.(*cmd)
72+
for _, wg := range c.pending {
73+
wg.Done()
74+
}
75+
}
76+
77+
// GetWait initializes the supplied wait group with the number of
78+
// executing commands which overlap the specified key range. If end is
79+
// nil, end is set to start, meaning the command affects a single
80+
// key. The caller should call wg.Wait() to wait for confirmation that
81+
// all gating commands have completed or failed. readOnly is true if
82+
// the requester is a read-only command; false for read-write.
83+
func (cq *CommandQueue) GetWait(start, end engine.Key, readOnly bool, wg *sync.WaitGroup) {
84+
if end == nil {
85+
end = engine.NextKey(start)
86+
}
87+
for _, c := range cq.cache.GetOverlaps(rangeKey(start), rangeKey(end)) {
88+
c := c.(*cmd)
89+
// Only add to the wait group if both commands aren't read-only.
90+
if !readOnly || !c.readOnly {
91+
c.pending = append(c.pending, wg)
92+
wg.Add(1)
93+
}
94+
}
95+
}
96+
97+
// Add adds a command to the queue which affects the specified key
98+
// range. If end is nil, it is set to start, meaning the command
99+
// affects a single key. The returned interface is the key for the
100+
// command queue and must be re-supplied on subsequent invocation of
101+
// Remove().
102+
//
103+
// Add should be invoked after waiting on already-executing,
104+
// overlapping commands via the WaitGroup initialized through
105+
// GetWait().
106+
func (cq *CommandQueue) Add(start, end engine.Key, readOnly bool) interface{} {
107+
if end == nil {
108+
end = engine.NextKey(start)
109+
}
110+
key := cq.cache.NewKey(rangeKey(start), rangeKey(end))
111+
cq.cache.Add(key, &cmd{readOnly: readOnly})
112+
return key
113+
}
114+
115+
// Remove is invoked to signal that the command associated with the
116+
// specified key has completed and should be removed. Any pending
117+
// commands waiting on this command will be signaled if this is the
118+
// only command upon which they are still waiting.
119+
//
120+
// Remove is invoked after a mutating command has been committed to
121+
// the Raft log and applied to the underlying state machine. Similarly,
122+
// Remove is invoked after a read-only command has been executed
123+
// against the underlying state machine.
124+
func (cq *CommandQueue) Remove(key interface{}) {
125+
cq.cache.Del(key)
126+
}
127+
128+
// Clear removes all executing commands, signaling any waiting commands.
129+
func (cq *CommandQueue) Clear() {
130+
cq.cache.Clear()
131+
}

storage/command_queue_test.go

Lines changed: 167 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,167 @@
1+
// Copyright 2014 The Cockroach Authors.
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
12+
// implied. See the License for the specific language governing
13+
// permissions and limitations under the License. See the AUTHORS file
14+
// for names of contributors.
15+
//
16+
// Author: Spencer Kimball (spencer.kimball@gmail.com)
17+
18+
package storage
19+
20+
import (
21+
"sync"
22+
"testing"
23+
"time"
24+
25+
"github.com/cockroachdb/cockroach/storage/engine"
26+
)
27+
28+
// waitForCmd launches a goroutine to wait on the supplied
29+
// WaitGroup. A channel is returned which signals the completion of
30+
// the wait.
31+
func waitForCmd(wg *sync.WaitGroup) <-chan struct{} {
32+
cmdDone := make(chan struct{})
33+
go func() {
34+
wg.Wait()
35+
close(cmdDone)
36+
}()
37+
return cmdDone
38+
}
39+
40+
// testCmdDone waits for the cmdDone channel to be closed for at most
41+
// the specified wait duration. Returns true if the command finished in
42+
// the allotted time, false otherwise.
43+
func testCmdDone(cmdDone <-chan struct{}, wait time.Duration) bool {
44+
select {
45+
case <-cmdDone:
46+
return true
47+
case <-time.After(wait):
48+
return false
49+
}
50+
}
51+
52+
func TestCommandQueue(t *testing.T) {
53+
cq := NewCommandQueue()
54+
wg := sync.WaitGroup{}
55+
56+
// Try a command with no overlapping already-running commands.
57+
cq.GetWait(engine.Key("a"), nil, false, &wg)
58+
wg.Wait()
59+
cq.GetWait(engine.Key("a"), engine.Key("b"), false, &wg)
60+
wg.Wait()
61+
62+
// Add a command and verify wait group is returned.
63+
wk := cq.Add(engine.Key("a"), nil, false)
64+
cq.GetWait(engine.Key("a"), nil, false, &wg)
65+
cmdDone := waitForCmd(&wg)
66+
if testCmdDone(cmdDone, 1*time.Millisecond) {
67+
t.Fatal("command should not finish with command outstanding")
68+
}
69+
cq.Remove(wk)
70+
if !testCmdDone(cmdDone, 5*time.Millisecond) {
71+
t.Fatal("command should finish with no commands outstanding")
72+
}
73+
}
74+
75+
func TestCommandQueueNoWaitOnReadOnly(t *testing.T) {
76+
cq := NewCommandQueue()
77+
wg := sync.WaitGroup{}
78+
// Add a read-only command.
79+
wk := cq.Add(engine.Key("a"), nil, true)
80+
// Verify no wait on another read-only command.
81+
cq.GetWait(engine.Key("a"), nil, true, &wg)
82+
wg.Wait()
83+
// Verify wait with a read-write command.
84+
cq.GetWait(engine.Key("a"), nil, false, &wg)
85+
cmdDone := waitForCmd(&wg)
86+
if testCmdDone(cmdDone, 1*time.Millisecond) {
87+
t.Fatal("command should not finish with command outstanding")
88+
}
89+
cq.Remove(wk)
90+
if !testCmdDone(cmdDone, 5*time.Millisecond) {
91+
t.Fatal("command should finish with no commands outstanding")
92+
}
93+
}
94+
95+
func TestCommandQueueMultipleExecutingCommands(t *testing.T) {
96+
cq := NewCommandQueue()
97+
wg := sync.WaitGroup{}
98+
99+
// Add multiple commands and add a command which overlaps them all.
100+
wk1 := cq.Add(engine.Key("a"), nil, false)
101+
wk2 := cq.Add(engine.Key("b"), engine.Key("c"), false)
102+
wk3 := cq.Add(engine.Key("0"), engine.Key("d"), false)
103+
cq.GetWait(engine.Key("a"), engine.Key("cc"), false, &wg)
104+
cmdDone := waitForCmd(&wg)
105+
cq.Remove(wk1)
106+
if testCmdDone(cmdDone, 1*time.Millisecond) {
107+
t.Fatal("command should not finish with two commands outstanding")
108+
}
109+
cq.Remove(wk2)
110+
if testCmdDone(cmdDone, 1*time.Millisecond) {
111+
t.Fatal("command should not finish with one command outstanding")
112+
}
113+
cq.Remove(wk3)
114+
if !testCmdDone(cmdDone, 5*time.Millisecond) {
115+
t.Fatal("command should finish with no commands outstanding")
116+
}
117+
}
118+
119+
func TestCommandQueueMultiplePendingCommands(t *testing.T) {
120+
cq := NewCommandQueue()
121+
wg1 := sync.WaitGroup{}
122+
wg2 := sync.WaitGroup{}
123+
wg3 := sync.WaitGroup{}
124+
125+
// Add a command which will overlap all commands.
126+
wk := cq.Add(engine.Key("a"), engine.Key("d"), false)
127+
cq.GetWait(engine.Key("a"), nil, false, &wg1)
128+
cq.GetWait(engine.Key("b"), nil, false, &wg2)
129+
cq.GetWait(engine.Key("c"), nil, false, &wg3)
130+
cmdDone1 := waitForCmd(&wg1)
131+
cmdDone2 := waitForCmd(&wg2)
132+
cmdDone3 := waitForCmd(&wg3)
133+
134+
if testCmdDone(cmdDone1, 1*time.Millisecond) ||
135+
testCmdDone(cmdDone2, 1*time.Millisecond) ||
136+
testCmdDone(cmdDone3, 1*time.Millisecond) {
137+
t.Fatal("no commands should finish with command outstanding")
138+
}
139+
cq.Remove(wk)
140+
if !testCmdDone(cmdDone1, 5*time.Millisecond) ||
141+
!testCmdDone(cmdDone2, 5*time.Millisecond) ||
142+
!testCmdDone(cmdDone3, 5*time.Millisecond) {
143+
t.Fatal("commands should finish with no commands outstanding")
144+
}
145+
}
146+
147+
func TestCommandQueueClear(t *testing.T) {
148+
cq := NewCommandQueue()
149+
wg1 := sync.WaitGroup{}
150+
wg2 := sync.WaitGroup{}
151+
152+
// Add multiple commands and commands which access each.
153+
cq.Add(engine.Key("a"), nil, false)
154+
cq.Add(engine.Key("b"), nil, false)
155+
cq.GetWait(engine.Key("a"), nil, false, &wg1)
156+
cq.GetWait(engine.Key("b"), nil, false, &wg2)
157+
cmdDone1 := waitForCmd(&wg1)
158+
cmdDone2 := waitForCmd(&wg2)
159+
160+
// Clear the queue and verify both commands are signaled.
161+
cq.Clear()
162+
163+
if !testCmdDone(cmdDone1, 1*time.Millisecond) ||
164+
!testCmdDone(cmdDone2, 1*time.Millisecond) {
165+
t.Fatal("commands should finish when clearing queue")
166+
}
167+
}

storage/engine/in_mem.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -127,7 +127,7 @@ func (in *InMem) Put(key Key, value []byte) error {
127127
return in.putLocked(key, value)
128128
}
129129

130-
// putLocked assumes mutex is already held by caller. See put().
130+
// putLocked assumes mutex is already held by caller. See Put().
131131
func (in *InMem) putLocked(key Key, value []byte) error {
132132
if len(key) == 0 {
133133
return emptyKeyError()

0 commit comments

Comments
 (0)