Conversation
disq
left a comment
Good start, a few nits and remarks.
Consider adding in-line gzip and combined-file support once this is reviewed/merged/refactored, before that blog post though :-) As you'll soon see, a high number of tables will be an issue when using this with data lakes.
    case BackendTypeS3:
        awsCfg, err := config.LoadDefaultConfig(ctx)
        if err != nil {
            return nil, fmt.Errorf("unable to load AWS SDK config, %w", err)
Suggested change:

    -return nil, fmt.Errorf("unable to load AWS SDK config, %w", err)
    +return nil, fmt.Errorf("unable to load AWS SDK config: %w", err)

The err wrapping convention is `"wrapper text: %w"` (colon, not comma, before the `%w`).
    var err error
    c.gcpStorageClient, err = storage.NewClient(ctx)
    if err != nil {
        return nil, fmt.Errorf("failed to create GCP storage client %w", err)
Suggested change:

    -return nil, fmt.Errorf("failed to create GCP storage client %w", err)
    +return nil, fmt.Errorf("failed to create GCP storage client: %w", err)
    c.awsUploader = manager.NewUploader(awsClient)
    c.awsDownloader = manager.NewDownloader(awsClient)

    if _, err := c.awsUploader.Upload(ctx, &s3.PutObjectInput{
Test objects shouldn't be part of a "serious" release, but they can stay for now.
    defer f.Close()

    for r := range resources {
        b, err := json.Marshal(r)
In the future, a drop-in encoding/json replacement (e.g. fastjson) could be used here.
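One low-friction way to prepare for such a swap (a sketch; the `marshal` variable and `resource` type are illustrative, not the PR's code) is to route all marshaling through a single package-level function value, so a compatible replacement only needs to be wired in one place:

```go
package main

import (
	"encoding/json"
	"fmt"
)

// marshal is the single marshaling entry point. Swapping in a faster
// drop-in library later (one exposing a Marshal with the same
// signature) means changing only this one assignment.
var marshal func(v any) ([]byte, error) = json.Marshal

type resource struct {
	TableName string `json:"table_name"`
	ID        int    `json:"id"`
}

func main() {
	b, err := marshal(resource{TableName: "aws_s3_buckets", ID: 1})
	if err != nil {
		panic(err)
	}
	fmt.Println(string(b)) // {"table_name":"aws_s3_buckets","id":1}
}
```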
    "github.com/cloudquery/plugin-sdk/schema"
    )

    const maxJsonSize = 1024 * 1024 * 20
This is only used by the reading/scanner code, yet the name doesn't suggest that. (Could also move the const definition inside the read method.)
    github.com/rs/zerolog v1.28.0
    )

    replace github.com/cloudquery/plugin-sdk => ../../../../plugin-sdk-split
This needs to be removed before merge
    )

    const (
        sentryDSN = "https://79d5e237dafe45e1a4ec0785bc528280@o1396617.ingest.sentry.io/4504083471335424"
Consider replacing this before release (belongs to the CSV dest)
Closes: #4983
    @@ -0,0 +1,124 @@
    package azure_blob_storage
Can we keep the directory as azure_blob_storage instead of azure-blob-storage?
Added another option needed for destination testing, required by this PR: cloudquery/cloudquery#4902. Also added a `FlattenTables` function for the `Tables` type.
This is a Snowflake destination plugin. It supports streaming directly to Snowflake (via local CSV uploads, since streaming insert does not support all Snowflake data types as of right now). The streaming (or pseudo-streaming) approach is useful to get started and to test things locally, but for production usage it is recommended to use standard data pipelines that first upload CSV/JSON to remote storage (S3, GCS, Azure Blob, ...) and then load it into Snowflake via a periodic job or Snowpipe; that will be the most performant and cheap way to do it at scale. For the latter, this PR needs to be implemented and tested: #4902. Known issue: migrations are slow because we currently don't use any batching to get all the tables. I have an idea on how to solve it, but may do it in a follow-up PR.
erezrokah
left a comment
Added a few comments. I would also verify the steps in https://www.notion.so/cloudquery/Adding-a-new-plugin-to-the-monorepo-f216b653dbe648b2b3512fb8d59a8f89
    @@ -0,0 +1,2 @@
    file
    cq_csv_output
    \ No newline at end of file
Suggested change:

    -cq_csv_output
    +cq_file_output
    variables:
      component: destination/csv
      binary: csv

    project_name: plugins/destination/csv

    monorepo:
      tag_prefix: plugins-destination-csv-
      dir: plugins/destination/csv

    includes:
      - from_file:
          # Relative to the directory Go Releaser is run from (which is the root of the repository)
          path: ./plugins/.goreleaser.yaml
    \ No newline at end of file
Suggested change:

    -variables:
    -  component: destination/csv
    -  binary: csv
    -project_name: plugins/destination/csv
    -monorepo:
    -  tag_prefix: plugins-destination-csv-
    -  dir: plugins/destination/csv
    -includes:
    -  - from_file:
    -      # Relative to the directory Go Releaser is run from (which is the root of the repository)
    -      path: ./plugins/.goreleaser.yaml
    +variables:
    +  component: destination/file
    +  binary: file
    +project_name: plugins/destination/file
    +monorepo:
    +  tag_prefix: plugins-destination-file-
    +  dir: plugins/destination/file
    +includes:
    +  - from_file:
    +      # Relative to the directory Go Releaser is run from (which is the root of the repository)
    +      path: ./plugins/.goreleaser.yaml
    @@ -0,0 +1,2 @@
    # Changelog
You don't need to add this; it will be created by release-please.
    kind: destination
    spec:
      name: "csv"
      path: "cloudquery/csv"
      version: "v1.0.1" # latest version of csv plugin
      write_mode: "append" # CSV only supports 'append' mode

      spec:
        directory: './cq_csv_output'
Suggested change:

    -kind: destination
    -spec:
    -  name: "csv"
    -  path: "cloudquery/csv"
    -  version: "v1.0.1" # latest version of csv plugin
    -  write_mode: "append" # CSV only supports 'append' mode
    -  spec:
    -    directory: './cq_csv_output'
    +kind: destination
    +spec:
    +  name: "file"
    +  path: "cloudquery/file"
    +  version: "v1.0.0" # latest version of file plugin
    +  write_mode: "append" # File plugin only supports 'append' mode
    +  spec:
    +    directory: './cq_file_output'
    directory: './cq_csv_output'
    ```

    ## CSV Spec
Suggested change:

    -## CSV Spec
    +## File Spec
    ## CSV Spec

    This is the (nested) spec used by the CSV destination Plugin.
Suggested change:

    -This is the (nested) spec used by the CSV destination Plugin.
    +This is the (nested) spec used by the File destination Plugin.
    This is the (nested) spec used by the CSV destination Plugin.

    - `directory` (string) (optional, defaults to `./cq_csv_output`)
Suggested change:

    -- `directory` (string) (optional, defaults to `./cq_csv_output`)
    +- `directory` (string) (optional, defaults to `./cq_file_output`)
    - `directory` (string) (optional, defaults to `./cq_csv_output`)

    Directory where all CSV files will be written. A CSV file will be created per table.
Suggested change:

    -Directory where all CSV files will be written. A CSV file will be created per table.
    +Directory where all files will be written. A file will be created per table.
    if _, err := c.awsUploader.Upload(ctx, &s3.PutObjectInput{
        Bucket: aws.String(c.baseDir),
        Key:    aws.String(c.path + "/cq-test-file"),
    return v.String()
    }

    func (*Client) transformCSVMacaddrArray(v *schema.MacaddrArray) interface{} {
Can it be just a single func?

    func (*Client) transform(v fmt.Stringer) any {
        return v.String()
    }

(Even more so: make the return value a `string` in the signature?)
    )

    func (*Client) DeleteStale(ctx context.Context, tables schema.Tables, sourceName string, syncTime time.Time) error {
        return fmt.Errorf("csv destination doesn't support overwrite-delete-stale mode. please use append mode")
Suggested change:

    -return fmt.Errorf("csv destination doesn't support overwrite-delete-stale mode. please use append mode")
    +return fmt.Errorf("csv destination doesn't support overwrite-delete-stale mode, use append mode instead")
    "github.com/cloudquery/plugin-sdk/schema"
    )

    const maxJsonSize = 1024 * 1024 * 20
    case FormatTypeCSV:
        go func() {
            defer wg.Done()
            c.writeCSVResource(ctx, t.Name, workers[t.Name].writeChan)
Don't do a lookup, just do

    worker := &worker{writeChan: make(chan any)}
    workers[t.Name] = worker

above, and then just use `worker`.
    }

    for r := range res {
        workers[r.TableName].writeChan <- r.Data
Shouldn't this be done in parallel?
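One way to keep a slow table from stalling the single dispatch loop, sketched here with assumed types rather than the PR's actual code, is to give each per-table worker a buffered channel: sends only block once that table's buffer fills, so the tables drain in parallel even though dispatch stays sequential:

```go
package main

import (
	"fmt"
	"sync"
)

type resource struct {
	TableName string
	Data      any
}

// dispatch fans resources out to one buffered channel per table; each
// worker drains its own channel concurrently.
func dispatch(tables []string, res []resource) map[string]int {
	workers := make(map[string]chan any, len(tables))
	counts := make(map[string]int, len(tables))
	var wg sync.WaitGroup
	var mu sync.Mutex

	for _, name := range tables {
		ch := make(chan any, 100) // buffer keeps the dispatch loop moving
		workers[name] = ch
		wg.Add(1)
		go func(table string, ch chan any) {
			defer wg.Done()
			for range ch { // stand-in for the per-table write goroutine
				mu.Lock()
				counts[table]++
				mu.Unlock()
			}
		}(name, ch)
	}

	for _, r := range res { // single dispatch loop, as in the PR
		workers[r.TableName] <- r.Data
	}
	for _, ch := range workers {
		close(ch)
	}
	wg.Wait()
	return counts
}

func main() {
	counts := dispatch(
		[]string{"aws_s3_buckets", "aws_ec2_instances"},
		[]resource{{"aws_s3_buckets", "b1"}, {"aws_ec2_instances", "i1"}, {"aws_s3_buckets", "b2"}},
	)
	fmt.Println(counts["aws_s3_buckets"], counts["aws_ec2_instances"]) // 2 1
}
```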
    github.com/rs/zerolog v1.28.0
    )

    replace github.com/cloudquery/plugin-sdk => ../../../../plugin-sdk-split
Force-pushed from ed05fa0 to f77537c
Closing. Done here: #6096
Fixes #4983
New destination plugin `file`: will support multiple formats (CSV, JSON) with multiple storage backends such as local, S3, GCS, and Azure Blob Storage. This is ready now for initial review.