Skip to content

Commit cb7b0a1

Browse files
authored
feat(storage/dataflux): add dataflux interface (#10748)
feat: add dataflux interface and helper functions to storage/dataflux. Dataflux fast-listing will be used to quickly list objects in a bucket in parallel. Fixes #10731
1 parent 9199843 commit cb7b0a1

File tree

9 files changed

+831
-1
lines changed

9 files changed

+831
-1
lines changed

storage/dataflux/README.md

Lines changed: 65 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,65 @@
1+
# Dataflux for Google Cloud Storage Go client library
2+
3+
## Overview
4+
The purpose of this client is to quickly list data stored in GCS.
5+
6+
## Fast List
7+
The fast list component of this client leverages GCS API to parallelize the listing of files within a GCS bucket. It does this by implementing a workstealing algorithm, where each worker in the list operation is able to steal work from its siblings once it has finished all currently stated listing work. This parallelization leads to a significant real world speed increase than sequential listing. Note that paralellization is limited by the machine on which the client runs.
8+
9+
Benchmarking has demonstrated that the larger the object count, the better Dataflux performs when compared to a linear listing. Around 100k objects, users will see improvemement in listing speed.
10+
11+
### Example Usage
12+
13+
First create a `storage.Client` to use throughout your application:
14+
15+
[snip]:# (storage-1)
16+
```go
17+
ctx := context.Background()
18+
client, err := storage.NewClient(ctx)
19+
if err != nil {
20+
log.Fatal(err)
21+
}
22+
```
23+
24+
[snip]:# (storage-2)
25+
```go
26+
27+
// storage.Query to filter objects that the user wants to list.
28+
query := storage.Query{}
29+
// Input for fast-listing.
30+
dfopts := dataflux.ListerInput{
31+
BucketName: "bucket",
32+
Parallelism: 500,
33+
BatchSize: 500000,
34+
Query: query,
35+
}
36+
37+
// Construct a dataflux lister.
38+
df, close = dataflux.NewLister(sc, dfopts)
39+
defer close()
40+
41+
// List objects in GCS bucket.
42+
for {
43+
objects, err := df.NextBatch(ctx)
44+
45+
if err == iterator.Done {
46+
// No more objects in the bucket to list.
47+
break
48+
}
49+
if err != nil {
50+
log.Fatal(err)
51+
}
52+
// TODO: process objects
53+
}
54+
```
55+
56+
### Fast List Benchmark Results
57+
VM used : n2d-standard-48
58+
Region: us-central1-a
59+
NIC type: gVNIC
60+
|File Count|VM Core Count|List Time Without Dataflux |List Time With Dataflux|
61+
|------------|-------------|--------------------------|-----------------------|
62+
|5000000 Obj |48 Core |319.72s |17.35s |
63+
|1999032 Obj |48 Core |139.54s |8.98s |
64+
|578703 Obj |48 Core |32.90s |5.71s |
65+
|10448 Obj |48 Core |750.50ms |637.17ms |

storage/dataflux/doc.go

Lines changed: 27 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,27 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
/*
16+
Package dataflux provides an easy way to parallelize listing in Google
17+
Cloud Storage.
18+
19+
More information about Google Cloud Storage is available at
20+
https://cloud.google.com/storage/docs.
21+
22+
See https://pkg.go.dev/cloud.google.com/go for authentication, timeouts,
23+
connection pooling and similar aspects of this package.
24+
25+
NOTE: This package is in preview. It is not stable, and is likely to change.
26+
*/
27+
package dataflux // import "cloud.google.com/go/storage/dataflux"

storage/dataflux/example_test.go

Lines changed: 69 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,69 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dataflux_test
16+
17+
import (
18+
"context"
19+
"log"
20+
21+
"cloud.google.com/go/storage"
22+
"cloud.google.com/go/storage/dataflux"
23+
"google.golang.org/api/iterator"
24+
)
25+
26+
func ExampleNextBatch_batch() {
27+
ctx := context.Background()
28+
// Pass in any client opts or set retry policy here.
29+
client, err := storage.NewClient(ctx)
30+
if err != nil {
31+
// handle error
32+
}
33+
34+
// Create dataflux fast-list input and provide desired options,
35+
// including number of workers, batch size, query to filer objects, etc.
36+
in := &dataflux.ListerInput{
37+
BucketName: "mybucket",
38+
// Optionally specify params to apply to lister.
39+
Parallelism: 100,
40+
BatchSize: 500000,
41+
Query: storage.Query{},
42+
SkipDirectoryObjects: false,
43+
}
44+
45+
// Create Lister with desired options, including number of workers,
46+
// part size, per operation timeout, etc.
47+
df := dataflux.NewLister(client, in)
48+
defer df.Close()
49+
50+
var numOfObjects int
51+
52+
for {
53+
objects, err := df.NextBatch(ctx)
54+
if err != nil {
55+
// handle error
56+
}
57+
58+
if err == iterator.Done {
59+
numOfObjects += len(objects)
60+
// No more objects in the bucket to list.
61+
break
62+
}
63+
if err != nil {
64+
// handle error
65+
}
66+
numOfObjects += len(objects)
67+
}
68+
log.Printf("listing %d objects in bucket %q is complete.", numOfObjects, in.BucketName)
69+
}

storage/dataflux/fast_list.go

Lines changed: 156 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,156 @@
1+
// Copyright 2024 Google LLC
2+
//
3+
// Licensed under the Apache License, Version 2.0 (the "License");
4+
// you may not use this file except in compliance with the License.
5+
// You may obtain a copy of the License at
6+
//
7+
// http://www.apache.org/licenses/LICENSE-2.0
8+
//
9+
// Unless required by applicable law or agreed to in writing, software
10+
// distributed under the License is distributed on an "AS IS" BASIS,
11+
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12+
// See the License for the specific language governing permissions and
13+
// limitations under the License.
14+
15+
package dataflux
16+
17+
import (
18+
"context"
19+
"errors"
20+
"fmt"
21+
22+
"cloud.google.com/go/storage"
23+
"golang.org/x/sync/errgroup"
24+
"google.golang.org/api/iterator"
25+
)
26+
27+
// listingMethod represents the method of listing.
28+
type listingMethod int
29+
30+
const (
31+
// open when any method can be used to list.
32+
open listingMethod = iota
33+
// sequential when the listing is done sequentially.
34+
sequential
35+
// worksteal when the listing is done using work stealing algorithm.
36+
worksteal
37+
)
38+
39+
// ListerInput contains options for listing objects.
40+
type ListerInput struct {
41+
// BucketName is the name of the bucket to list objects from. Required.
42+
BucketName string
43+
44+
// Parallelism is number of parallel workers to use for listing. Default value is 10x number of available CPU. Optional.
45+
Parallelism int
46+
47+
// BatchSize is the number of objects to list. Default value returns all objects at once. Optional.
48+
// The number of objects returned will be rounded up to a multiple of gcs page size.
49+
BatchSize int
50+
51+
// Query is the query to filter objects for listing. Default value is nil. Optional.
52+
//Use ProjectionNoACL For faster listing. ACL is expensive and this results in fewer objects
53+
// to be returned from GCS in each API call.
54+
Query storage.Query
55+
56+
// SkipDirectoryObjects is to indicate whether to list directory objects. Default value is false. Optional.
57+
SkipDirectoryObjects bool
58+
}
59+
60+
// Lister is used for interacting with Dataflux fast-listing.
61+
// The caller should initialize it with NewLister() instead of creating it directly.
62+
type Lister struct {
63+
// method indicates the listing method(open, sequential, worksteal) to be used for listing.
64+
method listingMethod
65+
66+
// pageToken is the token to use for sequential listing.
67+
pageToken string
68+
69+
// bucket is the bucket handle to list objects from.
70+
bucket *storage.BucketHandle
71+
72+
// batchSize is the number of objects to list.
73+
batchSize int
74+
75+
// query is the query to filter objects for listing.
76+
query storage.Query
77+
78+
// skipDirectoryObjects is to indicate whether to list directory objects.
79+
skipDirectoryObjects bool
80+
}
81+
82+
// NewLister creates a new dataflux Lister to list objects in the give bucket.
83+
func NewLister(c *storage.Client, in *ListerInput) *Lister {
84+
bucket := c.Bucket(in.BucketName)
85+
lister := &Lister{
86+
method: open,
87+
pageToken: "",
88+
bucket: bucket,
89+
batchSize: in.BatchSize,
90+
query: in.Query,
91+
skipDirectoryObjects: in.SkipDirectoryObjects,
92+
}
93+
return lister
94+
}
95+
96+
// NextBatch runs worksteal algorithm and sequential listing in parallel to quickly
97+
// return a list of objects in the bucket. For smaller dataset,
98+
// sequential listing is expected to be faster. For larger dataset,
99+
// worksteal listing is expected to be faster.
100+
func (c *Lister) NextBatch(ctx context.Context) ([]*storage.ObjectAttrs, error) {
101+
// countError tracks the number of failed listing methods.
102+
countError := 0
103+
var results []*storage.ObjectAttrs
104+
ctx, cancel := context.WithCancel(ctx)
105+
defer cancel()
106+
// Errgroup takes care of running both methods in parallel. As soon as one of the method
107+
// is complete, the running method also stops.
108+
g, childCtx := errgroup.WithContext(ctx)
109+
110+
// To start listing method is Open and runs both worksteal and sequential listing in parallel.
111+
// The method which completes first is used for all subsequent runs.
112+
// TODO: Run worksteal listing when method is Open or WorkSteal.
113+
// Run sequential listing when method is Open or Sequential.
114+
if c.method != worksteal {
115+
116+
g.Go(func() error {
117+
objects, nextToken, err := c.sequentialListing(childCtx)
118+
if err != nil {
119+
countError++
120+
return fmt.Errorf("error in running sequential listing: %w", err)
121+
}
122+
// If sequential listing completes first, set method to sequential listing and ranges to nil.
123+
// The nextToken will be used to continue sequential listing.
124+
results = objects
125+
c.pageToken = nextToken
126+
c.method = sequential
127+
// Close context when sequential listing is complete.
128+
cancel()
129+
return nil
130+
})
131+
}
132+
133+
// Close all functions if either sequential listing or worksteal listing is complete.
134+
err := g.Wait()
135+
136+
// If the error is not context.Canceled, then return error instead of falling back
137+
// to the other method. This is so that the error can be fixed and user can take
138+
// advantage of fast-listing.
139+
// As one of the listing method completes, it is expected to cancel context for the other method.
140+
// If both sequential and worksteal listing fail due to context canceled, only then return error.
141+
if err != nil && (!errors.Is(err, context.Canceled) || countError > 1) {
142+
return nil, fmt.Errorf("failed waiting for sequntial and work steal lister : %w", err)
143+
}
144+
145+
// If ranges for worksteal and pageToken for sequential listing is empty, then listing is complete.
146+
if c.pageToken == "" {
147+
return results, iterator.Done
148+
}
149+
return results, nil
150+
}
151+
152+
// Close closes the range channel of the Lister.
153+
func (c *Lister) Close() {
154+
155+
// TODO: Close range channel for worksteal lister.
156+
}

0 commit comments

Comments
 (0)