Skip to content

Commit e5772a4

Browse files
authored
feat: port DefaultBlobWriteSessionConfig to work with HttpStorageOptions (#2472)
1 parent 7c86ad0 commit e5772a4

File tree

6 files changed

+115
-8
lines changed

6 files changed

+115
-8
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfig.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -53,6 +53,15 @@ WritableByteChannelSession<?, BlobInfo> writeSession(
5353
StorageInternal s, BlobInfo info, Opts<ObjectTargetOpt> opts);
5454
}
5555

56+
/**
57+
* Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is
58+
* compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#HTTP}
59+
*
60+
* <p>We could evaluate the annotations, but the code for that is more complicated and probably
61+
* not worth the effort.
62+
*/
63+
interface HttpCompatible {}
64+
5665
/**
5766
* Internal marker interface to signify an implementation of {@link BlobWriteSessionConfig} is
5867
* compatible with {@link com.google.cloud.storage.TransportCompatibility.Transport#GRPC}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessionConfigs.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -55,7 +55,7 @@
5555
* full or close. Buffer size is configurable via
5656
* {@link DefaultBlobWriteSessionConfig#withChunkSize(int)}
5757
* </td>
58-
* <td>gRPC</td>
58+
* <td>gRPC, HTTP</td>
5959
* <td>The network will only be used for the following operations:
6060
* <ol>
6161
* <li>Creating the Resumable Upload Session</li>
@@ -241,7 +241,7 @@ private BlobWriteSessionConfigs() {}
241241
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
242242
*/
243243
@BetaApi
244-
@TransportCompatibility({Transport.GRPC})
244+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
245245
public static DefaultBlobWriteSessionConfig getDefault() {
246246
return new DefaultBlobWriteSessionConfig(ByteSizeConstants._16MiB);
247247
}

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java

Lines changed: 39 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,17 +21,21 @@
2121
import com.google.api.core.BetaApi;
2222
import com.google.api.core.InternalApi;
2323
import com.google.api.gax.grpc.GrpcCallContext;
24+
import com.google.api.services.storage.model.StorageObject;
2425
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
2526
import com.google.cloud.storage.Conversions.Decoder;
2627
import com.google.cloud.storage.TransportCompatibility.Transport;
2728
import com.google.cloud.storage.UnifiedOpts.ObjectTargetOpt;
2829
import com.google.cloud.storage.UnifiedOpts.Opts;
30+
import com.google.cloud.storage.spi.v1.StorageRpc;
2931
import com.google.common.base.Preconditions;
3032
import com.google.common.util.concurrent.MoreExecutors;
3133
import com.google.storage.v2.WriteObjectRequest;
3234
import com.google.storage.v2.WriteObjectResponse;
3335
import java.nio.channels.WritableByteChannel;
3436
import java.time.Clock;
37+
import java.util.Map;
38+
import java.util.function.Supplier;
3539
import javax.annotation.concurrent.Immutable;
3640

3741
/**
@@ -55,9 +59,9 @@
5559
*/
5660
@Immutable
5761
@BetaApi
58-
@TransportCompatibility({Transport.GRPC})
62+
@TransportCompatibility({Transport.GRPC, Transport.HTTP})
5963
public final class DefaultBlobWriteSessionConfig extends BlobWriteSessionConfig
60-
implements BlobWriteSessionConfig.GrpcCompatible {
64+
implements BlobWriteSessionConfig.HttpCompatible, BlobWriteSessionConfig.GrpcCompatible {
6165
private static final long serialVersionUID = -6873740918589930633L;
6266

6367
private final int chunkSize;
@@ -146,6 +150,39 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
146150
.build();
147151
})),
148152
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER);
153+
} else if (s instanceof StorageImpl) {
154+
StorageImpl json = (StorageImpl) s;
155+
156+
return new DecoratedWritableByteChannelSession<>(
157+
new LazySession<>(
158+
new LazyWriteChannel<>(
159+
() -> {
160+
final Map<StorageRpc.Option, ?> optionsMap = opts.getRpcOptions();
161+
BlobInfo.Builder builder = info.toBuilder().setMd5(null).setCrc32c(null);
162+
BlobInfo updated = opts.blobInfoMapper().apply(builder).build();
163+
164+
StorageObject encode = Conversions.json().blobInfo().encode(updated);
165+
Supplier<String> uploadIdSupplier =
166+
ResumableMedia.startUploadForBlobInfo(
167+
json.getOptions(),
168+
updated,
169+
optionsMap,
170+
json.retryAlgorithmManager.getForResumableUploadSessionCreate(
171+
optionsMap));
172+
ApiFuture<JsonResumableWrite> startAsync =
173+
ApiFutures.immediateFuture(
174+
JsonResumableWrite.of(
175+
encode, optionsMap, uploadIdSupplier.get(), 0L));
176+
177+
return ResumableMedia.http()
178+
.write()
179+
.byteChannel(HttpClientContext.from(json.storageRpc))
180+
.resumable()
181+
.buffered(BufferHandle.allocate(chunkSize))
182+
.setStartAsync(startAsync)
183+
.build();
184+
})),
185+
Conversions.json().blobInfo());
149186
} else {
150187
throw new IllegalStateException(
151188
"Unknown Storage implementation: " + s.getClass().getName());

google-cloud-storage/src/main/java/com/google/cloud/storage/HttpStorageOptions.java

Lines changed: 40 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import static com.google.common.base.Preconditions.checkArgument;
1920
import static java.util.Objects.requireNonNull;
2021

2122
import com.google.api.core.ApiClock;
@@ -30,6 +31,7 @@
3031
import com.google.cloud.http.HttpTransportOptions;
3132
import com.google.cloud.spi.ServiceRpcFactory;
3233
import com.google.cloud.storage.Retrying.RetryingDependencies;
34+
import com.google.cloud.storage.Storage.BlobWriteOption;
3335
import com.google.cloud.storage.TransportCompatibility.Transport;
3436
import com.google.cloud.storage.spi.StorageRpcFactory;
3537
import com.google.cloud.storage.spi.v1.HttpStorageRpc;
@@ -39,7 +41,9 @@
3941
import java.io.IOException;
4042
import java.io.ObjectInputStream;
4143
import java.io.Serializable;
44+
import java.time.Clock;
4245
import java.util.Set;
46+
import org.checkerframework.checker.nullness.qual.NonNull;
4347

4448
/** @since 2.14.0 This new api is in preview and is subject to breaking changes. */
4549
@BetaApi
@@ -55,6 +59,7 @@ public class HttpStorageOptions extends StorageOptions {
5559

5660
private final HttpRetryAlgorithmManager retryAlgorithmManager;
5761
private transient RetryDependenciesAdapter retryDepsAdapter;
62+
private final BlobWriteSessionConfig blobWriteSessionConfig;
5863

5964
private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) {
6065
super(builder, serviceDefaults);
@@ -63,6 +68,7 @@ private HttpStorageOptions(Builder builder, StorageDefaults serviceDefaults) {
6368
MoreObjects.firstNonNull(
6469
builder.storageRetryStrategy, defaults().getStorageRetryStrategy()));
6570
retryDepsAdapter = new RetryDependenciesAdapter();
71+
blobWriteSessionConfig = builder.blobWriteSessionConfig;
6672
}
6773

6874
@Override
@@ -120,6 +126,8 @@ RetryingDependencies asRetryDependencies() {
120126
public static class Builder extends StorageOptions.Builder {
121127

122128
private StorageRetryStrategy storageRetryStrategy;
129+
private BlobWriteSessionConfig blobWriteSessionConfig =
130+
HttpStorageDefaults.INSTANCE.getDefaultStorageWriterConfig();
123131

124132
Builder() {}
125133

@@ -218,6 +226,24 @@ public HttpStorageOptions.Builder setQuotaProjectId(String quotaProjectId) {
218226
return this;
219227
}
220228

229+
/**
230+
* @see BlobWriteSessionConfig
231+
* @see BlobWriteSessionConfigs
232+
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
233+
* @see HttpStorageDefaults#getDefaultStorageWriterConfig()
234+
* @since 2.29.0 This new api is in preview and is subject to breaking changes.
235+
*/
236+
@BetaApi
237+
public HttpStorageOptions.Builder setBlobWriteSessionConfig(
238+
@NonNull BlobWriteSessionConfig blobWriteSessionConfig) {
239+
requireNonNull(blobWriteSessionConfig, "blobWriteSessionConfig must be non null");
240+
checkArgument(
241+
blobWriteSessionConfig instanceof BlobWriteSessionConfig.HttpCompatible,
242+
"The provided instance of BlobWriteSessionConfig is not compatible with this HTTP transport.");
243+
this.blobWriteSessionConfig = blobWriteSessionConfig;
244+
return this;
245+
}
246+
221247
@Override
222248
public HttpStorageOptions build() {
223249
return new HttpStorageOptions(this, defaults());
@@ -249,6 +275,12 @@ public HttpTransportOptions getDefaultTransportOptions() {
249275
public StorageRetryStrategy getStorageRetryStrategy() {
250276
return StorageRetryStrategy.getDefaultStorageRetryStrategy();
251277
}
278+
279+
/** @since 2.29.0 This new api is in preview and is subject to breaking changes. */
280+
@BetaApi
281+
public BlobWriteSessionConfig getDefaultStorageWriterConfig() {
282+
return BlobWriteSessionConfigs.getDefault();
283+
}
252284
}
253285

254286
/**
@@ -287,7 +319,14 @@ public HttpStorageFactory() {}
287319
public Storage create(StorageOptions options) {
288320
if (options instanceof HttpStorageOptions) {
289321
HttpStorageOptions httpStorageOptions = (HttpStorageOptions) options;
290-
return new StorageImpl(httpStorageOptions);
322+
Clock clock = Clock.systemUTC();
323+
try {
324+
return new StorageImpl(
325+
httpStorageOptions, httpStorageOptions.blobWriteSessionConfig.createFactory(clock));
326+
} catch (IOException e) {
327+
throw new IllegalStateException(
328+
"Unable to instantiate HTTP com.google.cloud.storage.Storage client.", e);
329+
}
291330
} else {
292331
throw new IllegalArgumentException("Only HttpStorageOptions supported");
293332
}

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageImpl.java

Lines changed: 14 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,7 @@
3838
import com.google.cloud.WriteChannel;
3939
import com.google.cloud.storage.Acl.Entity;
4040
import com.google.cloud.storage.BlobReadChannelV2.BlobReadChannelContext;
41+
import com.google.cloud.storage.BlobWriteSessionConfig.WriterFactory;
4142
import com.google.cloud.storage.HmacKey.HmacKeyMetadata;
4243
import com.google.cloud.storage.PostPolicyV4.ConditionV4Type;
4344
import com.google.cloud.storage.PostPolicyV4.PostConditionsV4;
@@ -92,7 +93,7 @@
9293
import java.util.function.Supplier;
9394
import org.checkerframework.checker.nullness.qual.Nullable;
9495

95-
final class StorageImpl extends BaseService<StorageOptions> implements Storage {
96+
final class StorageImpl extends BaseService<StorageOptions> implements Storage, StorageInternal {
9697

9798
private static final byte[] EMPTY_BYTE_ARRAY = {};
9899
private static final String EMPTY_BYTE_ARRAY_MD5 = "1B2M2Y8AsgTpgAmY7PhCfg==";
@@ -115,11 +116,13 @@ final class StorageImpl extends BaseService<StorageOptions> implements Storage {
115116

116117
final HttpRetryAlgorithmManager retryAlgorithmManager;
117118
final StorageRpc storageRpc;
119+
final WriterFactory writerFactory;
118120

119-
StorageImpl(HttpStorageOptions options) {
121+
StorageImpl(HttpStorageOptions options, WriterFactory writerFactory) {
120122
super(options);
121123
this.retryAlgorithmManager = options.getRetryAlgorithmManager();
122124
this.storageRpc = options.getStorageRpcV1();
125+
this.writerFactory = writerFactory;
123126
}
124127

125128
@Override
@@ -1635,4 +1638,13 @@ private Bucket internalBucketGet(String bucket, Map<StorageRpc.Option, ?> option
16351638
() -> storageRpc.get(bucketPb, optionsMap),
16361639
(b) -> Conversions.json().bucketInfo().decode(b).asBucket(this));
16371640
}
1641+
1642+
@Override
1643+
public BlobWriteSession blobWriteSession(BlobInfo blobInfo, BlobWriteOption... options) {
1644+
Opts<ObjectTargetOpt> opts = Opts.unwrap(options).resolveFrom(blobInfo);
1645+
1646+
WritableByteChannelSession<?, BlobInfo> writableByteChannelSession =
1647+
writerFactory.writeSession(this, blobInfo, opts);
1648+
return BlobWriteSessions.of(writableByteChannelSession);
1649+
}
16381650
}

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ITBlobWriteSessionTest.java

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,6 +26,7 @@
2626
import com.google.cloud.storage.BucketInfo;
2727
import com.google.cloud.storage.DataGenerator;
2828
import com.google.cloud.storage.GrpcStorageOptions;
29+
import com.google.cloud.storage.HttpStorageOptions;
2930
import com.google.cloud.storage.Storage;
3031
import com.google.cloud.storage.Storage.BlobWriteOption;
3132
import com.google.cloud.storage.StorageException;
@@ -49,7 +50,7 @@
4950

5051
@RunWith(StorageITRunner.class)
5152
@CrossRun(
52-
transports = {Transport.GRPC},
53+
transports = {Transport.HTTP, Transport.GRPC},
5354
backends = {Backend.PROD})
5455
public final class ITBlobWriteSessionTest {
5556

@@ -68,6 +69,7 @@ public void allDefaults() throws Exception {
6869
}
6970

7071
@Test
72+
@CrossRun.Exclude(transports = Transport.HTTP)
7173
public void bufferToTempDirThenUpload() throws Exception {
7274
StorageOptions options = null;
7375
if (transport == Transport.GRPC) {
@@ -94,6 +96,13 @@ public void overrideDefaultBufferSize() throws Exception {
9496
.setBlobWriteSessionConfig(
9597
BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024))
9698
.build();
99+
} else if (transport == Transport.HTTP) {
100+
options =
101+
((HttpStorageOptions) storage.getOptions())
102+
.toBuilder()
103+
.setBlobWriteSessionConfig(
104+
BlobWriteSessionConfigs.getDefault().withChunkSize(256 * 1024))
105+
.build();
97106
}
98107
assertWithMessage("unable to resolve options").that(options).isNotNull();
99108
//noinspection DataFlowIssue
@@ -103,6 +112,7 @@ public void overrideDefaultBufferSize() throws Exception {
103112
}
104113

105114
@Test
115+
@CrossRun.Exclude(transports = Transport.HTTP)
106116
public void bidiTest() throws Exception {
107117
StorageOptions options = null;
108118
if (transport == Transport.GRPC) {

0 commit comments

Comments
 (0)