Skip to content

Commit b8dda33

Browse files
committed
chore: start encapsulating stream and open signaling for BlobDescriptorImpl
Currently, a BlobDescriptorImpl it given its stream. This change shifts things so that BlobDescriptorImpl can create its stream from the requisite inputs. This is needed to allow retries/handling redirects when opening a stream. This is a first pass in what will be at least a few changes to reach a point where a stream can retry. Add a test to induce a 404 when attempting to open a BlobDescriptor.
1 parent 264683e commit b8dda33

File tree

2 files changed

+144
-34
lines changed

2 files changed

+144
-34
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 128 additions & 32 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.api.gax.grpc.GrpcCallContext;
2323
import com.google.api.gax.rpc.BidiStreamingCallable;
2424
import com.google.api.gax.rpc.ClientStream;
25+
import com.google.api.gax.rpc.ResponseObserver;
2526
import com.google.api.gax.rpc.StateCheckingResponseObserver;
2627
import com.google.api.gax.rpc.StreamController;
2728
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
@@ -38,17 +39,20 @@
3839
import java.util.List;
3940
import java.util.Map;
4041
import java.util.concurrent.ConcurrentHashMap;
42+
import java.util.concurrent.ExecutionException;
4143
import java.util.concurrent.Executor;
44+
import java.util.concurrent.TimeUnit;
45+
import java.util.concurrent.TimeoutException;
4246
import java.util.concurrent.atomic.AtomicLong;
4347
import java.util.concurrent.atomic.AtomicReference;
4448

4549
final class BlobDescriptorImpl implements BlobDescriptor {
4650

47-
private final BlobDescriptorStreamPair stream;
51+
private final BlobDescriptorStream stream;
4852
private final BlobDescriptorState state;
4953
private final BlobInfo info;
5054

51-
private BlobDescriptorImpl(BlobDescriptorStreamPair stream, BlobDescriptorState state) {
55+
private BlobDescriptorImpl(BlobDescriptorStream stream, BlobDescriptorState state) {
5256
this.stream = stream;
5357
this.state = state;
5458
this.info = Conversions.grpc().blobInfo().decode(state.metadata.get());
@@ -63,7 +67,7 @@ public ApiFuture<byte[]> readRangeAsBytes(ByteRangeSpec range) {
6367
BidiReadObjectRequest request =
6468
BidiReadObjectRequest.newBuilder().addReadRanges(value.makeReadRange()).build();
6569
state.outstandingReads.put(readId, value);
66-
stream.requestStream.send(request);
70+
stream.send(request);
6771
return future;
6872
}
6973

@@ -78,34 +82,136 @@ static ApiFuture<BlobDescriptor> create(
7882
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
7983
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
8084
Executor executor) {
81-
SettableApiFuture<Void> pendingOpen = SettableApiFuture.create();
8285
BlobDescriptorState state = new BlobDescriptorState(openRequest);
86+
8387
BlobDescriptorResponseObserver responseObserver =
84-
new BlobDescriptorResponseObserver(
85-
pendingOpen, state, executor, bidiResponseContentLifecycleManager);
86-
ClientStream<BidiReadObjectRequest> requestStream =
87-
callable.splitCall(responseObserver, context);
88-
BlobDescriptorStreamPair stream = new BlobDescriptorStreamPair(requestStream, responseObserver);
88+
new BlobDescriptorResponseObserver(state, executor, bidiResponseContentLifecycleManager);
89+
90+
BlobDescriptorStream stream = new BlobDescriptorStream(callable, context, responseObserver);
91+
8992
ApiFuture<BlobDescriptor> blobDescriptorFuture =
90-
ApiFutures.transform(
91-
pendingOpen, nowOpen -> new BlobDescriptorImpl(stream, state), executor);
92-
stream.getRequestStream().send(openRequest);
93+
ApiFutures.transform(stream, nowOpen -> new BlobDescriptorImpl(stream, state), executor);
94+
stream.send(openRequest);
9395
return StorageException.coalesceAsync(blobDescriptorFuture);
9496
}
9597

96-
private static final class BlobDescriptorStreamPair {
97-
private final ClientStream<BidiReadObjectRequest> requestStream;
98-
private final BlobDescriptorResponseObserver responseObserver;
98+
private static final class BlobDescriptorStream
99+
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void> {
100+
private final SettableApiFuture<Void> openSignal;
99101

100-
BlobDescriptorStreamPair(
101-
ClientStream<BidiReadObjectRequest> requestStream,
102+
private final BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable;
103+
private final GrpcCallContext context;
104+
private final ResponseObserver<BidiReadObjectResponse> responseObserver;
105+
private final OpenMonitorResponseObserver openMonitorResponseObserver;
106+
107+
private volatile ClientStream<BidiReadObjectRequest> requestStream;
108+
109+
public BlobDescriptorStream(
110+
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
111+
GrpcCallContext context,
102112
BlobDescriptorResponseObserver responseObserver) {
103-
this.requestStream = requestStream;
113+
this.callable = callable;
114+
this.context = context;
104115
this.responseObserver = responseObserver;
116+
this.openMonitorResponseObserver = new OpenMonitorResponseObserver(responseObserver);
117+
this.openSignal = SettableApiFuture.create();
105118
}
106119

107120
public ClientStream<BidiReadObjectRequest> getRequestStream() {
108-
return requestStream;
121+
if (requestStream != null) {
122+
return requestStream;
123+
} else {
124+
synchronized (this) {
125+
if (requestStream == null) {
126+
requestStream = callable.splitCall(openMonitorResponseObserver, context);
127+
}
128+
return requestStream;
129+
}
130+
}
131+
}
132+
133+
@Override
134+
public void send(BidiReadObjectRequest request) {
135+
getRequestStream().send(request);
136+
}
137+
138+
@Override
139+
public void closeSendWithError(Throwable t) {
140+
getRequestStream().closeSendWithError(t);
141+
}
142+
143+
@Override
144+
public void closeSend() {
145+
getRequestStream().closeSend();
146+
}
147+
148+
@Override
149+
public boolean isSendReady() {
150+
return getRequestStream().isSendReady();
151+
}
152+
153+
@Override
154+
public void addListener(Runnable listener, Executor executor) {
155+
openSignal.addListener(listener, executor);
156+
}
157+
158+
@Override
159+
public boolean cancel(boolean mayInterruptIfRunning) {
160+
return openSignal.cancel(mayInterruptIfRunning);
161+
}
162+
163+
@Override
164+
public Void get() throws InterruptedException, ExecutionException {
165+
return openSignal.get();
166+
}
167+
168+
@Override
169+
public Void get(long timeout, TimeUnit unit)
170+
throws InterruptedException, ExecutionException, TimeoutException {
171+
return openSignal.get(timeout, unit);
172+
}
173+
174+
@Override
175+
public boolean isCancelled() {
176+
return openSignal.isCancelled();
177+
}
178+
179+
@Override
180+
public boolean isDone() {
181+
return openSignal.isDone();
182+
}
183+
184+
private class OpenMonitorResponseObserver
185+
extends StateCheckingResponseObserver<BidiReadObjectResponse> {
186+
187+
private final BlobDescriptorResponseObserver responseObserver;
188+
189+
private OpenMonitorResponseObserver(BlobDescriptorResponseObserver responseObserver) {
190+
this.responseObserver = responseObserver;
191+
}
192+
193+
@Override
194+
protected void onStartImpl(StreamController controller) {
195+
responseObserver.onStartImpl(controller);
196+
}
197+
198+
@Override
199+
protected void onResponseImpl(BidiReadObjectResponse response) {
200+
responseObserver.onResponseImpl(response);
201+
openSignal.set(null);
202+
}
203+
204+
@Override
205+
protected void onErrorImpl(Throwable t) {
206+
responseObserver.onErrorImpl(t);
207+
openSignal.setException(t);
208+
}
209+
210+
@Override
211+
protected void onCompleteImpl() {
212+
responseObserver.onCompleteImpl();
213+
openSignal.set(null);
214+
}
109215
}
110216
}
111217

@@ -118,15 +224,11 @@ private static final class BlobDescriptorResponseObserver
118224
private final ResponseContentLifecycleManager<BidiReadObjectResponse>
119225
bidiResponseContentLifecycleManager;
120226

121-
private final SettableApiFuture<Void> openSignal;
122-
123-
public BlobDescriptorResponseObserver(
124-
SettableApiFuture<Void> openSignal,
227+
private BlobDescriptorResponseObserver(
125228
BlobDescriptorState state,
126229
Executor exec,
127230
ResponseContentLifecycleManager<BidiReadObjectResponse>
128231
bidiResponseContentLifecycleManager) {
129-
this.openSignal = openSignal;
130232
this.state = state;
131233
this.exec = exec;
132234
this.bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager;
@@ -146,11 +248,9 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
146248
bidiResponseContentLifecycleManager.get(response)) {
147249
if (response.hasMetadata()) {
148250
state.metadata.set(response.getMetadata());
149-
openSignal.set(null);
150251
}
151252
if (response.hasReadHandle()) {
152253
state.ref.set(response.getReadHandle());
153-
openSignal.set(null);
154254
}
155255
List<ObjectRangeData> rangeData = response.getObjectDataRangesList();
156256
if (rangeData.isEmpty()) {
@@ -180,14 +280,10 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
180280
}
181281

182282
@Override
183-
protected void onErrorImpl(Throwable t) {
184-
openSignal.setException(t);
185-
}
283+
protected void onErrorImpl(Throwable t) {}
186284

187285
@Override
188-
protected void onCompleteImpl() {
189-
openSignal.set(null);
190-
}
286+
protected void onCompleteImpl() {}
191287
}
192288

193289
@VisibleForTesting

google-cloud-storage/src/test/java/com/google/cloud/storage/ITBlobDescriptorTest.java

Lines changed: 16 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -19,6 +19,7 @@
1919
import static com.google.cloud.storage.ByteSizeConstants._2MiB;
2020
import static com.google.cloud.storage.TestUtils.assertAll;
2121
import static com.google.common.truth.Truth.assertThat;
22+
import static org.junit.Assert.assertThrows;
2223

2324
import com.google.api.core.ApiFuture;
2425
import com.google.api.core.ApiFutures;
@@ -72,8 +73,7 @@ public void lotsOfBytes() throws Exception {
7273
BlobId blobId = BlobId.of("ping", "someobject");
7374
for (int j = 0; j < 2; j++) {
7475

75-
BlobDescriptor blobDescriptor =
76-
storage.getBlobDescriptor(blobId).get(30, TimeUnit.SECONDS);
76+
BlobDescriptor blobDescriptor = storage.getBlobDescriptor(blobId).get(30, TimeUnit.SECONDS);
7777

7878
Stopwatch sw = Stopwatch.createStarted();
7979
int numRangesToRead = 256;
@@ -112,4 +112,18 @@ public void lotsOfBytes() throws Exception {
112112
() -> assertThat(finalLength).isEqualTo(numRangesToRead * _2MiB));
113113
}
114114
}
115+
116+
@Test
117+
public void readFromBucketThatDoesNotExistShouldRaiseStorageExceptionWith404() {
118+
BlobId blobId = BlobId.of("gcs-grpc-team-bucket-that-does-not-exist", "someobject");
119+
120+
ApiFuture<BlobDescriptor> futureBlobDescriptor = storage.getBlobDescriptor(blobId);
121+
122+
ExecutionException ee =
123+
assertThrows(ExecutionException.class, () -> futureBlobDescriptor.get(5, TimeUnit.SECONDS));
124+
125+
assertThat(ee).hasCauseThat().isInstanceOf(StorageException.class);
126+
StorageException cause = (StorageException) ee.getCause();
127+
assertThat(cause.getCode()).isEqualTo(404);
128+
}
115129
}

0 commit comments

Comments
 (0)