@@ -11,25 +11,25 @@ import (
1111 "sync/atomic"
1212 "time"
1313
14+ cloudquery_api "github.com/cloudquery/cloudquery-api-go"
1415 "github.com/cloudquery/cloudquery-api-go/auth"
15- "github.com/cloudquery/cloudquery/cli/internal/analytics"
16- "github.com/cloudquery/cloudquery/cli/internal/api"
17- "github.com/cloudquery/cloudquery/cli/internal/specs/v0"
18- "github.com/cloudquery/cloudquery/cli/internal/tablenamechanger"
19- "github.com/cloudquery/cloudquery/cli/internal/transformer"
20- "github.com/cloudquery/cloudquery/cli/internal/transformerpipeline"
2116 "github.com/cloudquery/plugin-pb-go/managedplugin"
2217 "github.com/cloudquery/plugin-pb-go/metrics"
2318 "github.com/cloudquery/plugin-pb-go/pb/plugin/v3"
2419 "github.com/cloudquery/plugin-sdk/v4/schema"
20+ "github.com/google/uuid"
2521 "github.com/rs/zerolog/log"
2622 "github.com/schollz/progressbar/v3"
2723 "github.com/vnteamopen/godebouncer"
2824 "golang.org/x/sync/errgroup"
2925 "google.golang.org/protobuf/types/known/timestamppb"
3026
31- cloudquery_api "github.com/cloudquery/cloudquery-api-go"
32- "github.com/google/uuid"
27+ "github.com/cloudquery/cloudquery/cli/internal/analytics"
28+ "github.com/cloudquery/cloudquery/cli/internal/api"
29+ "github.com/cloudquery/cloudquery/cli/internal/specs/v0"
30+ "github.com/cloudquery/cloudquery/cli/internal/tablenamechanger"
31+ "github.com/cloudquery/cloudquery/cli/internal/transformer"
32+ "github.com/cloudquery/cloudquery/cli/internal/transformerpipeline"
3333)
3434
3535type v3source struct {
@@ -369,6 +369,9 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
369369 },
370370 }
371371 if err := writeClientsByName [destinationName ].Send (wr ); err != nil {
372+ // Close the transformations pipeline when the destination is closed.
373+ pipeline .Close ()
374+
372375 return handleSendError (err , writeClientsByName [destinationName ], "insert" )
373376 }
374377 return nil
@@ -384,9 +387,7 @@ func syncConnectionV3(ctx context.Context, source v3source, destinations []v3des
384387 // Close all transformation pipelines when the source is done
385388 defer func () {
386389 for _ , pipeline := range pipelineByDestinationName {
387- if err := pipeline .Close (); err != nil {
388- log .Warn ().Err (err ).Msg ("Failed to close transformer pipeline" )
389- }
390+ pipeline .Close ()
390391 }
391392 }()
392393 for {
0 commit comments