-
Notifications
You must be signed in to change notification settings - Fork 89
feat: add reader and writer to GrpcStorageImpl #1413
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Conversation
Add initial implementation of a GrpcBlobWriteChannel and GrpcBlobReadChannel ### GrpcStorageImpl GrpcStorageImpl#reader and GrpcStorageImpl#writer have been defined to construct the respective channel. Other read/write methods on GrpcStorageImpl have been implemented in terms of `#reader` and `#writer`, more appropriate specific implementations will be implemented later on. #### Wired up methods * `GrpcStorageImpl#create(BlobInfo blobInfo, BlobTargetOption... options)` * `GrpcStorageImpl#create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options)` * `GrpcStorageImpl#create(BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options)` * `GrpcStorageImpl#create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)` * `GrpcStorageImpl#createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options)` * `GrpcStorageImpl#createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)` * `GrpcStorageImpl#createFrom(BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)` * `GrpcStorageImpl#readAllBytes(String bucket, String blob, BlobSourceOption... options)` * `GrpcStorageImpl#readAllBytes(BlobId blob, BlobSourceOption... options)` * `GrpcStorageImpl#reader(String bucket, String blob, BlobSourceOption... options)` * `GrpcStorageImpl#reader(BlobId blob, BlobSourceOption... options)` * `GrpcStorageImpl#downloadTo(BlobId blob, Path path, BlobSourceOption... options)` * `GrpcStorageImpl#downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options)` * `GrpcStorageImpl#writer(BlobInfo blobInfo, BlobWriteOption... options)` ### New channel session data types `BlobWriteChannel`s existing implementation provides an internal handle to access the resulting `StorageObject` which is created as part of a resumable upload session. Additionally, performing a resumable upload is a multi-leg set of operations. The new `*ByteChannelSession` interfaces and implementations model this multi-leg lifecycle. A `*ByteChannelSession` allows for opening of the channel for either read or write along with keeping a reference to the underlying object associated with the operation (the initial object including in the first message on read, or the final object returned upon completing a write operation). ### New channels approach Channels are inherently buffered already, in order to facilitate more efficient and appropriate use of additional buffering all channels have Unbuffered and Buffered implementations, where the buffered channels sole responsibility is narrowed to the intermediate buffering otherwise delegating all operations to an underlying unbuffered channel. Buffers which are allocated for use with a BufferedChannel are appropriately reused, rather than thrown away and/or resized in an ad-hoc manner. This should provide a more predictable behavior for memory use. All Channels now properly synchronize their read/write/flush methods to ensure proper ordering from multiple threads. ### move over DataGeneration classes from benchmarks Add DataGenerator, DataChain and TmpFile to test classes from benchmarks. Various tools to dynamically generate data sets either in memory or on disk.
frankyn
left a comment
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Adding a few questions.
google-cloud-storage/src/main/java/com/google/cloud/storage/Chunker.java
Outdated
Show resolved
Hide resolved
google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java
Outdated
Show resolved
Hide resolved
...cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedReadableByteChannel.java
Outdated
Show resolved
Hide resolved
| } | ||
| } | ||
|
|
||
| return data.toArray(new Data[0]); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
this is a strange fragment new Data[0] what does this do?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is boilerplate to be able to pass the type of array the Dequeue should be turned into. As of java 8, it is not possible to directly instantiate an array for a generic type, so this argument needs to be provided to the method. It's shorthand for data.toArray(new Data[data.size()]).
toArray(T[]) is defined on java.util.Collection which means an invocation of size() may not be constant time. By delegating the actual array sizing to the collections implementation of toArray that implementation can use an appropriate means of initializing the array for us.
| import org.checkerframework.checker.nullness.qual.Nullable; | ||
|
|
||
| // TODO: Better name, externally we're treating a chunk to mean a set of messages for which | ||
| // they all must commit before proceeding. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Clarify what this class does; IIUC, this is chunking data for gRPC writes of 2MiB
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
added some javadocs to the class, and one of the two methods (since they're overloads). I've also renamed the class from Chunker to ChunkSegmenter to be more accurate with our shared terminology where a Chunk results in multiple messages.
* Add javadocs to Buffers * Rename Chunker to ChunkSegmenter * Add javadocs to ChunkSegmenter * Inline `enqueuedBytes()` variable in DefaultBufferedReadableByteChannel *
Add initial implementation of a GrpcBlobWriteChannel and GrpcBlobReadChannel
GrpcStorageImpl
GrpcStorageImpl#reader and GrpcStorageImpl#writer have been defined to construct
the respective channel.
Other read/write methods on GrpcStorageImpl have been implemented in terms of
#readerand#writer, more appropriate specific implementations will beimplemented later on.
Wired up methods
GrpcStorageImpl#create(BlobInfo blobInfo, BlobTargetOption... options)GrpcStorageImpl#create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options)GrpcStorageImpl#create(BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options)GrpcStorageImpl#create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)GrpcStorageImpl#createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options)GrpcStorageImpl#createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)GrpcStorageImpl#createFrom(BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)GrpcStorageImpl#readAllBytes(String bucket, String blob, BlobSourceOption... options)GrpcStorageImpl#readAllBytes(BlobId blob, BlobSourceOption... options)GrpcStorageImpl#reader(String bucket, String blob, BlobSourceOption... options)GrpcStorageImpl#reader(BlobId blob, BlobSourceOption... options)GrpcStorageImpl#downloadTo(BlobId blob, Path path, BlobSourceOption... options)GrpcStorageImpl#downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options)GrpcStorageImpl#writer(BlobInfo blobInfo, BlobWriteOption... options)New channel session data types
BlobWriteChannels existing implementation provides an internal handle to accessthe resulting
StorageObjectwhich is created as part of a resumable uploadsession. Additionally, performing a resumable upload is a multi-leg set of
operations. The new
*ByteChannelSessioninterfaces and implementationsmodel this multi-leg lifecycle.
A
*ByteChannelSessionallows for opening of the channel for either read or writealong with keeping a reference to the underlying object associated with the
operation (the initial object including in the first message on read, or the
final object returned upon completing a write operation).
New channels approach
Channels are inherently buffered already, in order to facilitate more efficient
and appropriate use of additional buffering all channels have Unbuffered
and Buffered implementations, where the buffered channels sole responsibility
is narrowed to the intermediate buffering otherwise delegating all operations
to an underlying unbuffered channel.
Buffers which are allocated for use with a BufferedChannel are appropriately
reused, rather than thrown away and/or resized in an ad-hoc manner. This should
provide a more predictable behavior for memory use.
All Channels now properly synchronize their read/write/flush methods to ensure
proper ordering from multiple threads.
move over DataGeneration classes from benchmarks
Add DataGenerator, DataChain and TmpFile to test classes from benchmarks.
Various tools to dynamically generate data sets either in memory or on disk.