-
Notifications
You must be signed in to change notification settings - Fork 46
feat: add GCS sink #469
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
feat: add GCS sink #469
Conversation
plugins/sinks/gcs/client.go
Outdated
| "google.golang.org/api/option" | ||
| ) | ||
|
|
||
| type GCSClient interface { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We can call this Writer?
plugins/sinks/gcs/README.md
Outdated
| @@ -0,0 +1,41 @@ | |||
| # GCS | |||
|
|
|||
| Sinks json data to a file in `ndjson` format in a Google Cloud Storage bucket | |||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe we can rephrase this as:
Sinks json data to a file as ndjson format in Google Cloud Storage bucket
plugins/sinks/gcs/client.go
Outdated
| writer := client.Bucket(bucketname).Object(filepath).NewWriter(ctx) | ||
|
|
||
| return &gcsClient{ | ||
| client: client, |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I don't see client getting used anywhere, should we get rid of it?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
yes, its's only used to create the writer, will update it
plugins/sinks/gcs/client.go
Outdated
| writer *storage.Writer | ||
| } | ||
|
|
||
| func newGCSClient(ctx context.Context, serviceAccountJSON []byte, bucketname string, filepath string) (GCSClient, error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
You should return *gcsClient instead of interface.
Remember always return struct and accept interfaces in args
plugins/sinks/gcs/client.go
Outdated
|
|
||
| func (c *gcsClient) WriteData(jsonBytes []byte) error { | ||
| if _, err := c.writer.Write(jsonBytes); err != nil { | ||
| return errors.Wrap(err, "error in writing json data to an object") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Why client should know if the data is json or something else? It's just raw bytes for it. So this error won't be applicable if you decide to write simple text.
plugins/sinks/gcs/client.go
Outdated
| func (c *gcsClient) Close() error { | ||
| if err := c.writer.Close(); err != nil { | ||
| return errors.Wrap(err, "error closing the writer") | ||
| } | ||
|
|
||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Maybe simplify this as
func (c *gcsClient) Close() error {
return c.writer.Close()
}
plugins/sinks/gcs/gcs.go
Outdated
| func (s *Sink) resolveBucketandObjectNames() (string, string) { | ||
| dirs := strings.Split(s.config.Path, "/") | ||
| bucketname := dirs[0] | ||
| timestamp := time.Now().Format("2006.01.02 15:04:05") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I would suggest using a pre-defined time RFC format, specially the one without space.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
will RFC3339 do ? Output format is "2006-01-02T15:04:05Z07:00"
plugins/sinks/gcs/gcs.go
Outdated
| if s.config.ObjectPrefix != "" { | ||
| s.config.ObjectPrefix = s.config.ObjectPrefix + "-" | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
What if user has provided prefix as hello-, you will add double hyphens?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
It should be a single hyphen only. I will add a check for that
plugins/sinks/gcs/gcs.go
Outdated
| s.config.ObjectPrefix = s.config.ObjectPrefix + "-" | ||
| } | ||
|
|
||
| objectname := s.config.ObjectPrefix + timestamp + ".ndjson" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use fmt.Sprintf to build strings.
plugins/sinks/gcs/gcs.go
Outdated
|
|
||
| objectname := s.config.ObjectPrefix + timestamp + ".ndjson" | ||
| if len(dirs) > 1 { | ||
| objectname = dirs[len(dirs)-1] + "/" + s.config.ObjectPrefix + timestamp + ".ndjson" |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
use fmt.Sprintf to build strings.
plugins/sinks/gcs/gcs.go
Outdated
| } | ||
|
|
||
| func (s *Sink) validateServiceAccountKey() error { | ||
|
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
unnecessary space?
plugins/sinks/gcs/gcs.go
Outdated
| if err := s.client.Close(); err != nil { | ||
| return err | ||
| } | ||
| return nil | ||
| } |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
return s.client.Close()
plugins/sinks/gcs/gcs.go
Outdated
| return nil | ||
| } | ||
|
|
||
| func (s *Sink) resolveBucketandObjectNames() (string, string) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A better name could be resolveBucketPath
plugins/sinks/gcs/gcs.go
Outdated
| dirs := strings.Split(s.config.Path, "/") | ||
| bucketname := dirs[0] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
A notation pretty common in object storage is gcs://bucketname or s3://bucketname I guess this line would fail here. Can we use url.Parse(...)?
plugins/sinks/gcs/gcs.go
Outdated
| objectname := fmt.Sprintf("%s%s.ndjson", objectprefix, timestamp) | ||
|
|
||
| if len(dirs) > 1 { | ||
| objectname = fmt.Sprintf("%s/%s%s.ndjson", dirs[len(dirs)-1], objectprefix, timestamp) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
How will dirs[len(dirs)-1] work for a path like gcs://bucketname/path1/path2/
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The default path format I assumed was the one I got from GCS bucket/folder copy path options. If that's the required case, will handle the gcs prefix
plugins/sinks/gcs/gcs.go
Outdated
| return nil | ||
| } | ||
|
|
||
| func (s *Sink) Close() (err error) { |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
(err error) could be just error
|
@kushsharma done with the changes, taking the input as a URL in the sink config, now URL format it accepts in config is |
|
LGTM. Nice work. |
feat: add GCS sink (raystack#469)
No description provided.