Conversation

@BenWhitehead (Collaborator)
Add initial implementation of a GrpcBlobWriteChannel and GrpcBlobReadChannel

### GrpcStorageImpl

`GrpcStorageImpl#reader` and `GrpcStorageImpl#writer` have been defined to construct
the respective channel.

Other read/write methods on GrpcStorageImpl have been implemented in terms of
`#reader` and `#writer`; more specific implementations will follow later.
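
For illustration only, here is a minimal sketch of that kind of layering, assuming
nothing beyond the public `Storage#reader` API; the helper class, its name, and the
buffer size are assumptions, not the PR's actual `GrpcStorageImpl` code:

```java
import com.google.cloud.ReadChannel;
import com.google.cloud.storage.BlobId;
import com.google.cloud.storage.Storage;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.ByteBuffer;

final class ReadAllBytesSketch {
  // Read an entire object by repeatedly draining the ReadChannel into a small buffer.
  static byte[] readAllBytes(Storage storage, BlobId blob, Storage.BlobSourceOption... options)
      throws IOException {
    ByteArrayOutputStream out = new ByteArrayOutputStream();
    try (ReadChannel reader = storage.reader(blob, options)) {
      ByteBuffer buf = ByteBuffer.allocate(64 * 1024); // arbitrary size for the sketch
      while (reader.read(buf) != -1) {
        buf.flip();
        out.write(buf.array(), 0, buf.remaining());
        buf.clear();
      }
    }
    return out.toByteArray();
  }
}
```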

#### Wired up methods

* `GrpcStorageImpl#create(BlobInfo blobInfo, BlobTargetOption... options)`
* `GrpcStorageImpl#create(BlobInfo blobInfo, byte[] content, BlobTargetOption... options)`
* `GrpcStorageImpl#create(BlobInfo blobInfo, byte[] content, int offset, int length, BlobTargetOption... options)`
* `GrpcStorageImpl#create(BlobInfo blobInfo, InputStream content, BlobWriteOption... options)`
* `GrpcStorageImpl#createFrom(BlobInfo blobInfo, Path path, BlobWriteOption... options)`
* `GrpcStorageImpl#createFrom(BlobInfo blobInfo, Path path, int bufferSize, BlobWriteOption... options)`
* `GrpcStorageImpl#createFrom(BlobInfo blobInfo, InputStream content, int bufferSize, BlobWriteOption... options)`
* `GrpcStorageImpl#readAllBytes(String bucket, String blob, BlobSourceOption... options)`
* `GrpcStorageImpl#readAllBytes(BlobId blob, BlobSourceOption... options)`
* `GrpcStorageImpl#reader(String bucket, String blob, BlobSourceOption... options)`
* `GrpcStorageImpl#reader(BlobId blob, BlobSourceOption... options)`
* `GrpcStorageImpl#downloadTo(BlobId blob, Path path, BlobSourceOption... options)`
* `GrpcStorageImpl#downloadTo(BlobId blob, OutputStream outputStream, BlobSourceOption... options)`
* `GrpcStorageImpl#writer(BlobInfo blobInfo, BlobWriteOption... options)`

### New channel session data types

`BlobWriteChannel`'s existing implementation provides an internal handle to access
the resulting `StorageObject` that is created as part of a resumable upload
session. Additionally, performing a resumable upload is a multi-leg set of
operations. The new `*ByteChannelSession` interfaces and implementations
model this multi-leg lifecycle.

A `*ByteChannelSession` allows the channel to be opened for either read or write
while keeping a reference to the underlying object associated with the
operation (the initial object included in the first message on read, or the
final object returned upon completing a write).
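
As a rough, hypothetical sketch of what such a session type might expose (the actual
`*ByteChannelSession` interfaces in the PR may differ in names and shape):

```java
import com.google.api.core.ApiFuture;
import java.nio.channels.WritableByteChannel;

// Hypothetical sketch only. The idea: the session hands out the channel for the
// write leg(s) and separately exposes a future that resolves to the final object
// once the multi-leg resumable upload completes.
interface WritableByteChannelSessionSketch<ResultT> {
  WritableByteChannel open();

  ApiFuture<ResultT> getResult();
}
```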

### New channels approach

Channels are inherently buffered already. To make additional buffering more
efficient and deliberate, every channel has both Unbuffered and Buffered
implementations; the buffered channel's sole responsibility is the intermediate
buffering, and it delegates all other operations to an underlying unbuffered
channel.

Buffers allocated for use with a `BufferedChannel` are reused rather than thrown
away or resized in an ad-hoc manner. This should make memory use more predictable.

All channels now properly synchronize their read/write/flush methods to ensure
proper ordering when called from multiple threads.
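
A minimal sketch of that split, assuming a plain `WritableByteChannel` as the
unbuffered delegate (illustrative only; class and method names are not the PR's
actual channel implementations):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.WritableByteChannel;

// The buffered channel's only job is intermediate buffering; all other work is
// delegated to the underlying unbuffered channel. The buffer is allocated once
// and reused, and public methods are synchronized.
final class BufferedWritableChannelSketch implements WritableByteChannel {
  private final WritableByteChannel unbuffered;
  private final ByteBuffer buffer; // allocated once, reused across writes

  BufferedWritableChannelSketch(WritableByteChannel unbuffered, int bufferSize) {
    this.unbuffered = unbuffered;
    this.buffer = ByteBuffer.allocate(bufferSize);
  }

  @Override
  public synchronized int write(ByteBuffer src) throws IOException {
    int consumed = 0;
    while (src.hasRemaining()) {
      if (!buffer.hasRemaining()) {
        flushBuffer(); // buffer full: drain it to the unbuffered channel, then reuse it
      }
      int n = Math.min(buffer.remaining(), src.remaining());
      int originalLimit = src.limit();
      src.limit(src.position() + n);
      buffer.put(src);          // copies exactly n bytes
      src.limit(originalLimit); // restore the caller's limit
      consumed += n;
    }
    return consumed;
  }

  private void flushBuffer() throws IOException {
    buffer.flip();
    while (buffer.hasRemaining()) {
      unbuffered.write(buffer);
    }
    buffer.clear();
  }

  @Override
  public synchronized boolean isOpen() {
    return unbuffered.isOpen();
  }

  @Override
  public synchronized void close() throws IOException {
    flushBuffer();
    unbuffered.close();
  }
}
```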

### Move over DataGeneration classes from benchmarks

Add the `DataGenerator`, `DataChain`, and `TmpFile` test classes from benchmarks:
various tools to dynamically generate data sets either in memory or on disk.
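
A small sketch of the sort of utility being described, generating random data either
in memory or into a temporary file on disk (class and method names here are
assumptions for illustration, not the actual `DataGenerator`/`DataChain`/`TmpFile`
classes):

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.Random;

final class TestDataSketch {
  private static final Random RANDOM = new Random();

  // Generate a data set entirely in memory.
  static byte[] randomBytes(int size) {
    byte[] bytes = new byte[size];
    RANDOM.nextBytes(bytes);
    return bytes;
  }

  // Generate a data set of the requested size as a temporary file on disk.
  static Path randomTempFile(long size) throws IOException {
    Path tmp = Files.createTempFile("generated-data", ".bin");
    try (FileChannel channel = FileChannel.open(tmp, StandardOpenOption.WRITE)) {
      long remaining = size;
      while (remaining > 0) {
        int n = (int) Math.min(remaining, 64 * 1024);
        ByteBuffer buf = ByteBuffer.wrap(randomBytes(n));
        while (buf.hasRemaining()) {
          channel.write(buf);
        }
        remaining -= n;
      }
    }
    return tmp;
  }
}
```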

@BenWhitehead BenWhitehead requested a review from a team as a code owner May 25, 2022 16:56
@product-auto-label product-auto-label bot added size: xl Pull request size is extra large. api: storage Issues related to the googleapis/java-storage API. labels May 25, 2022
@frankyn (Contributor) left a comment:

Adding a few questions.

```java
  }
}

return data.toArray(new Data[0]);
```
@frankyn (Contributor):

This is a strange fragment: `new Data[0]`. What does this do?

@BenWhitehead (Collaborator, Author):

This is boilerplate to pass the element type of the array the `Deque` should be turned
into. As of Java 8 it is not possible to directly instantiate an array of a generic
type, so this argument needs to be provided to the method. It's shorthand for
`data.toArray(new Data[data.size()])`.

`toArray(T[])` is defined on `java.util.Collection`, where an invocation of `size()`
may not be constant time. By delegating the actual array sizing to the collection's
own implementation of `toArray`, that implementation can use an appropriate means of
initializing the array for us.
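
A small self-contained example of the idiom (the `Data` class here is just a stand-in
for illustration):

```java
import java.util.ArrayDeque;
import java.util.Deque;

final class ToArrayExample {
  // Stand-in element type; in the PR this would be the actual Data class.
  static final class Data {
    final int value;
    Data(int value) { this.value = value; }
  }

  public static void main(String[] args) {
    Deque<Data> data = new ArrayDeque<>();
    data.add(new Data(1));
    data.add(new Data(2));

    // The zero-length array only carries the element type; the deque's own toArray
    // implementation allocates a correctly sized Data[] and copies the elements in.
    Data[] asArray = data.toArray(new Data[0]);
    System.out.println(asArray.length); // prints 2
  }
}
```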

```java
import org.checkerframework.checker.nullness.qual.Nullable;

// TODO: Better name, externally we're treating a chunk to mean a set of messages for which
// they all must commit before proceeding.
```
@frankyn (Contributor):

Clarify what this class does; IIUC, this is chunking data for gRPC writes of 2 MiB.

@BenWhitehead (Collaborator, Author), Jun 2, 2022:

Added some javadocs to the class and to one of the two methods (since they're
overloads). I've also renamed the class from `Chunker` to `ChunkSegmenter` to better
match our shared terminology, where a chunk results in multiple messages (a rough
sketch of that idea follows the list below).

* Add javadocs to `Buffers`
* Rename `Chunker` to `ChunkSegmenter`
* Add javadocs to `ChunkSegmenter`
* Inline the `enqueuedBytes()` variable in `DefaultBufferedReadableByteChannel`
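
For illustration, a rough sketch of the chunk-to-messages idea mentioned above
(hypothetical code, not the PR's `ChunkSegmenter`; the use of `ByteString`, the class
name, and the method name are assumptions):

```java
import com.google.protobuf.ByteString;
import java.util.ArrayList;
import java.util.List;

final class ChunkSegmenterSketch {
  // Split one chunk of data into multiple messages, each no larger than maxMessageSize.
  static List<ByteString> segment(ByteString chunk, int maxMessageSize) {
    List<ByteString> messages = new ArrayList<>();
    for (int offset = 0; offset < chunk.size(); offset += maxMessageSize) {
      int end = Math.min(offset + maxMessageSize, chunk.size());
      messages.add(chunk.substring(offset, end));
    }
    return messages;
  }
}
```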
@BenWhitehead BenWhitehead merged commit 023b2be into feat/grpc-storage Jun 2, 2022
@BenWhitehead BenWhitehead deleted the grpc-media branch June 2, 2022 22:01