Skip to content

Commit 2ce7387

Browse files
authored
fix: Skip incremental tables when performing delete-stale (#13482)
This fixes a bug introduced with the migration to SDK v4 / Protocol v3; when `overwrite-delete-stale` write mode is used, stale entries should not be removed from incremental tables when a state backend is active.
1 parent 2ac61d1 commit 2ce7387

File tree

1 file changed

+13
-3
lines changed

1 file changed

+13
-3
lines changed

cli/cmd/sync_v3.go

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -34,7 +34,7 @@ type v3destination struct {
3434
func syncConnectionV3(ctx context.Context, source v3source, destinations []v3destination, backend *v3destination, uid string, noMigrate bool) error {
3535
var mt metrics.Metrics
3636
var exitReason = ExitReasonStopped
37-
tables := make(map[string]bool, 0)
37+
tablesForDeleteStale := make(map[string]bool, 0)
3838

3939
sourceSpec := source.spec
4040
sourceClient := source.client
@@ -200,6 +200,8 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
200200
}
201201
}()
202202

203+
isStateBackendEnabled := sourceSpec.BackendOptions != nil && sourceSpec.BackendOptions.TableName != ""
204+
203205
// Read from the sync stream and write to all destinations.
204206
totalResources := 0
205207
for {
@@ -242,7 +244,10 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
242244
return err
243245
}
244246
tableName := tableNameFromSchema(sc)
245-
tables[tableName] = true
247+
248+
if !isStateBackendEnabled || !tableIsIncremental(sc) {
249+
tablesForDeleteStale[tableName] = true
250+
}
246251
if noMigrate {
247252
continue
248253
}
@@ -274,7 +279,7 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
274279

275280
for i := range destinationsClients {
276281
if destinationSpecs[i].WriteMode == specs.WriteModeOverwriteDeleteStale {
277-
if err := deleteStale(writeClients[i], tables, sourceName, syncTime); err != nil {
282+
if err := deleteStale(writeClients[i], tablesForDeleteStale, sourceName, syncTime); err != nil {
278283
return err
279284
}
280285
}
@@ -313,6 +318,11 @@ func tableNameFromSchema(sc *arrow.Schema) string {
313318
return tableName
314319
}
315320

321+
func tableIsIncremental(sc *arrow.Schema) bool {
322+
inc, _ := sc.Metadata().GetValue("cq:extension:incremental")
323+
return inc == "true"
324+
}
325+
316326
func deleteStale(client plugin.Plugin_WriteClient, tables map[string]bool, sourceName string, syncTime time.Time) error {
317327
for tableName := range tables {
318328
if err := client.Send(&plugin.Write_Request{

0 commit comments

Comments
 (0)