-
Notifications
You must be signed in to change notification settings - Fork 4.1k
Expand file tree
/
Copy pathwriter.go
More file actions
321 lines (295 loc) · 11.7 KB
/
writer.go
File metadata and controls
321 lines (295 loc) · 11.7 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
// Copyright 2015 The Cockroach Authors.
//
// Use of this software is governed by the CockroachDB Software License
// included in the /LICENSE file.
package row
import (
"context"
"github.com/cockroachdb/cockroach/pkg/keys"
"github.com/cockroachdb/cockroach/pkg/roachpb"
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
"github.com/cockroachdb/cockroach/pkg/sql/rowenc/valueside"
"github.com/cockroachdb/cockroach/pkg/sql/sem/tree"
"github.com/cockroachdb/cockroach/pkg/util"
"github.com/cockroachdb/errors"
)
// This file contains common functions for the three writers, Inserter, Deleter
// and Updater.
// ColIDtoRowIndexFromCols groups a slice of ColumnDescriptors by their ID
// field, returning a map from ID to the index of the column in the input slice.
// It assumes there are no duplicate descriptors in the input.
func ColIDtoRowIndexFromCols(cols []catalog.Column) catalog.TableColMap {
	var m catalog.TableColMap
	for ord, col := range cols {
		m.Set(col.GetID(), ord)
	}
	return m
}
// ColMapping returns a map from ordinals in the fromCols list to ordinals in
// the toCols list. More precisely, for 0 <= i < fromCols:
//
//	result[i] = j such that fromCols[i].ID == toCols[j].ID, or
//	-1 if the column is not part of toCols.
func ColMapping(fromCols, toCols []catalog.Column) []int {
	// idToFromOrd maps a ColumnID to its ordinal within fromCols.
	var idToFromOrd util.FastIntMap
	for ord, col := range fromCols {
		idToFromOrd.Set(int(col.GetID()), ord)
	}

	// Start with every ordinal marked as "not returned".
	result := make([]int, len(fromCols))
	for i := range result {
		result[i] = -1
	}

	// For each column present in both lists, record its position in toCols.
	for toOrd, col := range toCols {
		if fromOrd, ok := idToFromOrd.Get(int(col.GetID())); ok {
			result[fromOrd] = toOrd
		}
	}
	return result
}
// prepareInsertOrUpdateBatch constructs a KV batch that inserts or
// updates a row in KV in the primary index.
//   - batch is the KV batch where commands should be appended.
//   - helper is the rowHelper that knows about the table being modified.
//   - primaryIndexKey is the PK prefix for the current row.
//   - fetchedCols is the list of schema columns that have been fetched
//     in preparation for this update.
//   - values is the SQL-level row values that are being written.
//   - valColIDMapping is the mapping from column IDs into positions of the slice
//     values.
//   - updatedColIDMapping is the mapping from column IDs into the positions of
//     the updated values.
//   - kvKey and kvValues must be heap-allocated scratch buffers to write
//     roachpb.Key and roachpb.Value values.
//   - rawValueBuf must be a scratch byte array. This must be reinitialized
//     to an empty slice on each call but can be preserved at its current
//     capacity to avoid allocations. The function returns the slice.
//   - kvOp indicates which KV write operation should be used. If it is PutOp,
//     it also indicates that the old keys have been locked.
//   - mustValidateOldPKValues indicates whether the expected previous row must
//     be verified (using CPut)
//   - traceKV is to be set to log the KV operations added to the batch.
func prepareInsertOrUpdateBatch(
	ctx context.Context,
	batch Putter,
	helper *RowHelper,
	primaryIndexKey []byte,
	fetchedCols []catalog.Column,
	values []tree.Datum,
	valColIDMapping catalog.TableColMap,
	updatedColIDMapping catalog.TableColMap,
	kvKey *roachpb.Key,
	kvValue *roachpb.Value,
	rawValueBuf []byte,
	oth OriginTimestampCPutHelper,
	oldValues []tree.Datum,
	kvOp KVInsertOp,
	mustValidateOldPKValues bool,
	traceKV bool,
) ([]byte, error) {
	families := helper.TableDesc.GetFamilies()
	// TODO(ssd): We don't currently support multiple column
	// families on the LDR write path. As a result, we don't have
	// good end-to-end testing of multi-column family writes with
	// the origin timestamp helper set. Until we write such tests,
	// we error if we ever see such writes.
	if oth.IsSet() && len(families) > 1 {
		return nil, errors.AssertionFailedf("OriginTimestampCPutHelper is not yet tested with multi-column family writes")
	}

	// Select the write function and locking semantics implied by the requested
	// KV operation.
	var putFn func(context.Context, Putter, *roachpb.Key, *roachpb.Value, bool, *RowHelper, lazyIndexDirs)
	var oldKeysLocked, overwrite bool
	switch kvOp {
	case CPutOp:
		putFn = insertCPutFn
		oldKeysLocked = false
		overwrite = false
	case PutOp:
		putFn = insertPutFn
		oldKeysLocked = true
		overwrite = true
	case PutMustAcquireExclusiveLockOp:
		putFn = insertPutMustAcquireExclusiveLockFn
		oldKeysLocked = false
		overwrite = true
	}

	for i := range families {
		family := &families[i]
		// Determine whether any column in this family was updated; untouched
		// families (other than an empty one, see below) can be skipped.
		update := false
		for _, colID := range family.ColumnIDs {
			if _, ok := updatedColIDMapping.Get(colID); ok {
				update = true
				break
			}
		}
		// We can have an empty family.ColumnIDs in the following case:
		// * A table is created with the primary key not in family 0, and another column in family 0.
		// * The column in family 0 is dropped, leaving the 0'th family empty.
		// In this case, we must keep the empty 0'th column family in order to ensure that column family 0
		// is always encoded as the sentinel k/v for a row.
		if !update && len(family.ColumnIDs) != 0 {
			continue
		}

		if i > 0 {
			// HACK: MakeFamilyKey appends to its argument, so on every loop iteration
			// after the first, trim primaryIndexKey so nothing gets overwritten.
			// TODO(dan): Instead of this, use something like engine.ChunkAllocator.
			primaryIndexKey = primaryIndexKey[:len(primaryIndexKey):len(primaryIndexKey)]
		}

		*kvKey = keys.MakeFamilyKey(primaryIndexKey, uint32(family.ID))

		// We need to ensure that column family 0 contains extra metadata, like composite primary key values.
		// Additionally, the decoders expect that column family 0 is encoded with a TUPLE value tag, so we
		// don't want to use the untagged value encoding.
		if len(family.ColumnIDs) == 1 && family.ColumnIDs[0] == family.DefaultColumnID && family.ID != 0 {
			// Storage optimization to store DefaultColumnID directly as a value. Also
			// backwards compatible with the original BaseFormatVersion.
			idx, ok := valColIDMapping.Get(family.DefaultColumnID)
			if !ok {
				continue
			}

			var marshaled roachpb.Value
			var err error
			typ := fetchedCols[idx].GetType()
			// Skip any values with a default ID not stored in the primary index,
			// which can happen if we are adding new columns.
			skip, couldBeComposite := helper.SkipColumnNotInPrimaryIndexValue(family.DefaultColumnID, values[idx])
			if skip {
				// If the column could be composite, there could be a previous KV, so we
				// still need to issue a Delete.
				if !couldBeComposite {
					continue
				}
			} else {
				marshaled, err = valueside.MarshalLegacy(typ, values[idx])
				if err != nil {
					return nil, err
				}
			}

			// Marshal the expected previous value when a CPut-style validation
			// (origin-timestamp or old-PK check) will be issued below.
			var oldVal []byte
			if (oth.IsSet() || mustValidateOldPKValues) && len(oldValues) > 0 {
				// If the column could be composite, we only encode the old value if it
				// was a composite value.
				if !couldBeComposite || oldValues[idx].(tree.CompositeDatum).IsComposite() {
					old, err := valueside.MarshalLegacy(typ, oldValues[idx])
					if err != nil {
						return nil, err
					}
					if old.IsPresent() {
						oldVal = old.TagAndDataBytes()
					}
				}
			}

			if !marshaled.IsPresent() {
				if oth.IsSet() {
					// If using OriginTimestamp'd CPuts, we _always_ want to issue a Delete
					// so that we can confirm our expected bytes were correct.
					oth.DelWithCPut(ctx, batch, kvKey, oldVal, traceKV)
				} else if overwrite {
					// If the new family contains a NULL value, then we must
					// delete any pre-existing row.
					if mustValidateOldPKValues {
						delWithCPutFn(ctx, batch, kvKey, oldVal, traceKV, helper, primaryIndexDirs)
					} else {
						needsLock := !oldKeysLocked
						delFn(ctx, batch, kvKey, needsLock, traceKV, helper, primaryIndexDirs)
					}
				}
			} else {
				// We only output non-NULL values. Non-existent column keys are
				// considered NULL during scanning and the row sentinel ensures we know
				// the row exists.
				if err := helper.CheckRowSize(ctx, kvKey, marshaled.RawBytes, family.ID); err != nil {
					return nil, err
				}
				if oth.IsSet() {
					oth.CPutFn(ctx, batch, kvKey, &marshaled, oldVal, traceKV)
				} else if mustValidateOldPKValues {
					updateCPutFn(ctx, batch, kvKey, &marshaled, oldVal, traceKV, helper, primaryIndexDirs)
				} else {
					// TODO(yuzefovich): in case of multiple column families,
					// whenever we locked the primary index during the initial
					// scan, we might not have locked the key for a column
					// family where all columns had NULL values (because the KV
					// didn't exist) and now at least one becomes non-NULL. In
					// this scenario we're inserting a new KV with non-locking
					// Put, yet we don't have the lock.
					//
					// However, at the moment we disable the lock eliding
					// optimization with multiple column families, so we'll use
					// the locking Put because of that.
					putFn(ctx, batch, kvKey, &marshaled, traceKV, helper, primaryIndexDirs)
				}
			}

			continue
		}

		// General case: the family is encoded as a TUPLE of all its columns.
		familySortedColumnIDs, ok := helper.SortedColumnFamily(family.ID)
		if !ok {
			return nil, errors.AssertionFailedf("invalid family sorted column id map")
		}

		rawValueBuf = rawValueBuf[:0]
		var err error
		rawValueBuf, err = helper.encodePrimaryIndexValuesToBuf(values, valColIDMapping, familySortedColumnIDs, fetchedCols, rawValueBuf)
		if err != nil {
			return nil, err
		}

		// TODO(ssd): Here and below investigate reducing the number of
		// allocations required to marshal the old value.
		//
		// If we are using OriginTimestamp ConditionalPuts, calculate the expected
		// value.
		var expBytes []byte
		if (oth.IsSet() || mustValidateOldPKValues) && len(oldValues) > 0 {
			var oldBytes []byte
			oldBytes, err = helper.encodePrimaryIndexValuesToBuf(oldValues, valColIDMapping, familySortedColumnIDs, fetchedCols, oldBytes)
			if err != nil {
				return nil, err
			}
			// For family 0, we expect a value even when
			// no columns have been encoded to oldBytes.
			if family.ID == 0 || len(oldBytes) > 0 {
				old := &roachpb.Value{}
				old.SetTuple(oldBytes)
				expBytes = old.TagAndDataBytes()
			}
		}

		if family.ID != 0 && len(rawValueBuf) == 0 {
			if oth.IsSet() {
				// If using OriginTimestamp'd CPuts, we _always_ want to issue a Delete
				// so that we can confirm our expected bytes were correct.
				oth.DelWithCPut(ctx, batch, kvKey, expBytes, traceKV)
			} else if overwrite {
				// The family might have already existed but every column in it is being
				// set to NULL, so delete it.
				if mustValidateOldPKValues {
					delWithCPutFn(ctx, batch, kvKey, expBytes, traceKV, helper, primaryIndexDirs)
				} else {
					needsLock := !oldKeysLocked
					delFn(ctx, batch, kvKey, needsLock, traceKV, helper, primaryIndexDirs)
				}
			}
		} else {
			// Copy the contents of rawValueBuf into the roachpb.Value. This is
			// a deep copy so rawValueBuf can be re-used by other calls to the
			// function.
			kvValue.SetTuple(rawValueBuf)
			if err := helper.CheckRowSize(ctx, kvKey, kvValue.RawBytes, family.ID); err != nil {
				return nil, err
			}
			if oth.IsSet() {
				oth.CPutFn(ctx, batch, kvKey, kvValue, expBytes, traceKV)
			} else if mustValidateOldPKValues {
				updateCPutFn(ctx, batch, kvKey, kvValue, expBytes, traceKV, helper, primaryIndexDirs)
			} else {
				putFn(ctx, batch, kvKey, kvValue, traceKV, helper, primaryIndexDirs)
			}
		}

		// Release reference to roachpb.Key.
		*kvKey = nil
		// Prevent future calls to prepareInsertOrUpdateBatch from mutating
		// the RawBytes in the kvValue we just added to the batch. Remember
		// that we share the kvValue reference across calls to this function.
		*kvValue = roachpb.Value{}
	}

	return rawValueBuf, nil
}