Skip to content

Commit d47afcf

Browse files
authored
fix: ensure all BlobWriteSession types conform to the semantics specified in BlobWriteSession (#2482)
* Improve error handling to cover more cases where errors should be converted to StorageException * Enforce BlobWriteSession#open only being able to be called once, subsequent calls will error * Make JsonResumableSessionPutTask more graceful when attempting to determine object size (testbench can omit `.size` from its response when the value is 0, possibly due to protobuf to json conversion where protobuf won't explicitly include a 0 value.) * Update com.google.cloud.storage.StorageException#coalesce to look for ApiException in causes the same way it does for BaseServiceException * Add com.google.cloud.storage.StorageOptions.Builder#setBlobWriteSessionConfig now that both Http and Grpc support these, having it on the base class is convenient * Add new integration test ITBlobWriteSessionCommonSemanticsTest which forces certain failure modes and ensures expected handling in accordance with the semantics outlined in BlobWriteSession
1 parent 5007e8f commit d47afcf

13 files changed

+377
-120
lines changed

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -71,4 +71,11 @@
7171
<className>com/google/cloud/storage/transfermanager/TransferManagerConfig$Builder</className>
7272
<method>* setAllowDivideAndConquer(boolean)</method>
7373
</difference>
74+
75+
<difference>
76+
<differenceType>7013</differenceType>
77+
<className>com/google/cloud/storage/StorageOptions$Builder</className>
78+
<method>com.google.cloud.storage.StorageOptions$Builder setBlobWriteSessionConfig(com.google.cloud.storage.BlobWriteSessionConfig)</method>
79+
</difference>
80+
7481
</differences>

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSession.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.api.core.BetaApi;
2121
import java.io.IOException;
2222
import java.nio.channels.WritableByteChannel;
23+
import java.util.concurrent.TimeUnit;
2324

2425
/**
2526
* A session to write an object to Google Cloud Storage.
@@ -50,6 +51,10 @@ public interface BlobWriteSession {
5051
* <p>Upon calling {@link WritableByteChannel#close()} the object creation will be finalized, and
5152
* {@link #getResult()}s future should resolve.
5253
*
54+
* <p>The returned {@code WritableByteChannel} can throw IOExceptions from any of its usual
55+
* methods. Any {@link IOException} thrown can have a cause of a {@link StorageException}.
56+
* However, not all {@code IOExceptions} will have {@code StorageException}s.
57+
*
5358
* @throws IOException When creating the {@link WritableByteChannel} if an unrecoverable
5459
* underlying IOException occurs it can be rethrown
5560
* @throws IllegalStateException if open is called more than once
@@ -66,6 +71,10 @@ public interface BlobWriteSession {
6671
* Google Cloud Storage 2. A terminal failure occurs, the terminal failure will become the
6772
* exception result
6873
*
74+
* <p>If a terminal failure is encountered, calling either {@link ApiFuture#get()} or {@link
75+
* ApiFuture#get(long, TimeUnit)} will result in an {@link
76+
* java.util.concurrent.ExecutionException} with a cause that is the {@link StorageException}.
77+
*
6978
* @since 2.26.0 This new api is in preview and is subject to breaking changes.
7079
*/
7180
@BetaApi

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteSessions.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.ApiFuture;
20+
import com.google.common.base.Preconditions;
2021
import java.io.IOException;
2122
import java.nio.channels.WritableByteChannel;
2223

@@ -30,14 +31,20 @@ static BlobWriteSession of(WritableByteChannelSession<?, BlobInfo> s) {
3031

3132
static final class WritableByteChannelSessionAdapter implements BlobWriteSession {
3233
private final WritableByteChannelSession<?, BlobInfo> delegate;
34+
private boolean open;
3335

3436
private WritableByteChannelSessionAdapter(WritableByteChannelSession<?, BlobInfo> delegate) {
3537
this.delegate = delegate;
38+
open = false;
3639
}
3740

3841
@Override
3942
public WritableByteChannel open() throws IOException {
40-
return delegate.open();
43+
synchronized (this) {
44+
Preconditions.checkState(!open, "already open");
45+
open = true;
46+
return delegate.open();
47+
}
4148
}
4249

4350
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java

Lines changed: 64 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -32,6 +32,8 @@
3232
import com.google.common.util.concurrent.MoreExecutors;
3333
import com.google.storage.v2.WriteObjectRequest;
3434
import com.google.storage.v2.WriteObjectResponse;
35+
import java.io.IOException;
36+
import java.nio.ByteBuffer;
3537
import java.nio.channels.WritableByteChannel;
3638
import java.time.Clock;
3739
import java.util.Map;
@@ -202,24 +204,25 @@ static final class DecoratedWritableByteChannelSession<WBC extends WritableByteC
202204
this.decoder = decoder;
203205
}
204206

205-
@Override
206-
public WBC open() {
207-
try {
208-
return WritableByteChannelSession.super.open();
209-
} catch (Exception e) {
210-
throw StorageException.coalesce(e);
211-
}
212-
}
213-
214207
@Override
215208
public ApiFuture<WBC> openAsync() {
216-
return delegate.openAsync();
209+
return ApiFutures.catchingAsync(
210+
delegate.openAsync(),
211+
Throwable.class,
212+
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
213+
MoreExecutors.directExecutor());
217214
}
218215

219216
@Override
220217
public ApiFuture<BlobInfo> getResult() {
221-
return ApiFutures.transform(
222-
delegate.getResult(), decoder::decode, MoreExecutors.directExecutor());
218+
ApiFuture<BlobInfo> decodeResult =
219+
ApiFutures.transform(
220+
delegate.getResult(), decoder::decode, MoreExecutors.directExecutor());
221+
return ApiFutures.catchingAsync(
222+
decodeResult,
223+
Throwable.class,
224+
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
225+
MoreExecutors.directExecutor());
223226
}
224227
}
225228

@@ -233,7 +236,55 @@ static final class LazySession<R>
233236

234237
@Override
235238
public ApiFuture<BufferedWritableByteChannel> openAsync() {
236-
return lazy.getSession().openAsync();
239+
// make sure the errors coming out of the BufferedWritableByteChannel are either IOException
240+
// or StorageException
241+
return ApiFutures.transform(
242+
lazy.getSession().openAsync(),
243+
delegate ->
244+
new BufferedWritableByteChannel() {
245+
@Override
246+
public int write(ByteBuffer src) throws IOException {
247+
try {
248+
return delegate.write(src);
249+
} catch (IOException e) {
250+
throw e;
251+
} catch (Exception e) {
252+
throw StorageException.coalesce(e);
253+
}
254+
}
255+
256+
@Override
257+
public void flush() throws IOException {
258+
try {
259+
delegate.flush();
260+
} catch (IOException e) {
261+
throw e;
262+
} catch (Exception e) {
263+
throw StorageException.coalesce(e);
264+
}
265+
}
266+
267+
@Override
268+
public boolean isOpen() {
269+
try {
270+
return delegate.isOpen();
271+
} catch (Exception e) {
272+
throw StorageException.coalesce(e);
273+
}
274+
}
275+
276+
@Override
277+
public void close() throws IOException {
278+
try {
279+
delegate.close();
280+
} catch (IOException e) {
281+
throw e;
282+
} catch (Exception e) {
283+
throw StorageException.coalesce(e);
284+
}
285+
}
286+
},
287+
MoreExecutors.directExecutor());
237288
}
238289

239290
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUnbufferedWritableByteChannel.java

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -92,7 +92,12 @@ public void close() throws IOException {
9292
throw e;
9393
}
9494
} else {
95-
flusher.close(null);
95+
try {
96+
flusher.close(null);
97+
} catch (RuntimeException e) {
98+
resultFuture.setException(e);
99+
throw e;
100+
}
96101
}
97102
open = false;
98103
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicUploadSessionBuilder.java

Lines changed: 11 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -49,7 +49,7 @@ GapicBidiWritableByteChannelSessionBuilder bidiByteChannel(
4949
}
5050

5151
ApiFuture<ResumableWrite> resumableWrite(
52-
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> x,
52+
UnaryCallable<StartResumableWriteRequest, StartResumableWriteResponse> callable,
5353
WriteObjectRequest writeObjectRequest) {
5454
StartResumableWriteRequest.Builder b = StartResumableWriteRequest.newBuilder();
5555
if (writeObjectRequest.hasWriteObjectSpec()) {
@@ -65,9 +65,16 @@ ApiFuture<ResumableWrite> resumableWrite(
6565
Function<String, WriteObjectRequest> f =
6666
uploadId ->
6767
writeObjectRequest.toBuilder().clearWriteObjectSpec().setUploadId(uploadId).build();
68-
return ApiFutures.transform(
69-
x.futureCall(req),
70-
(resp) -> new ResumableWrite(req, resp, f),
68+
ApiFuture<ResumableWrite> futureResumableWrite =
69+
ApiFutures.transform(
70+
callable.futureCall(req),
71+
(resp) -> new ResumableWrite(req, resp, f),
72+
MoreExecutors.directExecutor());
73+
// make sure we wrap any failure as a storage exception
74+
return ApiFutures.catchingAsync(
75+
futureResumableWrite,
76+
Throwable.class,
77+
throwable -> ApiFutures.immediateFailedFuture(StorageException.coalesce(throwable)),
7178
MoreExecutors.directExecutor());
7279
}
7380

google-cloud-storage/src/main/java/com/google/cloud/storage/JsonResumableSessionPutTask.java

Lines changed: 7 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -120,7 +120,7 @@ public void rewindTo(long offset) {
120120
}
121121
} else if (finalizing && JsonResumableSessionFailureScenario.isOk(code)) {
122122
@Nullable StorageObject storageObject;
123-
@Nullable BigInteger actualSize;
123+
BigInteger actualSize = BigInteger.ZERO;
124124

125125
Long contentLength = response.getHeaders().getContentLength();
126126
String contentType = response.getHeaders().getContentType();
@@ -130,7 +130,12 @@ public void rewindTo(long offset) {
130130
boolean isJson = contentType != null && contentType.startsWith("application/json");
131131
if (isJson) {
132132
storageObject = response.parseAs(StorageObject.class);
133-
actualSize = storageObject != null ? storageObject.getSize() : null;
133+
if (storageObject != null) {
134+
BigInteger size = storageObject.getSize();
135+
if (size != null) {
136+
actualSize = size;
137+
}
138+
}
134139
} else if ((contentLength == null || contentLength == 0) && storedContentLength != null) {
135140
// when a signed url is used, the finalize response is empty
136141
response.ignore();
@@ -150,7 +155,6 @@ public void rewindTo(long offset) {
150155
int compare = expectedSize.compareTo(actualSize);
151156
if (compare == 0) {
152157
success = true;
153-
//noinspection DataFlowIssue compareTo result will filter out actualSize == null
154158
return ResumableOperationResult.complete(storageObject, actualSize.longValue());
155159
} else if (compare > 0) {
156160
StorageException se =

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageException.java

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -118,6 +118,9 @@ static BaseServiceException coalesce(Throwable t) {
118118
if (t instanceof ApiException) {
119119
return asStorageException((ApiException) t);
120120
}
121+
if (t.getCause() instanceof ApiException) {
122+
return asStorageException((ApiException) t.getCause());
123+
}
121124
return getStorageException(t);
122125
}
123126

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageOptions.java

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,14 +22,17 @@
2222
import com.google.cloud.ServiceDefaults;
2323
import com.google.cloud.ServiceOptions;
2424
import com.google.cloud.http.HttpTransportOptions;
25+
import com.google.cloud.storage.GrpcStorageOptions.GrpcStorageDefaults;
2526
import com.google.cloud.storage.HttpStorageOptions.HttpStorageDefaults;
2627
import com.google.cloud.storage.HttpStorageOptions.HttpStorageFactory;
2728
import com.google.cloud.storage.HttpStorageOptions.HttpStorageRpcFactory;
29+
import com.google.cloud.storage.Storage.BlobWriteOption;
2830
import com.google.cloud.storage.TransportCompatibility.Transport;
2931
import com.google.cloud.storage.spi.StorageRpcFactory;
3032
import java.io.IOException;
3133
import java.io.InputStream;
3234
import java.util.Properties;
35+
import org.checkerframework.checker.nullness.qual.NonNull;
3336

3437
public abstract class StorageOptions extends ServiceOptions<Storage, StorageOptions> {
3538

@@ -95,6 +98,18 @@ public abstract static class Builder
9598

9699
public abstract Builder setStorageRetryStrategy(StorageRetryStrategy storageRetryStrategy);
97100

101+
/**
102+
* @see BlobWriteSessionConfig
103+
* @see BlobWriteSessionConfigs
104+
* @see Storage#blobWriteSession(BlobInfo, BlobWriteOption...)
105+
* @see HttpStorageDefaults#getDefaultStorageWriterConfig()
106+
* @see GrpcStorageDefaults#getDefaultStorageWriterConfig()
107+
* @since 2.37.0 This new api is in preview and is subject to breaking changes.
108+
*/
109+
@BetaApi
110+
public abstract StorageOptions.Builder setBlobWriteSessionConfig(
111+
@NonNull BlobWriteSessionConfig blobWriteSessionConfig);
112+
98113
@Override
99114
public abstract StorageOptions build();
100115
}

0 commit comments

Comments
 (0)