
feat: New destination file plugin #4902

Closed

yevgenypats wants to merge 19 commits into main from feat/file_plugin

Conversation

Contributor

@yevgenypats yevgenypats commented Nov 22, 2022

Fixes #4983

New destination plugin `file` will support multiple formats (CSV, JSON) with multiple storage backends such as local, S3, GCS, and Azure Blob Storage.

This is ready now for initial review.

@yevgenypats yevgenypats changed the title Feat/file plugin feat: New destination file plugin Nov 22, 2022
@yevgenypats yevgenypats marked this pull request as ready for review November 22, 2022 21:45
@yevgenypats yevgenypats requested review from a team, disq, hermanschaaf and shimonp21 and removed request for a team and shimonp21 November 22, 2022 21:45
Member

@disq disq left a comment

Good start, a few nits and remarks.

We should consider adding in-line gzip and combined-file support once this is reviewed/merged/refactored, before that blog post though :-) As you'll soon see, a high number of tables will be an issue when used with data lakes.
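The in-line gzip idea can be sketched with the standard library: wrap the per-table writer in a `gzip.Writer` before the CSV/JSON encoder writes to it. A minimal round-trip sketch (function names are illustrative, not from this PR):

```go
package main

import (
	"bytes"
	"compress/gzip"
	"fmt"
	"io"
)

// gzipped compresses p in memory; in the plugin the gzip.Writer would
// wrap the destination file's writer so each table file is written compressed.
func gzipped(p []byte) ([]byte, error) {
	var buf bytes.Buffer
	zw := gzip.NewWriter(&buf)
	if _, err := zw.Write(p); err != nil {
		return nil, err
	}
	// Close flushes the gzip footer; without it the stream is truncated.
	if err := zw.Close(); err != nil {
		return nil, err
	}
	return buf.Bytes(), nil
}

// gunzip reverses gzipped, as a reader of the destination files would.
func gunzip(p []byte) ([]byte, error) {
	zr, err := gzip.NewReader(bytes.NewReader(p))
	if err != nil {
		return nil, err
	}
	defer zr.Close()
	return io.ReadAll(zr)
}

func main() {
	z, err := gzipped([]byte(`{"table":"t1"}`))
	if err != nil {
		panic(err)
	}
	raw, err := gunzip(z)
	if err != nil {
		panic(err)
	}
	fmt.Println(string(raw)) // {"table":"t1"}
}
```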

case BackendTypeS3:
awsCfg, err := config.LoadDefaultConfig(ctx)
if err != nil {
return nil, fmt.Errorf("unable to load AWS SDK config, %w", err)
Member

Suggested change
return nil, fmt.Errorf("unable to load AWS SDK config, %w", err)
return nil, fmt.Errorf("unable to load AWS SDK config: %w", err)

The convention for error wrapping is `wrapper text: %w`.

var err error
c.gcpStorageClient, err = storage.NewClient(ctx)
if err != nil {
return nil, fmt.Errorf("failed to create GCP storage client %w", err)
Member

Suggested change
return nil, fmt.Errorf("failed to create GCP storage client %w", err)
return nil, fmt.Errorf("failed to create GCP storage client: %w", err)

c.awsUploader = manager.NewUploader(awsClient)
c.awsDownloader = manager.NewDownloader(awsClient)

if _, err := c.awsUploader.Upload(ctx, &s3.PutObjectInput{
Member

Test objects shouldn't be part of a "serious" release but they can stay for now.

defer f.Close()

for r := range resources {
b, err := json.Marshal(r)
Member

In the future a drop-in encoding/json replacement (e.g. fastjson) could be used here.
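One way to keep that door open without touching call sites is a package-level seam for the encoder. A sketch (note that `fastjson` itself exposes a different API, so a real swap would need an adapter with the same `Marshal` signature):

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshal is a seam for the JSON encoder: keeping it as a package-level
// variable lets a drop-in replacement with the same signature be swapped
// in later without changing any call site.
var marshal = json.Marshal

func main() {
	// Call sites use marshal, never json.Marshal directly.
	b, err := marshal(map[string]int{"rows": 3})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"rows":3}
}
```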

"github.com/cloudquery/plugin-sdk/schema"
)

const maxJsonSize = 1024 * 1024 * 20
Member

This is only used for the read/scanner path, yet the name doesn't suggest that. (The const definition could also move inside the read method.)

Contributor

+1

github.com/rs/zerolog v1.28.0
)

replace github.com/cloudquery/plugin-sdk => ../../../../plugin-sdk-split
Member

This needs to be removed before merge

Contributor

+1
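For reference, the `replace` directive above pins the SDK to a local checkout, which only resolves on the author's machine; before merge the module must go back to resolving the published SDK. A sketch of the cleanup (paths as in the PR):

```
# go.mod: delete the local-development override
replace github.com/cloudquery/plugin-sdk => ../../../../plugin-sdk-split

# equivalently, from the module directory:
#   go mod edit -dropreplace=github.com/cloudquery/plugin-sdk
#   go mod tidy
```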

)

const (
sentryDSN = "https://79d5e237dafe45e1a4ec0785bc528280@o1396617.ingest.sentry.io/4504083471335424"
Member

Consider replacing this before release (belongs to the CSV dest)

Contributor Author

yevgenypats commented Nov 23, 2022

Closes: #4983

@@ -0,0 +1,124 @@
package azure_blob_storage
Contributor Author

can we keep the directory as azure_blob_storage instead of azure-blob-storage?

Contributor

Fixed

kodiakhq bot pushed a commit to cloudquery/plugin-sdk that referenced this pull request Nov 24, 2022
Added another option needed for destination testing. Needed by this PR: cloudquery/cloudquery#4902

Also, added a `FlattenTables` function for `Tables` type
kodiakhq bot pushed a commit that referenced this pull request Nov 24, 2022
This is a Snowflake destination plugin.

It supports streaming directly to Snowflake (via local CSV uploads, since streaming inserts don't support all Snowflake data types as of right now).

The streaming (or pseudo-streaming) approach is useful for getting started and testing locally, but for production usage the most performant and cheapest way at scale is a standard data pipeline that first uploads CSV/JSON to remote storage (S3, GCS, Azure Blob...) and then loads it into Snowflake via a periodic job or Snowpipe.

For the latter, this PR needs to be implemented and tested #4902

Known issue: migrations are slow because we currently don't use any batching to get all the tables. I have an idea on how to solve it, but may do it in a follow-up PR.
Member

@erezrokah erezrokah left a comment

@@ -0,0 +1,2 @@
file
cq_csv_output No newline at end of file
Member

Suggested change
cq_csv_output
cq_file_output

Comment on lines +1 to +14
variables:
component: destination/csv
binary: csv

project_name: plugins/destination/csv

monorepo:
tag_prefix: plugins-destination-csv-
dir: plugins/destination/csv

includes:
- from_file:
# Relative to the directory Go Releaser is run from (which is the root of the repository)
path: ./plugins/.goreleaser.yaml No newline at end of file
Member

Suggested change
variables:
component: destination/csv
binary: csv
project_name: plugins/destination/csv
monorepo:
tag_prefix: plugins-destination-csv-
dir: plugins/destination/csv
includes:
- from_file:
# Relative to the directory Go Releaser is run from (which is the root of the repository)
path: ./plugins/.goreleaser.yaml
variables:
component: destination/file
binary: file
project_name: plugins/destination/file
monorepo:
tag_prefix: plugins-destination-file-
dir: plugins/destination/file
includes:
- from_file:
# Relative to the directory Go Releaser is run from (which is the root of the repository)
path: ./plugins/.goreleaser.yaml

@@ -0,0 +1,2 @@
# Changelog
Member

You don't need to add this, it will be created by Release Please

Comment on lines +14 to +22
kind: destination
spec:
name: "csv"
path: "cloudquery/csv"
version: "v1.0.1" # latest version of csv plugin
write_mode: "append" # CSV only supports 'append' mode

spec:
directory: './cq_csv_output'
Member

Suggested change
kind: destination
spec:
name: "csv"
path: "cloudquery/csv"
version: "v1.0.1" # latest version of csv plugin
write_mode: "append" # CSV only supports 'append' mode
spec:
directory: './cq_csv_output'
kind: destination
spec:
name: "file"
path: "cloudquery/file"
version: "v1.0.0" # latest version of file plugin
write_mode: "append" # File plugin only supports 'append' mode
spec:
directory: './cq_file_output'

directory: './cq_csv_output'
```

## CSV Spec
Member

Suggested change
## CSV Spec
## File Spec


## CSV Spec

This is the (nested) spec used by the CSV destination Plugin.
Member

Suggested change
This is the (nested) spec used by the CSV destination Plugin.
This is the (nested) spec used by the File destination Plugin.


This is the (nested) spec used by the CSV destination Plugin.

- `directory` (string) (optional, defaults to `./cq_csv_output`)
Member

Suggested change
- `directory` (string) (optional, defaults to `./cq_csv_output`)
- `directory` (string) (optional, defaults to `./cq_file_output`)


- `directory` (string) (optional, defaults to `./cq_csv_output`)

Directory where all CSV files will be written. A CSV file will be created per table.
Member

Suggested change
Directory where all CSV files will be written. A CSV file will be created per table.
Directory where all files will be written. A file will be created per table.


if _, err := c.awsUploader.Upload(ctx, &s3.PutObjectInput{
Bucket: aws.String(c.baseDir),
Key: aws.String(c.path + "/cq-test-file"),
Contributor

should it be here?

return v.String()
}

func (*Client) transformCSVMacaddrArray(v *schema.MacaddrArray) interface{} {
Contributor

can it be just a single func?

func (*Client) transform(v fmt.Stringer) any {
	return v.String()
}

(Even better: make the return value a string in the signature?)

func (*Client) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
return fmt.Errorf("csv destination doesn't support overwrite-delete-stale mode. please use append mode")
Contributor

Suggested change
return fmt.Errorf("csv destination doesn't support overwrite-delete-stale mode. please use append mode")
return fmt.Errorf("csv destination doesn't support overwrite-delete-stale mode, use append mode instead")

"github.com/cloudquery/plugin-sdk/schema"
)

const maxJsonSize = 1024 * 1024 * 20
Contributor

+1

case FormatTypeCSV:
go func() {
defer wg.Done()
c.writeCSVResource(ctx, t.Name, workers[t.Name].writeChan)
Contributor

don't do a lookup, just do

worker := &worker{writeChan: make(chan any)}
workers[t.Name] = worker

above and then just use worker

}

for r := range res {
workers[r.TableName].writeChan <- r.Data
Contributor

@candiduslynx candiduslynx Dec 17, 2022

shouldn't it be done in parallel?
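The two review points here (create each worker once into a local variable instead of re-looking it up, and keep per-table writes concurrent) can be sketched together. This is an illustrative reduction, not the plugin's code: each worker goroutine just counts rows where the real one would write to a file, so dispatch stays a simple fan-out while tables are written in parallel.

```go
package main

import (
	"fmt"
	"sync"
)

// resource mirrors the shape being dispatched: a row tagged with its table.
type resource struct {
	TableName string
	Data      any
}

// writePerTable spawns one writer goroutine per table; the dispatch loop
// routes each resource to its table's channel.
func writePerTable(tables []string, res []resource) map[string]int {
	type worker struct{ writeChan chan any }
	counts := make(map[string]int, len(tables))
	var mu sync.Mutex
	workers := make(map[string]*worker, len(tables))
	var wg sync.WaitGroup
	for _, name := range tables {
		// Create the worker once and reuse the local variable,
		// avoiding a second map lookup when starting the goroutine.
		w := &worker{writeChan: make(chan any)}
		workers[name] = w
		wg.Add(1)
		go func(name string, w *worker) {
			defer wg.Done()
			for range w.writeChan {
				mu.Lock()
				counts[name]++ // stand-in for writing the row to the table's file
				mu.Unlock()
			}
		}(name, w)
	}
	// Dispatch is sequential, but each table's writes proceed concurrently.
	for _, r := range res {
		workers[r.TableName].writeChan <- r.Data
	}
	for _, w := range workers {
		close(w.writeChan)
	}
	wg.Wait()
	return counts
}

func main() {
	counts := writePerTable(
		[]string{"t1", "t2"},
		[]resource{{"t1", 1}, {"t2", 2}, {"t1", 3}},
	)
	fmt.Println(counts["t1"], counts["t2"]) // 2 1
}
```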

github.com/rs/zerolog v1.28.0
)

replace github.com/cloudquery/plugin-sdk => ../../../../plugin-sdk-split
Contributor

+1


@yevgenypats
Contributor Author

Closing. Done here: #6096

@hermanschaaf hermanschaaf deleted the feat/file_plugin branch November 21, 2023 08:48


Development

Successfully merging this pull request may close these issues.

plugin: File destination plugin

5 participants