Skip to content

Commit 25682d6

Browse files
fix: Close Transformers pipeline on both source and destination closes (#19105)
1 parent ff06b18 commit 25682d6

3 files changed

Lines changed: 30 additions & 16 deletions

File tree

cli/cmd/sync_v3.go

Lines changed: 12 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -11,25 +11,25 @@ import (
1111
"sync/atomic"
1212
"time"
1313

14+
cloudquery_api "github.com/cloudquery/cloudquery-api-go"
1415
"github.com/cloudquery/cloudquery-api-go/auth"
15-
"github.com/cloudquery/cloudquery/cli/internal/analytics"
16-
"github.com/cloudquery/cloudquery/cli/internal/api"
17-
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
18-
"github.com/cloudquery/cloudquery/cli/internal/tablenamechanger"
19-
"github.com/cloudquery/cloudquery/cli/internal/transformer"
20-
"github.com/cloudquery/cloudquery/cli/internal/transformerpipeline"
2116
"github.com/cloudquery/plugin-pb-go/managedplugin"
2217
"github.com/cloudquery/plugin-pb-go/metrics"
2318
"github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
2419
"github.com/cloudquery/plugin-sdk/v4/schema"
20+
"github.com/google/uuid"
2521
"github.com/rs/zerolog/log"
2622
"github.com/schollz/progressbar/v3"
2723
"github.com/vnteamopen/godebouncer"
2824
"golang.org/x/sync/errgroup"
2925
"google.golang.org/protobuf/types/known/timestamppb"
3026

31-
cloudquery_api "github.com/cloudquery/cloudquery-api-go"
32-
"github.com/google/uuid"
27+
"github.com/cloudquery/cloudquery/cli/internal/analytics"
28+
"github.com/cloudquery/cloudquery/cli/internal/api"
29+
"github.com/cloudquery/cloudquery/cli/internal/specs/v0"
30+
"github.com/cloudquery/cloudquery/cli/internal/tablenamechanger"
31+
"github.com/cloudquery/cloudquery/cli/internal/transformer"
32+
"github.com/cloudquery/cloudquery/cli/internal/transformerpipeline"
3333
)
3434

3535
type v3source struct {
@@ -369,6 +369,9 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
369369
},
370370
}
371371
if err := writeClientsByName[destinationName].Send(wr); err != nil {
372+
// Close the transformations pipeline when the destination is closed.
373+
pipeline.Close()
374+
372375
return handleSendError(err, writeClientsByName[destinationName], "insert")
373376
}
374377
return nil
@@ -384,9 +387,7 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
384387
// Close all transformation pipelines when the source is done
385388
defer func() {
386389
for _, pipeline := range pipelineByDestinationName {
387-
if err := pipeline.Close(); err != nil {
388-
log.Warn().Err(err).Msg("Failed to close transformer pipeline")
389-
}
390+
pipeline.Close()
390391
}
391392
}()
392393
for {

cli/internal/transformerpipeline/pipeline.go

Lines changed: 15 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ func (lp *TransformerPipeline) Send(data []byte) error {
6868
if lp.clientWrappers[len(lp.clientWrappers)-1].nextSendFn == nil {
6969
return errors.New("OnOutput must be registered before Send is called, otherwise what do I do with the transformed data?")
7070
}
71+
72+
if lp.clientWrappers[0].isClosed {
73+
return errors.New("cannot send data to a closed pipeline")
74+
}
75+
7176
return lp.clientWrappers[0].client.Send(&plugin.Transform_Request{Record: data})
7277
}
7378

@@ -81,16 +86,24 @@ func (lp *TransformerPipeline) OnOutput(fn func([]byte) error) error {
8186
return nil
8287
}
8388

84-
func (lp *TransformerPipeline) Close() error {
89+
func (lp *TransformerPipeline) Close() {
90+
// Closing the pipeline happens on both source as well as destination close.
91+
// Not handling this will result in a close of closed channel panic.
92+
if lp.clientWrappers[0].isClosed {
93+
return
94+
}
95+
8596
// Close the first transformer. The rest will follow gracefully, otherwise records will be lost.
86-
return lp.clientWrappers[0].client.CloseSend()
97+
lp.clientWrappers[0].client.CloseSend()
98+
lp.clientWrappers[0].isClosed = true
8799
}
88100

89101
type clientWrapper struct {
90102
i int
91103
client plugin.Plugin_TransformClient
92104
nextSendFn func(*plugin.Transform_Request) error
93105
nextClose func() error
106+
isClosed bool
94107
}
95108

96109
func (s clientWrapper) startBlocking() error {

cli/internal/transformerpipeline/transformerpipeline_test.go

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -35,7 +35,7 @@ func TestTransformerPipelineDoesntChangeInputsWithTwoIdentityTransformers(t *tes
3535
for _, input := range inputs {
3636
require.NoError(t, pipeline.Send([]byte(input)))
3737
}
38-
require.NoError(t, pipeline.Close())
38+
pipeline.Close()
3939
}()
4040

4141
// Blocks until pipeline is closed and all messages passed through
@@ -70,7 +70,7 @@ func TestTransformerPipelineReversesInputs(t *testing.T) {
7070
for _, input := range inputs {
7171
require.NoError(t, pipeline.Send([]byte(input)))
7272
}
73-
require.NoError(t, pipeline.Close())
73+
pipeline.Close()
7474
}()
7575

7676
// Blocks until pipeline is closed and all messages passed through
@@ -105,7 +105,7 @@ func TestTransformerPipelineDoesntChangeInputsWithTwoReversers(t *testing.T) {
105105
for _, input := range inputs {
106106
require.NoError(t, pipeline.Send([]byte(input)))
107107
}
108-
require.NoError(t, pipeline.Close())
108+
pipeline.Close()
109109
}()
110110

111111
// Blocks until pipeline is closed and all messages passed through

0 commit comments

Comments
 (0)