Skip to content

Commit 6a80994

Browse files
committed
chore: add tests to validate an individual read fail due to stream closure cleans up properly
1 parent 69d01d4 commit 6a80994

File tree

2 files changed

+97
-0
lines changed

2 files changed

+97
-0
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -340,6 +340,7 @@ public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
340340
break;
341341
} else if (poll instanceof SmuggledFailure) {
342342
SmuggledFailure throwable = (SmuggledFailure) poll;
343+
close();
343344
BaseServiceException coalesce = StorageException.coalesce(throwable.getSmuggled());
344345
throw new IOException(coalesce);
345346
} else {

google-cloud-storage/src/test/java/com/google/cloud/storage/BlobDescriptorStreamTest.java

Lines changed: 96 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -32,24 +32,30 @@
3232
import com.google.api.gax.rpc.ClientStreamReadyObserver;
3333
import com.google.api.gax.rpc.ResponseObserver;
3434
import com.google.cloud.storage.Backoff.Jitterer;
35+
import com.google.cloud.storage.BlobDescriptorStreamRead.AccumulatingRead;
36+
import com.google.cloud.storage.BlobDescriptorStreamRead.StreamingRead;
3537
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
3638
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
3739
import com.google.cloud.storage.RetryContext.OnFailure;
3840
import com.google.cloud.storage.RetryContext.OnSuccess;
3941
import com.google.cloud.storage.RetryContext.RetryContextProvider;
4042
import com.google.cloud.storage.RetryContextTest.BlockingOnSuccess;
4143
import com.google.cloud.storage.Retrying.RetryingDependencies;
44+
import com.google.protobuf.ByteString;
4245
import com.google.storage.v2.BidiReadObjectRequest;
4346
import com.google.storage.v2.BidiReadObjectResponse;
4447
import com.google.storage.v2.BidiReadObjectSpec;
4548
import com.google.storage.v2.BucketName;
4649
import com.google.storage.v2.Object;
4750
import java.io.IOException;
51+
import java.nio.ByteBuffer;
4852
import java.nio.channels.AsynchronousCloseException;
4953
import java.util.concurrent.Executors;
5054
import java.util.concurrent.ScheduledExecutorService;
5155
import java.util.concurrent.TimeUnit;
56+
import java.util.concurrent.atomic.AtomicBoolean;
5257
import java.util.concurrent.atomic.AtomicLong;
58+
import java.util.function.Function;
5359
import org.junit.AfterClass;
5460
import org.junit.BeforeClass;
5561
import org.junit.Test;
@@ -229,6 +235,96 @@ public void closingShouldFailPendingReads() throws Exception {
229235
});
230236
}
231237

238+
@Test
239+
public void streamingRead_mustCloseQueuedResponsesWhenFailed() throws Exception {
240+
try (StreamingRead read1 =
241+
BlobDescriptorStreamRead.streamingRead(1, RangeSpec.all(), RetryContext.neverRetry())) {
242+
state.putOutstandingRead(1, read1);
243+
BlobDescriptorStream stream =
244+
BlobDescriptorStream.create(exec, callable, state, RetryContext.neverRetry());
245+
246+
ByteString bytes1 = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(9));
247+
ByteString bytes2 = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(9));
248+
249+
AtomicBoolean bytes1Close = new AtomicBoolean(false);
250+
AtomicBoolean bytes2Close = new AtomicBoolean(false);
251+
252+
try (ResponseContentLifecycleHandle<ByteString> handle =
253+
ResponseContentLifecycleHandle.create(
254+
bytes1,
255+
ByteString::asReadOnlyByteBufferList,
256+
() -> bytes1Close.compareAndSet(false, true))) {
257+
read1.accept(handle.borrow(Function.identity()));
258+
}
259+
try (ResponseContentLifecycleHandle<ByteString> handle =
260+
ResponseContentLifecycleHandle.create(
261+
bytes2,
262+
ByteString::asReadOnlyByteBufferList,
263+
() -> bytes2Close.compareAndSet(false, true))) {
264+
read1.accept(handle.borrow(Function.identity()));
265+
}
266+
267+
// read some bytes, causing leftovers to be populated
268+
read1.read(ByteBuffer.allocate(1));
269+
stream.close();
270+
271+
// call read again to observe the async close that happens
272+
IOException ioe = assertThrows(IOException.class, () -> read1.read(ByteBuffer.allocate(32)));
273+
274+
assertAll(
275+
() -> assertThat(bytes1Close.get()).isTrue(),
276+
() -> assertThat(bytes2Close.get()).isTrue(),
277+
() -> assertThat(read1.acceptingBytes()).isFalse(),
278+
() -> assertThat(ioe).hasCauseThat().isInstanceOf(StorageException.class),
279+
() -> assertThat(ioe).hasCauseThat().hasMessageThat().contains("Parent stream shutdown"));
280+
}
281+
}
282+
283+
@Test
284+
public void accumulatingRead_mustCloseQueuedResponsesWhenFailed() throws Exception {
285+
SettableApiFuture<byte[]> complete = SettableApiFuture.create();
286+
try (AccumulatingRead<byte[]> read1 =
287+
BlobDescriptorStreamRead.createByteArrayAccumulatingRead(
288+
1, RangeSpec.all(), RetryContext.neverRetry(), complete)) {
289+
state.putOutstandingRead(1, read1);
290+
BlobDescriptorStream stream =
291+
BlobDescriptorStream.create(exec, callable, state, RetryContext.neverRetry());
292+
293+
ByteString bytes1 = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(9));
294+
ByteString bytes2 = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(9));
295+
296+
AtomicBoolean bytes1Close = new AtomicBoolean(false);
297+
AtomicBoolean bytes2Close = new AtomicBoolean(false);
298+
299+
try (ResponseContentLifecycleHandle<ByteString> handle =
300+
ResponseContentLifecycleHandle.create(
301+
bytes1,
302+
ByteString::asReadOnlyByteBufferList,
303+
() -> bytes1Close.compareAndSet(false, true))) {
304+
read1.accept(handle.borrow(Function.identity()));
305+
}
306+
try (ResponseContentLifecycleHandle<ByteString> handle =
307+
ResponseContentLifecycleHandle.create(
308+
bytes2,
309+
ByteString::asReadOnlyByteBufferList,
310+
() -> bytes2Close.compareAndSet(false, true))) {
311+
read1.accept(handle.borrow(Function.identity()));
312+
}
313+
314+
stream.close();
315+
316+
StorageException se =
317+
assertThrows(
318+
StorageException.class, () -> TestUtils.await(complete, 2, TimeUnit.SECONDS));
319+
assertAll(
320+
() -> assertThat(bytes1Close.get()).isTrue(),
321+
() -> assertThat(bytes2Close.get()).isTrue(),
322+
() -> assertThat(read1.acceptingBytes()).isFalse(),
323+
() -> assertThat(se).hasMessageThat().contains("Parent stream shutdown"),
324+
() -> assertThat(se).hasCauseThat().isInstanceOf(AsynchronousCloseException.class));
325+
}
326+
}
327+
232328
private static class TestBlobDescriptorStreamRead extends BlobDescriptorStreamRead {
233329

234330
private static final AtomicLong readIdSeq = new AtomicLong(1);

0 commit comments

Comments
 (0)