Commit 11abdda

hlc: introduce synthetic flag on timestamps
Informs #52745. Informs #36431.

This commit introduces an 8-bit flags field on the hlc timestamp struct. The flags are used to provide details about the timestamp and its meaning. They do not affect the sort order of Timestamps.

The commit then introduces the first flag: SYNTHETIC. As discussed in #52745, a synthetic timestamp is defined as a timestamp that makes no claim about the value of clocks in the system. While standard timestamps are pulled from HLC clocks and indicate that some node in the system has a clock with a reading equal to or above their value, a synthetic timestamp makes no such indication. By avoiding a connection to "real time", synthetic timestamps can be used to write values at a future time and to indicate that observed timestamps do not apply to such writes for the purposes of tracking causality between the write and its observers. Synthetic timestamps will be a critical part of implementing non-blocking transactions (#52745) and of fixing the interaction between observed timestamps and transaction refreshing (#36431).

The original plan was to reserve the high-order bit in the logical portion of a timestamp as a "synthetic bit". This is how I began implementing things, but I was turned off by that approach for a few reasons. First, it was fairly subtle and seemed too easy to get wrong; using a separate field is more explicit and avoids a class of bugs. Second, I had serious concerns about how the synthetic bit would impact timestamp ordering: every timestamp comparison would need to mask out the bit or risk being incorrect, and this was even true of the LSM custom comparator. That seemed difficult to get right, and particularly concerning because we plan to mark only some of a transaction's committed values as synthetic to fix #36431, so any carelessness here could lead to atomicity violations. There were also minor backwards-compatibility concerns.

A separate field is more expensive in theory, so we need to be careful. In practice, however, it turns out to be mostly free in each case that we care about:

- In memory, the separate field is effectively free: the Timestamp struct previously held 12 bytes of data but was always padded out to 16 bytes when included as a field in any other struct, so the flags field replaces existing padding.
- Over the wire, the field is not included when zero and uses a varint encoding when non-zero, so again it is mostly free.
- In the engine key encoding, the field is likewise not included when zero and takes up only 1 byte when non-zero, so it is mostly free there as well.
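To make the shape of the change concrete, here is a minimal Go sketch of a flags-carrying timestamp. This is not the CockroachDB source: the WallTime, Logical, and Flags fields, the SYNTHETIC flag, and the Less method all appear in this commit's diff, but the package name, the TimestampFlag type, the flag's bit position, and the IsFlagSet helper are illustrative assumptions.

package hlcsketch

// TimestampFlag is one bit in a Timestamp's Flags bitset.
type TimestampFlag uint32

// SYNTHETIC marks a timestamp that makes no claim about clock readings
// anywhere in the system. (Assumed bit position.)
const SYNTHETIC TimestampFlag = 1 << 0

// Timestamp is (WallTime, Logical) plus a flags bitset. With or without
// Flags, unsafe.Sizeof reports 16 bytes: the int64 forces 8-byte
// alignment, so the uint32 occupies what was previously trailing padding.
type Timestamp struct {
    WallTime int64
    Logical  int32
    Flags    uint32
}

// IsFlagSet reports whether flag f is set on t.
func (t Timestamp) IsFlagSet(f TimestampFlag) bool {
    return t.Flags&uint32(f) != 0
}

// Less orders timestamps by (WallTime, Logical) only. Flags deliberately
// plays no part, so no comparison site (including the LSM comparator)
// needs to mask anything out.
func (t Timestamp) Less(s Timestamp) bool {
    return t.WallTime < s.WallTime ||
        (t.WallTime == s.WallTime && t.Logical < s.Logical)
}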
1 parent df5e066 commit 11abdda

36 files changed

Lines changed: 552 additions & 146 deletions
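The "mostly free" claims can also be sketched. Over the wire, protobuf omits a zero-valued varint field entirely and spends only a tag byte plus a short varint when it is set. For the engine key encoding, the sketch below (continuing the types above) shows one plausible layout in which the flags byte is appended only when non-zero; the exact byte layout of CockroachDB's key encoding is an assumption here, and a real storage comparator would need to account for the optional trailing byte when ordering keys.

import "encoding/binary"

// encodeTimestampSuffix illustrates the "omitted when zero, 1 byte when
// non-zero" claim. Assumed layout, not the exact CockroachDB encoding.
func encodeTimestampSuffix(ts Timestamp) []byte {
    if ts == (Timestamp{}) {
        return nil // an empty timestamp contributes no suffix at all
    }
    buf := make([]byte, 8, 13)
    binary.BigEndian.PutUint64(buf, uint64(ts.WallTime)) // 8 bytes
    if ts.Logical != 0 || ts.Flags != 0 {
        // Logical must be present whenever Flags is, so a decoder can
        // tell the two optional sections apart by length.
        buf = append(buf, 0, 0, 0, 0)
        binary.BigEndian.PutUint32(buf[8:], uint32(ts.Logical)) // 4 bytes
    }
    if ts.Flags != 0 {
        buf = append(buf, byte(ts.Flags)) // the single extra byte
    }
    return buf
}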

pkg/ccl/changefeedccl/kvfeed/buffer.go

Lines changed: 4 additions & 0 deletions
@@ -211,6 +211,7 @@ var memBufferColTypes = []*types.T{
     types.Bytes, // span.EndKey
     types.Int,   // ts.WallTime
     types.Int,   // ts.Logical
+    types.Int,   // ts.Flags
 }

 // memBuffer is an in-memory buffer for changed KV and Resolved timestamp
@@ -266,6 +267,7 @@ func (b *memBuffer) AddKV(
         tree.DNull,
         b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.WallTime)),
         b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Logical)),
+        b.allocMu.a.NewDInt(tree.DInt(kv.Value.Timestamp.Flags)),
     }
     b.allocMu.Unlock()
     return b.addRow(ctx, row)
@@ -284,6 +286,7 @@ func (b *memBuffer) AddResolved(
         b.allocMu.a.NewDBytes(tree.DBytes(span.EndKey)),
         b.allocMu.a.NewDInt(tree.DInt(ts.WallTime)),
         b.allocMu.a.NewDInt(tree.DInt(ts.Logical)),
+        b.allocMu.a.NewDInt(tree.DInt(ts.Flags)),
     }
     b.allocMu.Unlock()
     return b.addRow(ctx, row)
@@ -300,6 +303,7 @@ func (b *memBuffer) Get(ctx context.Context) (Event, error) {
     ts := hlc.Timestamp{
         WallTime: int64(*row[5].(*tree.DInt)),
         Logical:  int32(*row[6].(*tree.DInt)),
+        Flags:    uint32(*row[7].(*tree.DInt)),
     }
     if row[2] != tree.DNull {
         e.prevVal = roachpb.Value{

pkg/ccl/storageccl/export_test.go

Lines changed: 1 addition & 5 deletions
@@ -581,11 +581,7 @@ func TestRandomKeyAndTimestampExport(t *testing.T) {
     }
     batch.Close()

-    sort.Slice(timestamps, func(i, j int) bool {
-        return (timestamps[i].WallTime < timestamps[j].WallTime) ||
-            (timestamps[i].WallTime == timestamps[j].WallTime &&
-                timestamps[i].Logical < timestamps[j].Logical)
-    })
+    sort.Slice(timestamps, func(i, j int) bool { return timestamps[i].Less(timestamps[j]) })
     return keys, timestamps
 }

pkg/jobs/helpers.go

Lines changed: 2 additions & 2 deletions
@@ -54,7 +54,7 @@ func NewFakeNodeLiveness(nodeCount int) *FakeNodeLiveness {
         nodeID := roachpb.NodeID(i + 1)
         nl.mu.livenessMap[nodeID] = &livenesspb.Liveness{
             Epoch:      1,
-            Expiration: hlc.LegacyTimestamp(hlc.MaxTimestamp),
+            Expiration: hlc.MaxTimestamp.ToLegacyTimestamp(),
             NodeID:     nodeID,
         }
     }
@@ -113,7 +113,7 @@ func (nl *FakeNodeLiveness) FakeIncrementEpoch(id roachpb.NodeID) {
 func (nl *FakeNodeLiveness) FakeSetExpiration(id roachpb.NodeID, ts hlc.Timestamp) {
     nl.mu.Lock()
     defer nl.mu.Unlock()
-    nl.mu.livenessMap[id].Expiration = hlc.LegacyTimestamp(ts)
+    nl.mu.livenessMap[id].Expiration = ts.ToLegacyTimestamp()
 }

 // ResetConstructors resets the registered Resumer constructors.
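A recurring mechanical change in this commit, visible in pkg/jobs/helpers.go above and in the gc, liveness, and rangefeed files below, is replacing direct struct conversions such as hlc.LegacyTimestamp(ts) and hlc.Timestamp(meta.Timestamp) with the methods ToLegacyTimestamp and ToTimestamp. Only those method names come from the diff; the sketch below (continuing the Timestamp sketch above, with an assumed LegacyTimestamp layout) shows one likely motivation: named helpers keep call sites compiling and explicit even as the two structs' field sets evolve, whereas Go's direct struct conversion silently requires identical fields on both sides.

// LegacyTimestamp mirrors hlc.LegacyTimestamp; this layout is an
// illustrative assumption.
type LegacyTimestamp struct {
    WallTime int64
    Logical  int32
    Flags    uint32
}

// ToLegacyTimestamp converts a Timestamp to its legacy form, carrying
// the flags along.
func (t Timestamp) ToLegacyTimestamp() LegacyTimestamp {
    return LegacyTimestamp{WallTime: t.WallTime, Logical: t.Logical, Flags: t.Flags}
}

// ToTimestamp converts a LegacyTimestamp back into a Timestamp.
func (t LegacyTimestamp) ToTimestamp() Timestamp {
    return Timestamp{WallTime: t.WallTime, Logical: t.Logical, Flags: t.Flags}
}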

pkg/kv/kvserver/below_raft_protos_test.go

Lines changed: 2 additions & 2 deletions
@@ -66,7 +66,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
             return m
         },
         emptySum:     7551962144604783939,
-        populatedSum: 12720006657210437557,
+        populatedSum: 5737658018003400959,
     },
     reflect.TypeOf(&enginepb.RangeAppliedState{}): {
         populatedConstructor: func(r *rand.Rand) protoutil.Message {
@@ -124,7 +124,7 @@ var belowRaftGoldenProtos = map[reflect.Type]fixture{
             return enginepb.NewPopulatedMVCCMetadataSubsetForMergeSerialization(r, false)
         },
         emptySum:     14695981039346656037,
-        populatedSum: 7432412240713840291,
+        populatedSum: 834545685817460463,
     },
 }

pkg/kv/kvserver/gc/gc.go

Lines changed: 1 addition & 1 deletion
@@ -250,7 +250,7 @@ func processReplicatedKeyRange(
         if meta.Txn != nil {
             // Keep track of intent to resolve if older than the intent
             // expiration threshold.
-            if hlc.Timestamp(meta.Timestamp).Less(intentExp) {
+            if meta.Timestamp.ToTimestamp().Less(intentExp) {
                 txnID := meta.Txn.ID
                 if _, ok := txnMap[txnID]; !ok {
                     txnMap[txnID] = &roachpb.Transaction{

pkg/kv/kvserver/gc/gc_old_test.go

Lines changed: 1 addition & 1 deletion
@@ -101,7 +101,7 @@ func runGCOld(
         if meta.Txn != nil {
             // Keep track of intent to resolve if older than the intent
             // expiration threshold.
-            if hlc.Timestamp(meta.Timestamp).Less(intentExp) {
+            if meta.Timestamp.ToTimestamp().Less(intentExp) {
                 txnID := meta.Txn.ID
                 if _, ok := txnMap[txnID]; !ok {
                     txnMap[txnID] = &roachpb.Transaction{

pkg/kv/kvserver/liveness/liveness.go

Lines changed: 2 additions & 4 deletions
@@ -825,8 +825,7 @@ func (nl *NodeLiveness) heartbeatInternal(
     // [*]: see TODO below about how errNodeAlreadyLive handling does not
     // enforce this guarantee.
     beforeQueueTS := nl.clock.Now()
-    minExpiration := hlc.LegacyTimestamp(
-        beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
+    minExpiration := beforeQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0).ToLegacyTimestamp()

     // Before queueing, record the heartbeat as in-flight.
     nl.metrics.HeartbeatsInFlight.Inc(1)
@@ -873,8 +872,7 @@ func (nl *NodeLiveness) heartbeatInternal(
     // Grab a new clock reading to compute the new expiration time,
     // since we may have queued on the semaphore for a while.
     afterQueueTS := nl.clock.Now()
-    newLiveness.Expiration = hlc.LegacyTimestamp(
-        afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0))
+    newLiveness.Expiration = afterQueueTS.Add(nl.livenessThreshold.Nanoseconds(), 0).ToLegacyTimestamp()
     // This guards against the system clock moving backwards. As long
     // as the cockroach process is running, checks inside hlc.Clock
     // will ensure that the clock never moves backwards, but these

pkg/kv/kvserver/liveness/liveness_test.go

Lines changed: 1 addition & 1 deletion
@@ -43,7 +43,7 @@ func TestShouldReplaceLiveness(t *testing.T) {
     l := func(epo int64, expiration hlc.Timestamp, draining bool, membership string) Record {
         liveness := livenesspb.Liveness{
             Epoch:      epo,
-            Expiration: hlc.LegacyTimestamp(expiration),
+            Expiration: expiration.ToLegacyTimestamp(),
             Draining:   draining,
             Membership: toMembershipStatus(membership),
         }

pkg/kv/kvserver/node_liveness_test.go

Lines changed: 2 additions & 3 deletions
@@ -25,7 +25,6 @@ import (
     "github.com/cockroachdb/cockroach/pkg/kv/kvserver/liveness/livenesspb"
     "github.com/cockroachdb/cockroach/pkg/roachpb"
     "github.com/cockroachdb/cockroach/pkg/testutils"
-    "github.com/cockroachdb/cockroach/pkg/util/hlc"
     "github.com/cockroachdb/cockroach/pkg/util/leaktest"
     "github.com/cockroachdb/cockroach/pkg/util/log"
     "github.com/cockroachdb/cockroach/pkg/util/syncutil"
@@ -196,7 +195,7 @@ func TestRedundantNodeLivenessHeartbeatsAvoided(t *testing.T) {
     livenessAfter, found := nl.Self()
     assert.True(t, found)
     exp := livenessAfter.Expiration
-    minExp := hlc.LegacyTimestamp(before.Add(nlActive.Nanoseconds(), 0))
+    minExp := before.Add(nlActive.Nanoseconds(), 0).ToLegacyTimestamp()
     if exp.Less(minExp) {
         return errors.Errorf("expected min expiration %v, found %v", minExp, exp)
     }
@@ -958,7 +957,7 @@ func TestNodeLivenessDecommissionAbsent(t *testing.T) {
     if err := mtc.dbs[0].CPut(ctx, keys.NodeLivenessKey(goneNodeID), &livenesspb.Liveness{
         NodeID:     goneNodeID,
         Epoch:      1,
-        Expiration: hlc.LegacyTimestamp(mtc.clock().Now()),
+        Expiration: mtc.clock().Now().ToLegacyTimestamp(),
         Membership: livenesspb.MembershipStatus_ACTIVE,
     }, nil); err != nil {
         t.Fatal(err)

pkg/kv/kvserver/rangefeed/registry.go

Lines changed: 1 addition & 1 deletion
@@ -348,7 +348,7 @@ func (r *registration) maybeRunCatchupScan() error {
             // immediately after) the provisional key.
             catchupIter.SeekGE(storage.MVCCKey{
                 Key:       unsafeKey.Key,
-                Timestamp: hlc.Timestamp(meta.Timestamp).Prev(),
+                Timestamp: meta.Timestamp.ToTimestamp().Prev(),
             })
             continue
         }
