Skip to content

Commit 2431ee1

Browse files
committed
chore: refactor usages of Retrying to use new Retrier
* chore: refactor retrying to be instance based rather than static utility method This is necessary to allow decoration on the retrying for things like open telemetry integration. * chore: replace simple unary invocations of Retrying.run with retrier.run * chore: migrate copy methods to use retrier * chore: migrate "download" code paths to use retrier * chore: migrate "upload" code paths to use retrier * chore: remove RetryHelper.run wrappers * chore: fix serialization test NPE
1 parent aeb621a commit 2431ee1

44 files changed

Lines changed: 641 additions & 582 deletions

File tree

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedReadableByteChannel.java

Lines changed: 6 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -29,6 +29,8 @@
2929
import com.google.api.services.storage.Storage.Objects;
3030
import com.google.api.services.storage.Storage.Objects.Get;
3131
import com.google.api.services.storage.model.StorageObject;
32+
import com.google.cloud.storage.Conversions.Decoder;
33+
import com.google.cloud.storage.Retrying.Retrier;
3234
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
3335
import com.google.cloud.storage.spi.v1.StorageRpc;
3436
import com.google.common.annotations.VisibleForTesting;
@@ -55,7 +57,6 @@
5557
import java.util.Locale;
5658
import java.util.Map;
5759
import java.util.Map.Entry;
58-
import java.util.function.Function;
5960
import javax.annotation.concurrent.Immutable;
6061
import org.checkerframework.checker.nullness.qual.NonNull;
6162
import org.checkerframework.checker.nullness.qual.Nullable;
@@ -65,8 +66,8 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
6566
private final ApiaryReadRequest apiaryReadRequest;
6667
private final Storage storage;
6768
private final SettableApiFuture<StorageObject> result;
68-
private final HttpStorageOptions options;
6969
private final ResultRetryAlgorithm<?> resultRetryAlgorithm;
70+
private final Retrier retrier;
7071

7172
private long position;
7273
private ScatteringByteChannel sbc;
@@ -80,12 +81,12 @@ class ApiaryUnbufferedReadableByteChannel implements UnbufferedReadableByteChann
8081
ApiaryReadRequest apiaryReadRequest,
8182
Storage storage,
8283
SettableApiFuture<StorageObject> result,
83-
HttpStorageOptions options,
84+
Retrier retrier,
8485
ResultRetryAlgorithm<?> resultRetryAlgorithm) {
8586
this.apiaryReadRequest = apiaryReadRequest;
8687
this.storage = storage;
8788
this.result = result;
88-
this.options = options;
89+
this.retrier = retrier;
8990
this.resultRetryAlgorithm =
9091
new BasicResultRetryAlgorithm<Object>() {
9192
@Override
@@ -113,7 +114,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
113114
long totalRead = 0;
114115
do {
115116
if (sbc == null) {
116-
sbc = Retrying.run(options, resultRetryAlgorithm, this::open, Function.identity());
117+
sbc = retrier.run(resultRetryAlgorithm, this::open, Decoder.identity());
117118
}
118119

119120
long totalRemaining = Buffers.totalRemaining(dsts, offset, length);

google-cloud-storage/src/main/java/com/google/cloud/storage/ApiaryUnbufferedWritableByteChannel.java

Lines changed: 3 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,9 +17,8 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.SettableApiFuture;
20-
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2120
import com.google.api.services.storage.model.StorageObject;
22-
import com.google.cloud.storage.Retrying.RetryingDependencies;
21+
import com.google.cloud.storage.Retrying.RetrierWithAlg;
2322
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
2423
import java.io.IOException;
2524
import java.nio.ByteBuffer;
@@ -42,12 +41,11 @@ final class ApiaryUnbufferedWritableByteChannel implements UnbufferedWritableByt
4241

4342
ApiaryUnbufferedWritableByteChannel(
4443
HttpClientContext httpClientContext,
45-
RetryingDependencies deps,
46-
ResultRetryAlgorithm<?> alg,
44+
RetrierWithAlg retrier,
4745
JsonResumableWrite resumableWrite,
4846
SettableApiFuture<StorageObject> result,
4947
LongConsumer committedBytesCallback) {
50-
this.session = ResumableSession.json(httpClientContext, deps, alg, resumableWrite);
48+
this.session = ResumableSession.json(httpClientContext, retrier, resumableWrite);
5149
this.result = result;
5250
this.committedBytesCallback = committedBytesCallback;
5351
this.open = true;

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,7 +122,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
122122
.setByteStringStrategy(ByteStringStrategy.copy())
123123
.resumable()
124124
.withRetryConfig(
125-
grpc.getOptions(), grpc.retryAlgorithmManager.idempotent())
125+
grpc.retrier.withAlg(grpc.retryAlgorithmManager.idempotent()))
126126
.buffered(BufferHandle.allocate(bufferSize))
127127
.setStartAsync(startResumableWrite)
128128
.build();

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadChannelV2.java

Lines changed: 34 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.cloud.RestorableState;
2323
import com.google.cloud.storage.ApiaryUnbufferedReadableByteChannel.ApiaryReadRequest;
2424
import com.google.cloud.storage.HttpDownloadSessionBuilder.ReadableByteChannelSessionBuilder;
25+
import com.google.cloud.storage.Retrying.Retrier;
2526
import com.google.cloud.storage.spi.v1.StorageRpc;
2627
import com.google.common.base.MoreObjects;
2728
import java.io.Serializable;
@@ -150,16 +151,19 @@ static final class BlobReadChannelContext {
150151
private final HttpRetryAlgorithmManager retryAlgorithmManager;
151152
private final HttpClientContext httpClientContext;
152153
private final Storage apiaryClient;
154+
private final Retrier retrier;
153155

154156
private BlobReadChannelContext(
155157
HttpStorageOptions storageOptions,
156158
HttpRetryAlgorithmManager retryAlgorithmManager,
157159
HttpClientContext httpClientContext,
158-
Storage apiaryClient) {
160+
Storage apiaryClient,
161+
Retrier retrier) {
159162
this.storageOptions = storageOptions;
160163
this.retryAlgorithmManager = retryAlgorithmManager;
161164
this.httpClientContext = httpClientContext;
162165
this.apiaryClient = apiaryClient;
166+
this.retrier = retrier;
163167
}
164168

165169
public HttpStorageOptions getStorageOptions() {
@@ -178,21 +182,40 @@ public Storage getApiaryClient() {
178182
return apiaryClient;
179183
}
180184

185+
public Retrier getRetrier() {
186+
return retrier;
187+
}
188+
189+
/**
190+
* This method is pretty unsafe, but so is all of the Capture/Restore API, and it leaks its
191+
* sludge all over everything. In general, prefer {@link
192+
* #from(com.google.cloud.storage.StorageImpl)} over this method.
193+
*
194+
* <p>Essentially, cause options to instantiate a StorageImpl if it hasn't done so already, then
195+
* root around to try and find its retrier.
196+
*/
181197
static BlobReadChannelContext from(HttpStorageOptions options) {
198+
com.google.cloud.storage.Storage storage = options.getService();
199+
if (storage instanceof OtelStorageDecorator) {
200+
OtelStorageDecorator decorator = (OtelStorageDecorator) storage;
201+
storage = decorator.delegate;
202+
}
203+
if (storage instanceof StorageImpl) {
204+
StorageImpl impl = (StorageImpl) storage;
205+
return from(impl);
206+
}
207+
throw new IllegalArgumentException(
208+
"Unable to restore context from provided options instance");
209+
}
210+
211+
static BlobReadChannelContext from(com.google.cloud.storage.StorageImpl s) {
212+
HttpStorageOptions options = s.getOptions();
182213
return new BlobReadChannelContext(
183214
options,
184215
options.getRetryAlgorithmManager(),
185216
HttpClientContext.from(options.getStorageRpcV1()),
186-
options.getStorageRpcV1().getStorage());
187-
}
188-
189-
static BlobReadChannelContext from(com.google.cloud.storage.Storage s) {
190-
StorageOptions options = s.getOptions();
191-
if (options instanceof HttpStorageOptions) {
192-
HttpStorageOptions httpStorageOptions = (HttpStorageOptions) options;
193-
return from(httpStorageOptions);
194-
}
195-
throw new IllegalArgumentException("Only HttpStorageOptions based instance supported");
217+
options.getStorageRpcV1().getStorage(),
218+
s.retrier);
196219
}
197220

198221
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobWriteChannelV2.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -76,8 +76,9 @@ protected LazyWriteChannel<StorageObject> newLazyWriteChannel() {
7676
.resumable()
7777
.setCommittedBytesCallback(this::setCommittedPosition)
7878
.withRetryConfig(
79-
blobChannelContext.getStorageOptions().asRetryDependencies(),
80-
blobChannelContext.getRetryAlgorithmManager().idempotent())
79+
blobChannelContext
80+
.getRetrier()
81+
.withAlg(blobChannelContext.getRetryAlgorithmManager().idempotent()))
8182
.buffered(getBufferHandle())
8283
.setStartAsync(ApiFutures.immediateFuture(start))
8384
.build());

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -167,7 +167,7 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
167167
.setByteStringStrategy(ByteStringStrategy.copy())
168168
.resumable()
169169
.withRetryConfig(
170-
grpc.getOptions(), grpc.retryAlgorithmManager.idempotent())
170+
grpc.retrier.withAlg(grpc.retryAlgorithmManager.idempotent()))
171171
.buffered(BufferHandle.allocate(chunkSize))
172172
.setStartAsync(startResumableWrite)
173173
.build();
@@ -190,8 +190,9 @@ public WritableByteChannelSession<?, BlobInfo> writeSession(
190190
json.getOptions(),
191191
updated,
192192
optionsMap,
193-
json.retryAlgorithmManager.getForResumableUploadSessionCreate(
194-
optionsMap));
193+
json.retrier.withAlg(
194+
json.retryAlgorithmManager.getForResumableUploadSessionCreate(
195+
optionsMap)));
195196
ApiFuture<JsonResumableWrite> startAsync =
196197
ApiFutures.immediateFuture(
197198
JsonResumableWrite.of(

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedAppendableWriteableByteChannel.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -18,7 +18,6 @@
1818

1919
import com.google.api.core.SettableApiFuture;
2020
import com.google.api.gax.grpc.GrpcCallContext;
21-
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2221
import com.google.api.gax.rpc.ApiException;
2322
import com.google.api.gax.rpc.ApiStreamObserver;
2423
import com.google.api.gax.rpc.BidiStreamingCallable;
@@ -27,7 +26,7 @@
2726
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
2827
import com.google.cloud.storage.Conversions.Decoder;
2928
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
30-
import com.google.cloud.storage.Retrying.RetryingDependencies;
29+
import com.google.cloud.storage.Retrying.RetrierWithAlg;
3130
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
3231
import com.google.common.annotations.VisibleForTesting;
3332
import com.google.common.collect.ImmutableList;
@@ -51,8 +50,7 @@
5150
final class GapicBidiUnbufferedAppendableWritableByteChannel
5251
implements UnbufferedWritableByteChannel {
5352
private final BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write;
54-
private final RetryingDependencies deps;
55-
private final ResultRetryAlgorithm<?> alg;
53+
private final RetrierWithAlg retrier;
5654
private final SettableApiFuture<BidiWriteObjectResponse> resultFuture;
5755
private final ChunkSegmenter chunkSegmenter;
5856

@@ -68,15 +66,13 @@ final class GapicBidiUnbufferedAppendableWritableByteChannel
6866

6967
GapicBidiUnbufferedAppendableWritableByteChannel(
7068
BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write,
71-
RetryingDependencies deps,
72-
ResultRetryAlgorithm<?> alg,
69+
RetrierWithAlg retrier,
7370
SettableApiFuture<BidiWriteObjectResponse> resultFuture,
7471
ChunkSegmenter chunkSegmenter,
7572
BidiWriteCtx<BidiAppendableWrite> writeCtx,
7673
Supplier<GrpcCallContext> baseContextSupplier) {
7774
this.write = write;
78-
this.deps = deps;
79-
this.alg = alg;
75+
this.retrier = retrier;
8076
this.resultFuture = resultFuture;
8177
this.chunkSegmenter = chunkSegmenter;
8278

@@ -238,9 +234,7 @@ private ApiStreamObserver<BidiWriteObjectRequest> openedStream() {
238234
}
239235

240236
private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
241-
Retrying.run(
242-
deps,
243-
alg,
237+
retrier.run(
244238
() -> {
245239
try {
246240
ApiStreamObserver<BidiWriteObjectRequest> opened = openedStream();

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java

Lines changed: 5 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.api.core.SettableApiFuture;
2222
import com.google.api.gax.grpc.GrpcCallContext;
23-
import com.google.api.gax.retrying.ResultRetryAlgorithm;
2423
import com.google.api.gax.rpc.ApiException;
2524
import com.google.api.gax.rpc.ApiStreamObserver;
2625
import com.google.api.gax.rpc.BidiStreamingCallable;
@@ -29,7 +28,7 @@
2928
import com.google.cloud.storage.ChunkSegmenter.ChunkSegment;
3029
import com.google.cloud.storage.Conversions.Decoder;
3130
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
32-
import com.google.cloud.storage.Retrying.RetryingDependencies;
31+
import com.google.cloud.storage.Retrying.RetrierWithAlg;
3332
import com.google.cloud.storage.UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel;
3433
import com.google.common.annotations.VisibleForTesting;
3534
import com.google.common.collect.ImmutableList;
@@ -51,8 +50,7 @@
5150

5251
final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritableByteChannel {
5352
private final BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write;
54-
private final RetryingDependencies deps;
55-
private final ResultRetryAlgorithm<?> alg;
53+
private final RetrierWithAlg retrier;
5654
private final SettableApiFuture<BidiWriteObjectResponse> resultFuture;
5755
private final ChunkSegmenter chunkSegmenter;
5856

@@ -69,15 +67,13 @@ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritable
6967

7068
GapicBidiUnbufferedWritableByteChannel(
7169
BidiStreamingCallable<BidiWriteObjectRequest, BidiWriteObjectResponse> write,
72-
RetryingDependencies deps,
73-
ResultRetryAlgorithm<?> alg,
70+
RetrierWithAlg retrier,
7471
SettableApiFuture<BidiWriteObjectResponse> resultFuture,
7572
ChunkSegmenter chunkSegmenter,
7673
BidiWriteCtx<BidiResumableWrite> writeCtx,
7774
Supplier<GrpcCallContext> baseContextSupplier) {
7875
this.write = write;
79-
this.deps = deps;
80-
this.alg = alg;
76+
this.retrier = retrier;
8177
this.resultFuture = resultFuture;
8278
this.chunkSegmenter = chunkSegmenter;
8379

@@ -241,9 +237,7 @@ private ApiStreamObserver<BidiWriteObjectRequest> openedStream() {
241237
}
242238

243239
private void flush(@NonNull List<BidiWriteObjectRequest> segments) {
244-
Retrying.run(
245-
deps,
246-
alg,
240+
retrier.run(
247241
() -> {
248242
try {
249243
ApiStreamObserver<BidiWriteObjectRequest> opened = openedStream();

0 commit comments

Comments
 (0)