Skip to content

Commit 2ca7454

Browse files
author
Marius Posta
authored
Merge pull request #83816 from postamar/backport22.1-83544
2 parents adb6636 + fb86e4d commit 2ca7454

7 files changed

Lines changed: 133 additions & 23 deletions

File tree

pkg/sql/backfill.go

Lines changed: 24 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -92,6 +92,16 @@ var columnBackfillBatchSize = settings.RegisterIntSetting(
9292
settings.NonNegativeInt, /* validateFn */
9393
)
9494

95+
// columnBackfillUpdateChunkSizeThresholdBytes is the byte size threshold beyond which
96+
// an update batch is run at once when adding or removing columns.
97+
var columnBackfillUpdateChunkSizeThresholdBytes = settings.RegisterIntSetting(
98+
settings.TenantWritable,
99+
"bulkio.column_backfill.update_chunk_size_threshold_bytes",
100+
"the batch size in bytes above which an update is immediately run when adding/removing columns",
101+
10<<20, /* 10 MiB */
102+
settings.NonNegativeInt, /* validateFn */
103+
)
104+
95105
var _ sort.Interface = columnsByID{}
96106
var _ sort.Interface = indexesByID{}
97107

@@ -1187,6 +1197,7 @@ func (sc *SchemaChanger) distColumnBackfill(
11871197
ctx context.Context,
11881198
version descpb.DescriptorVersion,
11891199
backfillChunkSize int64,
1200+
backfillUpdateChunkSizeThresholdBytes uint64,
11901201
filter backfill.MutationFilter,
11911202
) error {
11921203
duration := checkpointInterval
@@ -1281,7 +1292,7 @@ func (sc *SchemaChanger) distColumnBackfill(
12811292

12821293
planCtx := sc.distSQLPlanner.NewPlanningCtx(ctx, &evalCtx, nil /* planner */, txn,
12831294
DistributionTypeSystemTenantOnly)
1284-
spec, err := initColumnBackfillerSpec(*tableDesc.TableDesc(), duration, chunkSize, readAsOf)
1295+
spec, err := initColumnBackfillerSpec(tableDesc, duration, chunkSize, backfillUpdateChunkSizeThresholdBytes, readAsOf)
12851296
if err != nil {
12861297
return err
12871298
}
@@ -2149,8 +2160,12 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
21492160
log.Infof(ctx, "clearing and backfilling columns")
21502161

21512162
if err := sc.distColumnBackfill(
2152-
ctx, version, columnBackfillBatchSize.Get(&sc.settings.SV),
2153-
backfill.ColumnMutationFilter); err != nil {
2163+
ctx,
2164+
version,
2165+
columnBackfillBatchSize.Get(&sc.settings.SV),
2166+
uint64(columnBackfillUpdateChunkSizeThresholdBytes.Get(&sc.settings.SV)),
2167+
backfill.ColumnMutationFilter,
2168+
); err != nil {
21542169
return err
21552170
}
21562171
log.Info(ctx, "finished clearing and backfilling columns")
@@ -2579,9 +2594,12 @@ func columnBackfillInTxn(
25792594
sp := tableDesc.PrimaryIndexSpan(evalCtx.Codec)
25802595
for sp.Key != nil {
25812596
var err error
2582-
sp.Key, err = backfiller.RunColumnBackfillChunk(ctx,
2583-
txn, tableDesc, sp, rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV)),
2584-
false /*alsoCommit*/, traceKV)
2597+
scanBatchSize := rowinfra.RowLimit(columnBackfillBatchSize.Get(&evalCtx.Settings.SV))
2598+
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(columnBackfillUpdateChunkSizeThresholdBytes.Get(&evalCtx.Settings.SV))
2599+
const alsoCommit = false
2600+
sp.Key, err = backfiller.RunColumnBackfillChunk(
2601+
ctx, txn, tableDesc, sp, scanBatchSize, updateChunkSizeThresholdBytes, alsoCommit, traceKV,
2602+
)
25852603
if err != nil {
25862604
return err
25872605
}

pkg/sql/backfill/backfill.go

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -267,6 +267,7 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
267267
tableDesc catalog.TableDescriptor,
268268
sp roachpb.Span,
269269
chunkSize rowinfra.RowLimit,
270+
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
270271
alsoCommit bool,
271272
traceKV bool,
272273
) (roachpb.Key, error) {
@@ -375,6 +376,20 @@ func (cb *ColumnBackfiller) RunColumnBackfillChunk(
375376
); err != nil {
376377
return roachpb.Key{}, err
377378
}
379+
380+
// Exit early to flush if the batch byte size exceeds a predefined
381+
// threshold. This can happen when table rows are more on the "fat" side,
382+
// typically with large BYTES or JSONB columns.
383+
//
384+
// This helps prevent exceedingly large raft commands which will
385+
// for instance cause schema changes to be unable to either proceed or to
386+
// roll back.
387+
//
388+
// The threshold is ignored when zero.
389+
//
390+
if updateChunkSizeThresholdBytes > 0 && b.ApproximateMutationBytes() >= int(updateChunkSizeThresholdBytes) {
391+
break
392+
}
378393
}
379394
// Write the new row values.
380395
writeBatch := txn.Run

pkg/sql/distsql_plan_backfill.go

Lines changed: 12 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"unsafe"
1717

1818
"github.com/cockroachdb/cockroach/pkg/roachpb"
19+
"github.com/cockroachdb/cockroach/pkg/sql/catalog"
1920
"github.com/cockroachdb/cockroach/pkg/sql/catalog/descpb"
2021
"github.com/cockroachdb/cockroach/pkg/sql/execinfrapb"
2122
"github.com/cockroachdb/cockroach/pkg/sql/physicalplan"
@@ -26,14 +27,19 @@ import (
2627
)
2728

2829
func initColumnBackfillerSpec(
29-
desc descpb.TableDescriptor, duration time.Duration, chunkSize int64, readAsOf hlc.Timestamp,
30+
tbl catalog.TableDescriptor,
31+
duration time.Duration,
32+
chunkSize int64,
33+
updateChunkSizeThresholdBytes uint64,
34+
readAsOf hlc.Timestamp,
3035
) (execinfrapb.BackfillerSpec, error) {
3136
return execinfrapb.BackfillerSpec{
32-
Table: desc,
33-
Duration: duration,
34-
ChunkSize: chunkSize,
35-
ReadAsOf: readAsOf,
36-
Type: execinfrapb.BackfillerSpec_Column,
37+
Table: *tbl.TableDesc(),
38+
Duration: duration,
39+
ChunkSize: chunkSize,
40+
UpdateChunkSizeThresholdBytes: updateChunkSizeThresholdBytes,
41+
ReadAsOf: readAsOf,
42+
Type: execinfrapb.BackfillerSpec_Column,
3743
}, nil
3844
}
3945

pkg/sql/execinfrapb/processors_bulk_io.proto

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -62,6 +62,10 @@ message BackfillerSpec {
6262
// of entries backfilled per chunk.
6363
optional int64 chunk_size = 5 [(gogoproto.nullable) = false];
6464

65+
// The column backfiller will run an update batch immediately
66+
// once its estimated byte size reaches UpdateChunkSizeThresholdBytes, if nonzero.
67+
optional uint64 update_chunk_size_threshold_bytes = 14 [(gogoproto.nullable) = false];
68+
6569
// WriteAsOf is the time that the backfill entries should be written.
6670
// Note: Older nodes may also use this as the read time instead of readAsOf.
6771
optional util.hlc.Timestamp writeAsOf = 7 [(gogoproto.nullable) = false];
@@ -86,7 +90,7 @@ message BackfillerSpec {
8690
// check MVCCAddSSTable before setting this option.
8791
optional bool write_at_batch_timestamp = 12 [(gogoproto.nullable) = false];
8892

89-
// NEXTID: 14.
93+
// NEXTID: 15.
9094
}
9195

9296
// JobProgress identifies the job to report progress on. This reporting

pkg/sql/logictest/testdata/logic_test/alter_table

Lines changed: 65 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -2184,6 +2184,66 @@ SELECT * FROM multipleinstmt ORDER BY id ASC;
21842184
2 b b false NULL true NULL
21852185
3 c c false NULL true NULL
21862186

2187+
subtest column_backfiller_update_batching
2188+
2189+
let $use_decl_sc
2190+
SHOW use_declarative_schema_changer
2191+
2192+
statement ok
2193+
SET use_declarative_schema_changer = 'off';
2194+
2195+
statement ok
2196+
BEGIN;
2197+
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
2198+
SET tracing = on,kv;
2199+
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
2200+
SET tracing = off;
2201+
2202+
# Check that the column backfiller batches all its Puts into one batch.
2203+
query I
2204+
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
2205+
----
2206+
1
2207+
2208+
query I
2209+
SELECT count(*) FROM tb WHERE v = 'abc';
2210+
----
2211+
10
2212+
2213+
statement ok
2214+
ROLLBACK;
2215+
2216+
# Bring the threshold way down to force column backfiller batches to contain no more than 1 Put each.
2217+
statement ok
2218+
SET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes = 1;
2219+
2220+
statement ok
2221+
BEGIN;
2222+
CREATE TABLE tb AS SELECT 123::INT AS k FROM generate_series(1, 10);
2223+
SET tracing = on,kv;
2224+
ALTER TABLE tb ADD COLUMN v STRING NOT NULL DEFAULT ('abc'::STRING);
2225+
SET tracing = off;
2226+
2227+
query I
2228+
SELECT count(*) FROM [SHOW KV TRACE FOR SESSION] WHERE message LIKE '%sending batch%' AND message LIKE '% Put to %';
2229+
----
2230+
10
2231+
2232+
query I
2233+
SELECT count(*) FROM tb WHERE v = 'abc';
2234+
----
2235+
10
2236+
2237+
statement ok
2238+
ROLLBACK;
2239+
2240+
# Undo subtest side effects.
2241+
statement ok
2242+
RESET CLUSTER SETTING bulkio.column_backfill.update_chunk_size_threshold_bytes;
2243+
2244+
statement ok
2245+
SET use_declarative_schema_changer = $use_decl_sc;
2246+
21872247
subtest storage_params
21882248

21892249
statement ok
@@ -2294,8 +2354,8 @@ FROM (
22942354
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
22952355
----
22962356
table_id name state refobjid
2297-
205 test_serial_b_seq PUBLIC 204
2298-
204 test_serial PUBLIC NULL
2357+
207 test_serial_b_seq PUBLIC 206
2358+
206 test_serial PUBLIC NULL
22992359

23002360
statement ok
23012361
DROP TABLE test_serial;
@@ -2329,8 +2389,8 @@ FROM (
23292389
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
23302390
----
23312391
table_id name state refobjid
2332-
207 test_serial_b_seq PUBLIC 206
2333-
206 test_serial PUBLIC NULL
2392+
209 test_serial_b_seq PUBLIC 208
2393+
208 test_serial PUBLIC NULL
23342394

23352395
statement ok
23362396
ALTER TABLE test_serial DROP COLUMN b;
@@ -2345,7 +2405,7 @@ FROM (
23452405
LEFT JOIN pg_catalog.pg_depend r ON l.table_id = r.objid;
23462406
----
23472407
table_id name state refobjid
2348-
206 test_serial PUBLIC NULL
2408+
208 test_serial PUBLIC NULL
23492409

23502410
statement ok
23512411
DROP TABLE test_serial;

pkg/sql/rowexec/backfiller.go

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -47,6 +47,7 @@ type chunkBackfiller interface {
4747
ctx context.Context,
4848
span roachpb.Span,
4949
chunkSize rowinfra.RowLimit,
50+
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
5051
readAsOf hlc.Timestamp,
5152
) (roachpb.Key, error)
5253

@@ -135,6 +136,8 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
135136
// fill more than this amount and cause a flush, then it likely also fills
136137
// a non-trivial part of the next buffer.
137138
const opportunisticCheckpointThreshold = 0.8
139+
chunkSize := rowinfra.RowLimit(b.spec.ChunkSize)
140+
updateChunkSizeThresholdBytes := rowinfra.BytesLimit(b.spec.UpdateChunkSizeThresholdBytes)
138141
start := timeutil.Now()
139142
totalChunks := 0
140143
totalSpans := 0
@@ -148,7 +151,7 @@ func (b *backfiller) mainLoop(ctx context.Context) (roachpb.Spans, error) {
148151
for todo.Key != nil {
149152
log.VEventf(ctx, 3, "%s backfiller starting chunk %d: %s", b.name, chunks, todo)
150153
var err error
151-
todo.Key, err = b.chunks.runChunk(ctx, todo, rowinfra.RowLimit(b.spec.ChunkSize), b.spec.ReadAsOf)
154+
todo.Key, err = b.chunks.runChunk(ctx, todo, chunkSize, updateChunkSizeThresholdBytes, b.spec.ReadAsOf)
152155
if err != nil {
153156
return nil, err
154157
}

pkg/sql/rowexec/columnbackfiller.go

Lines changed: 8 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -102,7 +102,11 @@ func (cb *columnBackfiller) CurrentBufferFill() float32 {
102102

103103
// runChunk implements the chunkBackfiller interface.
104104
func (cb *columnBackfiller) runChunk(
105-
ctx context.Context, sp roachpb.Span, chunkSize rowinfra.RowLimit, _ hlc.Timestamp,
105+
ctx context.Context,
106+
sp roachpb.Span,
107+
chunkSize rowinfra.RowLimit,
108+
updateChunkSizeThresholdBytes rowinfra.BytesLimit,
109+
_ hlc.Timestamp,
106110
) (roachpb.Key, error) {
107111
var key roachpb.Key
108112
var commitWaitFn func(context.Context) error
@@ -123,16 +127,16 @@ func (cb *columnBackfiller) runChunk(
123127
// waiting for consistency when backfilling a column on GLOBAL tables.
124128
commitWaitFn = txn.DeferCommitWait(ctx)
125129

126-
// TODO(knz): do KV tracing in DistSQL processors.
127130
var err error
128131
key, err = cb.RunColumnBackfillChunk(
129132
ctx,
130133
txn,
131134
cb.desc,
132135
sp,
133136
chunkSize,
134-
true, /*alsoCommit*/
135-
false, /*traceKV*/
137+
updateChunkSizeThresholdBytes,
138+
true, /*alsoCommit*/
139+
cb.flowCtx.TraceKV,
136140
)
137141
return err
138142
})

0 commit comments

Comments
 (0)