Skip to content

Commit dc689ae

Browse files
committed
concurrency: emit structured contention information to trace
This change attaches a protobuf payload to the current Span whenever a request conflicts with another transaction. The payload contains the contending txn (i.e. the pushee) at the time at which it was first encountered, the key on which the conflict occurred (note that this is not necessarily the key at which the pushee is anchored) and the time spent waiting on the conflict (excluding intent resolution). This enables #57114. Touches #55583. I am not closing that issue yet because we also want to potentially track the outcome of the conflict. Release note: None
1 parent ff7d7f4 commit dc689ae

23 files changed

Lines changed: 1151 additions & 706 deletions

pkg/kv/kvserver/concurrency/BUILD.bazel

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@ go_library(
3131
"//pkg/util/stop",
3232
"//pkg/util/syncutil",
3333
"//pkg/util/timeutil",
34+
"//pkg/util/tracing",
3435
"//pkg/util/uuid",
3536
"@com_github_cockroachdb_errors//:errors",
3637
],

pkg/kv/kvserver/concurrency/concurrency_manager.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -59,6 +59,7 @@ type Config struct {
5959
// Configs + Knobs.
6060
MaxLockTableSize int64
6161
DisableTxnPushing bool
62+
OnContentionEvent func(*roachpb.ContentionEvent) // may be nil; allowed to mutate the event
6263
TxnWaitKnobs txnwait.TestingKnobs
6364
}
6465

@@ -92,6 +93,7 @@ func NewManager(cfg Config) Manager {
9293
ir: cfg.IntentResolver,
9394
lt: lt,
9495
disableTxnPushing: cfg.DisableTxnPushing,
96+
onContentionEvent: cfg.OnContentionEvent,
9597
},
9698
// TODO(nvanbenschoten): move pkg/storage/txnwait to a new
9799
// pkg/storage/concurrency/txnwait package.

pkg/kv/kvserver/concurrency/concurrency_manager_test.go

Lines changed: 49 additions & 19 deletions
Original file line numberDiff line numberDiff line change
@@ -164,16 +164,17 @@ func TestConcurrencyManagerBasic(t *testing.T) {
164164
}
165165
latchSpans, lockSpans := c.collectSpans(t, txn, ts, reqs)
166166

167-
c.requestsByName[reqName] = concurrency.Request{
168-
Txn: txn,
169-
Timestamp: ts,
170-
// TODO(nvanbenschoten): test Priority
171-
ReadConsistency: readConsistency,
172-
WaitPolicy: waitPolicy,
173-
Requests: reqUnions,
174-
LatchSpans: latchSpans,
175-
LockSpans: lockSpans,
176-
}
167+
c.requestsByName[reqName] = testReq{
168+
Request: concurrency.Request{
169+
Txn: txn,
170+
Timestamp: ts,
171+
// TODO(nvanbenschoten): test Priority
172+
ReadConsistency: readConsistency,
173+
WaitPolicy: waitPolicy,
174+
Requests: reqUnions,
175+
LatchSpans: latchSpans,
176+
LockSpans: lockSpans,
177+
}}
177178
return ""
178179

179180
case "sequence":
@@ -190,8 +191,8 @@ func TestConcurrencyManagerBasic(t *testing.T) {
190191
c.mu.Unlock()
191192

192193
opName := fmt.Sprintf("sequence %s", reqName)
193-
mon.runAsync(opName, func(ctx context.Context) {
194-
guard, resp, err := m.SequenceReq(ctx, prev, req)
194+
cancel := mon.runAsync(opName, func(ctx context.Context) {
195+
guard, resp, err := m.SequenceReq(ctx, prev, req.Request)
195196
if err != nil {
196197
log.Eventf(ctx, "sequencing complete, returned error: %v", err)
197198
} else if resp != nil {
@@ -205,6 +206,8 @@ func TestConcurrencyManagerBasic(t *testing.T) {
205206
log.Event(ctx, "sequencing complete, returned no guard")
206207
}
207208
})
209+
req.cancel = cancel
210+
c.requestsByName[reqName] = req
208211
return c.waitAndCollect(t, mon)
209212

210213
case "finish":
@@ -477,6 +480,11 @@ func TestConcurrencyManagerBasic(t *testing.T) {
477480
})
478481
}
479482

483+
type testReq struct {
484+
cancel func()
485+
concurrency.Request
486+
}
487+
480488
// cluster encapsulates the state of a running cluster and a set of requests.
481489
// It serves as the test harness in TestConcurrencyManagerBasic - maintaining
482490
// transaction and request declarations, recording the state of in-flight
@@ -491,7 +499,7 @@ type cluster struct {
491499
// Definitions.
492500
txnCounter uint32
493501
txnsByName map[string]*roachpb.Transaction
494-
requestsByName map[string]concurrency.Request
502+
requestsByName map[string]testReq
495503

496504
// Request state. Cleared on reset.
497505
mu syncutil.Mutex
@@ -511,6 +519,7 @@ type txnRecord struct {
511519
type txnPush struct {
512520
ctx context.Context
513521
pusher, pushee uuid.UUID
522+
count int
514523
}
515524

516525
func newCluster() *cluster {
@@ -520,7 +529,7 @@ func newCluster() *cluster {
520529
rangeDesc: &roachpb.RangeDescriptor{RangeID: 1},
521530

522531
txnsByName: make(map[string]*roachpb.Transaction),
523-
requestsByName: make(map[string]concurrency.Request),
532+
requestsByName: make(map[string]testReq),
524533
guardsByReqName: make(map[string]*concurrency.Guard),
525534
txnRecords: make(map[uuid.UUID]*txnRecord),
526535
txnPushes: make(map[uuid.UUID]*txnPush),
@@ -533,6 +542,9 @@ func (c *cluster) makeConfig() concurrency.Config {
533542
RangeDesc: c.rangeDesc,
534543
Settings: c.st,
535544
IntentResolver: c,
545+
OnContentionEvent: func(ev *roachpb.ContentionEvent) {
546+
ev.Duration = 1234 * time.Millisecond // for determinism
547+
},
536548
TxnWaitMetrics: txnwait.NewMetrics(time.Minute),
537549
}
538550
}
@@ -680,11 +692,18 @@ func (r *txnRecord) asTxn() (*roachpb.Transaction, chan struct{}) {
680692
func (c *cluster) registerPush(ctx context.Context, pusher, pushee uuid.UUID) (*txnPush, error) {
681693
c.mu.Lock()
682694
defer c.mu.Unlock()
683-
if _, ok := c.txnPushes[pusher]; ok {
684-
return nil, errors.Errorf("txn %v already pushing", pusher)
695+
if p, ok := c.txnPushes[pusher]; ok {
696+
if pushee != p.pushee {
697+
return nil, errors.Errorf("pusher %s can't push two txns %s and %s at the same time",
698+
pusher.Short(), pushee.Short(), p.pushee.Short(),
699+
)
700+
}
701+
p.count++
702+
return p, nil
685703
}
686704
p := &txnPush{
687705
ctx: ctx,
706+
count: 1,
688707
pusher: pusher,
689708
pushee: pushee,
690709
}
@@ -695,7 +714,17 @@ func (c *cluster) registerPush(ctx context.Context, pusher, pushee uuid.UUID) (*
695714
func (c *cluster) unregisterPush(push *txnPush) {
696715
c.mu.Lock()
697716
defer c.mu.Unlock()
698-
delete(c.txnPushes, push.pusher)
717+
p, ok := c.txnPushes[push.pusher]
718+
if !ok {
719+
return
720+
}
721+
p.count--
722+
if p.count == 0 {
723+
delete(c.txnPushes, push.pusher)
724+
}
725+
if p.count < 0 {
726+
panic(fmt.Sprintf("negative count: %+v", p))
727+
}
699728
}
700729

701730
// detectDeadlocks looks at all in-flight transaction pushes and determines
@@ -792,7 +821,7 @@ func (c *cluster) resetNamespace() {
792821
defer c.mu.Unlock()
793822
c.txnCounter = 0
794823
c.txnsByName = make(map[string]*roachpb.Transaction)
795-
c.requestsByName = make(map[string]concurrency.Request)
824+
c.requestsByName = make(map[string]testReq)
796825
c.txnRecords = make(map[uuid.UUID]*txnRecord)
797826
}
798827

@@ -871,7 +900,7 @@ func (m *monitor) runSync(opName string, fn func(context.Context)) {
871900
atomic.StoreInt32(&g.finished, 1)
872901
}
873902

874-
func (m *monitor) runAsync(opName string, fn func(context.Context)) {
903+
func (m *monitor) runAsync(opName string, fn func(context.Context)) (cancel func()) {
875904
m.seq++
876905
ctx, collect, cancel := tracing.ContextWithRecordingSpan(context.Background(), opName)
877906
g := &monitoredGoroutine{
@@ -887,6 +916,7 @@ func (m *monitor) runAsync(opName string, fn func(context.Context)) {
887916
fn(ctx)
888917
atomic.StoreInt32(&g.finished, 1)
889918
}()
919+
return cancel
890920
}
891921

892922
func (m *monitor) numMonitored() int {

pkg/kv/kvserver/concurrency/lock_table.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -80,7 +80,7 @@ type waitingState struct {
8080
// Represents who the request is waiting for. The conflicting
8181
// transaction may be a lock holder of a conflicting lock or a
8282
// conflicting request being sequenced through the same lockTable.
83-
txn *enginepb.TxnMeta // always non-nil
83+
txn *enginepb.TxnMeta // always non-nil in waitFor{,Distinguished,Self}
8484
key roachpb.Key // the key of the conflict
8585
held bool // is the conflict a held lock?
8686

@@ -968,7 +968,12 @@ func (l *lockState) informActiveWaiters() {
968968
g := qg.guard
969969
var state waitingState
970970
if g.isSameTxnAsReservation(waitForState) {
971-
state = waitingState{kind: waitSelf}
971+
state = waitingState{
972+
kind: waitSelf,
973+
key: waitForState.key,
974+
txn: waitForState.txn,
975+
held: waitForState.held, // false
976+
}
972977
} else {
973978
state = waitForState
974979
state.guardAccess = spanset.SpanReadWrite
@@ -1339,7 +1344,12 @@ func (l *lockState) tryActiveWait(
13391344
g.key = l.key
13401345
g.mu.startWait = true
13411346
if g.isSameTxnAsReservation(waitForState) {
1342-
g.mu.state = waitingState{kind: waitSelf}
1347+
g.mu.state = waitingState{
1348+
kind: waitSelf,
1349+
key: waitForState.key,
1350+
txn: waitForState.txn,
1351+
held: waitForState.held, // false
1352+
}
13431353
} else {
13441354
state := waitForState
13451355
state.guardAccess = sa

pkg/kv/kvserver/concurrency/lock_table_waiter.go

Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@
1111
package concurrency
1212

1313
import (
14+
"bytes"
1415
"context"
1516
"math"
1617
"time"
@@ -26,6 +27,7 @@ import (
2627
"github.com/cockroachdb/cockroach/pkg/util/stop"
2728
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
2829
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
30+
"github.com/cockroachdb/cockroach/pkg/util/tracing"
2931
"github.com/cockroachdb/cockroach/pkg/util/uuid"
3032
"github.com/cockroachdb/errors"
3133
)
@@ -99,6 +101,9 @@ type lockTableWaiterImpl struct {
99101
// When set, WriteIntentError are propagated instead of pushing
100102
// conflicting transactions.
101103
disableTxnPushing bool
104+
// When set, called just before each ContentionEvent is emitted.
105+
// Is allowed to mutate the event.
106+
onContentionEvent func(ev *roachpb.ContentionEvent)
102107
}
103108

104109
// IntentResolver is an interface used by lockTableWaiterImpl to push
@@ -131,11 +136,24 @@ func (w *lockTableWaiterImpl) WaitOn(
131136
var timer *timeutil.Timer
132137
var timerC <-chan time.Time
133138
var timerWaitingState waitingState
139+
140+
h := contentionEventHelper{
141+
sp: tracing.SpanFromContext(ctx),
142+
onEvent: w.onContentionEvent,
143+
}
144+
defer h.emit()
145+
134146
for {
135147
select {
148+
// newStateC will be signaled for the transaction we are currently
149+
// contending on. We will continue to receive updates about this
150+
// transaction until it no longer contends with us, at which point
151+
// either one of the other channels fires or we receive state
152+
// about another contending transaction on newStateC.
136153
case <-newStateC:
137154
timerC = nil
138155
state := guard.CurState()
156+
h.emitAndInit(state)
139157
switch state.kind {
140158
case waitFor, waitForDistinguished:
141159
if req.WaitPolicy == lock.WaitPolicy_Error {
@@ -654,6 +672,75 @@ func (c *txnCache) insertFrontLocked(txn *roachpb.Transaction) {
654672
c.txns[0] = txn
655673
}
656674

675+
// contentionEventHelper tracks and emits ContentionEvents.
676+
type contentionEventHelper struct {
677+
sp *tracing.Span
678+
onEvent func(event *roachpb.ContentionEvent) // may be nil
679+
680+
// Internal.
681+
ev *roachpb.ContentionEvent
682+
tBegin time.Time
683+
}
684+
685+
// emit emits the open contention event, if any.
686+
func (h *contentionEventHelper) emit() {
687+
if h.ev == nil {
688+
return
689+
}
690+
h.ev.Duration = timeutil.Since(h.tBegin)
691+
if h.onEvent != nil {
692+
// NB: this is intentionally above the call to LogStructured so that
693+
// this interceptor gets to mutate the event (used for test determinism).
694+
h.onEvent(h.ev)
695+
}
696+
h.sp.LogStructured(h.ev)
697+
h.ev = nil
698+
}
699+
700+
// emitAndInit compares the waitingState's active txn (if any) against the current
701+
// ContentionEvent (if any). If the they match, we are continuing to handle the
702+
// same event and no action is taken. If they differ, the open event (if any) is
703+
// finalized and added to the Span, and a new event initialized from the inputs.
704+
func (h *contentionEventHelper) emitAndInit(s waitingState) {
705+
if h.sp == nil {
706+
// No span to attach payloads to - don't do any work.
707+
//
708+
// TODO(tbg): we could special case the noop span here too, but the plan is for
709+
// nobody to use noop spans any more (trace.mode=background).
710+
return
711+
}
712+
713+
// Depending on the kind of waiting state, finalize the open event (if any)
714+
// and possibly start tracking a new one.
715+
switch s.kind {
716+
case waitFor, waitForDistinguished, waitSelf:
717+
// If we're tracking an event and see a different txn/key, the event is
718+
// done and we initialize the new event tracking the new txn/key.
719+
//
720+
// NB: we're guaranteed to have `s.{txn,key}` populated here.
721+
if h.ev != nil &&
722+
(!h.ev.TxnMeta.ID.Equal(s.txn.ID) || !bytes.Equal(h.ev.Key, s.key)) {
723+
h.emit() // h.ev is now nil
724+
}
725+
726+
if h.ev == nil {
727+
h.ev = &roachpb.ContentionEvent{
728+
Key: s.key,
729+
TxnMeta: *s.txn,
730+
}
731+
h.tBegin = timeutil.Now()
732+
}
733+
case waitElsewhere, doneWaiting:
734+
// If we have an event, emit it now and that's it - the case we're in
735+
// does not give us a new transaction/key.
736+
if h.ev != nil {
737+
h.emit()
738+
}
739+
default:
740+
panic("unhandled waitingState.kind")
741+
}
742+
}
743+
657744
func newWriteIntentErr(ws waitingState) *Error {
658745
return roachpb.NewError(&roachpb.WriteIntentError{
659746
Intents: []roachpb.Intent{roachpb.MakeIntent(ws.txn, ws.key)},

0 commit comments

Comments
 (0)