Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions .github/workflows/dest_test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,22 @@ defaults:

jobs:
build_lint_and_test:
services:
# Label used to access the service container
postgres:
image: postgres:11
env:
POSTGRES_PASSWORD: pass
POSTGRES_USER: postgres
POSTGRES_DB: postgres
ports:
- 5432:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Set up Go 1.x
uses: actions/setup-go@v3
Expand Down
19 changes: 2 additions & 17 deletions .github/workflows/test_unit.yml
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ on:
pull_request:
branches:
- main
env:
env:
CGO_ENABLED: 0
CQ_NO_TELEMETRY: 1

Expand All @@ -26,21 +26,6 @@ jobs:
matrix: ${{ fromJson(needs.resolve-modules.outputs.matrix) }}
fail-fast: false
runs-on: ubuntu-latest
services:
postgres:
image: postgres:11
env:
POSTGRES_PASSWORD: pass
POSTGRES_USER: postgres
POSTGRES_DB: postgres
ports:
- 5432:5432
# Set health checks to wait until postgres has started
options: >-
--health-cmd pg_isready
--health-interval 10s
--health-timeout 5s
--health-retries 5
steps:
- name: Check out code into the Go module directory
uses: actions/checkout@v3
Expand Down Expand Up @@ -78,4 +63,4 @@ jobs:
- name: Test
if: steps.changed-files.outputs.any_changed == 'true'
run: go test ./...
working-directory: ${{ matrix.workdir }}
working-directory: ${{ matrix.workdir }}
66 changes: 45 additions & 21 deletions cli/cmd/sync.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (

"github.com/briandowns/spinner"
"github.com/cloudquery/cloudquery/cli/internal/plugins"
"github.com/cloudquery/plugin-sdk/schema"
"github.com/cloudquery/plugin-sdk/specs"
"github.com/rs/zerolog/log"
"github.com/spf13/cobra"
Expand Down Expand Up @@ -53,54 +52,62 @@ func sync(cmd *cobra.Command, args []string) error {
if len(sourceSpec.Destinations) == 0 {
return fmt.Errorf("no destinations found for source %s", sourceSpec.Name)
}
if err := syncConnection(ctx, pm, specReader, sourceSpec); err != nil {
var destinationsSpecs []specs.Destination
for _, destination := range sourceSpec.Destinations {
spec := specReader.GetDestinationByName(destination)
if spec == nil {
return fmt.Errorf("failed to find destination %s in source %s", destination, sourceSpec.Name)
}
destinationsSpecs = append(destinationsSpecs, *spec)
}
if err := syncConnection(ctx, pm, sourceSpec, destinationsSpecs); err != nil {
return fmt.Errorf("failed to sync source %s: %w", sourceSpec.Name, err)
}
}

return nil
}

func syncConnection(ctx context.Context, pm *plugins.PluginManager, specReader *specs.SpecReader, sourceSpec specs.Source) error {
sourcePlugin, err := pm.NewSourcePlugin(ctx, &sourceSpec)
func syncConnection(ctx context.Context, pm *plugins.PluginManager, sourceSpec specs.Source, destinationsSpecs []specs.Destination) error {
sourcePlugin, err := pm.NewSourcePlugin(ctx, sourceSpec.Registry, sourceSpec.Path, sourceSpec.Version)
if err != nil {
return fmt.Errorf("failed to get source plugin client for %s: %w", sourceSpec.Name, err)
}
defer sourcePlugin.Close()
sourceClient := sourcePlugin.GetClient()

destPlugins := make([]*plugins.DestinationPlugin, len(sourceSpec.Destinations))
destSubscriptions := make([]chan []byte, len(sourceSpec.Destinations))
for i := range destSubscriptions {
destSubscriptions[i] = make(chan []byte)
}
defer func() {
for _, destPlugin := range destPlugins {
if destPlugin != nil {
destPlugin.Close()
}
}
}()
for i, destination := range sourceSpec.Destinations {
spec := specReader.GetDestinationByName(destination)
if spec == nil {
return fmt.Errorf("failed to find destination %s in source %s", destination, sourceSpec.Name)
}
plugin, err := pm.NewDestinationPlugin(ctx, *spec)
for i, destinationSpec := range destinationsSpecs {
plugin, err := pm.NewDestinationPlugin(ctx, destinationSpec.Registry, destinationSpec.Path, destinationSpec.Version)
if err != nil {
return fmt.Errorf("failed to create destination plugin client for %s: %w", destination, err)
return fmt.Errorf("failed to create destination plugin client for %s: %w", destinationSpec.Name, err)
}
destPlugins[i] = plugin
if err := destPlugins[i].GetClient().Initialize(ctx, *spec); err != nil {
return fmt.Errorf("failed to initialize destination plugin client for %s: %w", destination, err)
if err := destPlugins[i].GetClient().Initialize(ctx, destinationSpec); err != nil {
return fmt.Errorf("failed to initialize destination plugin client for %s: %w", destinationSpec.Name, err)
}
tables, err := sourceClient.GetTables(ctx)
if err != nil {
return fmt.Errorf("failed to get tables for source %s: %w", sourceSpec.Name, err)
}

if err := destPlugins[i].GetClient().Migrate(ctx, tables); err != nil {
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destination, err)
return fmt.Errorf("failed to migrate source %s on destination %s : %w", sourceSpec.Name, destinationSpec.Name, err)
}
}

resources := make(chan *schema.Resource)
resources := make(chan []byte)
g, ctx := errgroup.WithContext(ctx)
fmt.Println("Starting sync for: ", sourceSpec.Name, "->", sourceSpec.Destinations)
g.Go(func() error {
Expand All @@ -116,20 +123,37 @@ func syncConnection(ctx context.Context, pm *plugins.PluginManager, specReader *
format := " Syncing (%d resources) %s"
s.Suffix = fmt.Sprintf(format, 0, time.Duration(0))
s.Start()
failedWrites := 0
failedWrites := uint64(0)
totalResources := 0
for i, destination := range sourceSpec.Destinations {
i := i
destination := destination
g.Go(func() error {
var destFailedWrites uint64
var err error
if destFailedWrites, err = destPlugins[i].GetClient().Write(ctx, destSubscriptions[i]); err != nil {
log.Error().Err(err).Msgf("failed to write for %s->%s", sourceSpec.Name, destination)
}
failedWrites += destFailedWrites
return nil
})
}

g.Go(func() error {
for resource := range resources {
totalResources++
s.Suffix = fmt.Sprintf(format, totalResources, time.Since(startTime).Truncate(time.Second))
for i, destination := range sourceSpec.Destinations {
if err := destPlugins[i].GetClient().Write(ctx, resource.TableName, resource.Data); err != nil {
failedWrites++
log.Error().Err(err).Msgf("failed to write resource for %s->%s", sourceSpec.Name, destination)
for i := range destSubscriptions {
select {
case <-ctx.Done():
return ctx.Err()
case destSubscriptions[i] <- resource:
}
}
}

for i := range destSubscriptions {
close(destSubscriptions[i])
}
return nil
})

Expand Down
4 changes: 4 additions & 0 deletions cli/cmd/testdata/dest.test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
kind: destination
spec:
name: test
version: v1.1.0
7 changes: 0 additions & 7 deletions cli/cmd/testdata/gcp.cq.yml

This file was deleted.

6 changes: 0 additions & 6 deletions cli/cmd/testdata/pg.cq.yml

This file was deleted.

5 changes: 5 additions & 0 deletions cli/cmd/testdata/source.test.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
kind: source
spec:
name: test
destinations: [test]
version: v1.1.5
17 changes: 1 addition & 16 deletions cli/go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -4,24 +4,18 @@ go 1.19

require (
github.com/briandowns/spinner v1.19.0
github.com/cloudquery/plugin-sdk v0.8.2
github.com/cloudquery/plugin-sdk v0.9.0
github.com/getsentry/sentry-go v0.13.0
github.com/google/go-cmp v0.5.9
github.com/jackc/pgtype v1.12.0
github.com/jackc/pgx/v4 v4.17.0
github.com/rs/zerolog v1.28.0
github.com/schollz/progressbar/v3 v3.9.0
github.com/spf13/cobra v1.5.0
github.com/spf13/viper v1.12.0
github.com/vgarvardt/pgx-google-uuid/v4 v4.0.0
golang.org/x/sync v0.0.0-20220907140024-f12130a52804
google.golang.org/grpc v1.49.0
)

require (
github.com/cloudquery/faker/v3 v3.7.7 // indirect
github.com/cpuguy83/go-md2man/v2 v2.0.2 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/fatih/color v1.13.0 // indirect
github.com/fsnotify/fsnotify v1.5.4 // indirect
github.com/ghodss/yaml v1.0.0 // indirect
Expand All @@ -31,13 +25,6 @@ require (
github.com/hashicorp/hcl v1.0.0 // indirect
github.com/iancoleman/strcase v0.2.0 // indirect
github.com/inconshreveable/mousetrap v1.0.1 // indirect
github.com/jackc/chunkreader/v2 v2.0.1 // indirect
github.com/jackc/pgconn v1.13.0 // indirect
github.com/jackc/pgio v1.0.0 // indirect
github.com/jackc/pgpassfile v1.0.0 // indirect
github.com/jackc/pgproto3/v2 v2.3.1 // indirect
github.com/jackc/pgservicefile v0.0.0-20200714003250-2b9c44734f2b // indirect
github.com/jackc/puddle v1.2.1 // indirect
github.com/magiconair/properties v1.8.6 // indirect
github.com/mattn/go-colorable v0.1.13 // indirect
github.com/mattn/go-isatty v0.0.16 // indirect
Expand All @@ -47,14 +34,12 @@ require (
github.com/modern-go/reflect2 v1.0.2 // indirect
github.com/pelletier/go-toml v1.9.5 // indirect
github.com/pelletier/go-toml/v2 v2.0.1 // indirect
github.com/pmezard/go-difflib v1.0.0 // indirect
github.com/rivo/uniseg v0.2.0 // indirect
github.com/russross/blackfriday/v2 v2.1.0 // indirect
github.com/spf13/afero v1.8.2 // indirect
github.com/spf13/cast v1.5.0 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/spf13/pflag v1.0.5 // indirect
github.com/stretchr/testify v1.8.0 // indirect
github.com/subosito/gotenv v1.3.0 // indirect
github.com/thoas/go-funk v0.9.2 // indirect
github.com/xeipuuv/gojsonpointer v0.0.0-20190905194746-02993c407bfb // indirect
Expand Down
Loading