@@ -92,6 +92,16 @@ var columnBackfillBatchSize = settings.RegisterIntSetting(
9292 settings .NonNegativeInt , /* validateFn */
9393)
9494
95+ // columnBackfillUpdateChunkSizeThresholdBytes is the byte size threshold beyond which
96+ // an update batch is run at once when adding or removing columns.
97+ var columnBackfillUpdateChunkSizeThresholdBytes = settings .RegisterIntSetting (
98+ settings .TenantWritable ,
99+ "bulkio.column_backfill.update_chunk_size_threshold_bytes" ,
100+ "the batch size in bytes above which an update is immediately run when adding/removing columns" ,
101+ 10 << 20 , /* 10 MiB */
102+ settings .NonNegativeInt , /* validateFn */
103+ )
104+
95105var _ sort.Interface = columnsByID {}
96106var _ sort.Interface = indexesByID {}
97107
@@ -1187,6 +1197,7 @@ func (sc *SchemaChanger) distColumnBackfill(
11871197 ctx context.Context ,
11881198 version descpb.DescriptorVersion ,
11891199 backfillChunkSize int64 ,
1200+ backfillUpdateChunkSizeThresholdBytes uint64 ,
11901201 filter backfill.MutationFilter ,
11911202) error {
11921203 duration := checkpointInterval
@@ -1281,7 +1292,7 @@ func (sc *SchemaChanger) distColumnBackfill(
12811292
12821293 planCtx := sc .distSQLPlanner .NewPlanningCtx (ctx , & evalCtx , nil /* planner */ , txn ,
12831294 DistributionTypeSystemTenantOnly )
1284- spec , err := initColumnBackfillerSpec (* tableDesc . TableDesc () , duration , chunkSize , readAsOf )
1295+ spec , err := initColumnBackfillerSpec (tableDesc , duration , chunkSize , backfillUpdateChunkSizeThresholdBytes , readAsOf )
12851296 if err != nil {
12861297 return err
12871298 }
@@ -2149,8 +2160,12 @@ func (sc *SchemaChanger) truncateAndBackfillColumns(
21492160 log .Infof (ctx , "clearing and backfilling columns" )
21502161
21512162 if err := sc .distColumnBackfill (
2152- ctx , version , columnBackfillBatchSize .Get (& sc .settings .SV ),
2153- backfill .ColumnMutationFilter ); err != nil {
2163+ ctx ,
2164+ version ,
2165+ columnBackfillBatchSize .Get (& sc .settings .SV ),
2166+ uint64 (columnBackfillUpdateChunkSizeThresholdBytes .Get (& sc .settings .SV )),
2167+ backfill .ColumnMutationFilter ,
2168+ ); err != nil {
21542169 return err
21552170 }
21562171 log .Info (ctx , "finished clearing and backfilling columns" )
@@ -2579,9 +2594,12 @@ func columnBackfillInTxn(
25792594 sp := tableDesc .PrimaryIndexSpan (evalCtx .Codec )
25802595 for sp .Key != nil {
25812596 var err error
2582- sp .Key , err = backfiller .RunColumnBackfillChunk (ctx ,
2583- txn , tableDesc , sp , rowinfra .RowLimit (columnBackfillBatchSize .Get (& evalCtx .Settings .SV )),
2584- false /*alsoCommit*/ , traceKV )
2597+ scanBatchSize := rowinfra .RowLimit (columnBackfillBatchSize .Get (& evalCtx .Settings .SV ))
2598+ updateChunkSizeThresholdBytes := rowinfra .BytesLimit (columnBackfillUpdateChunkSizeThresholdBytes .Get (& evalCtx .Settings .SV ))
2599+ const alsoCommit = false
2600+ sp .Key , err = backfiller .RunColumnBackfillChunk (
2601+ ctx , txn , tableDesc , sp , scanBatchSize , updateChunkSizeThresholdBytes , alsoCommit , traceKV ,
2602+ )
25852603 if err != nil {
25862604 return err
25872605 }
0 commit comments