Skip to content

Commit 24be97d

Browse files
committed
chore: refactor ResponseContentLifecycleHandle to allow borrowing for multiple readers
1 parent a0535c5 commit 24be97d

File tree

3 files changed

+187
-6
lines changed

3 files changed

+187
-6
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -15,21 +15,28 @@
1515
*/
1616
package com.google.cloud.storage;
1717

18+
import com.google.common.base.Preconditions;
1819
import java.io.Closeable;
1920
import java.io.IOException;
2021
import java.nio.ByteBuffer;
2122
import java.util.List;
23+
import java.util.concurrent.atomic.AtomicBoolean;
24+
import java.util.concurrent.atomic.AtomicInteger;
2225
import java.util.function.Function;
2326
import org.checkerframework.checker.nullness.qual.Nullable;
2427

2528
final class ResponseContentLifecycleHandle implements Closeable {
2629
@Nullable private final Closeable dispose;
2730

2831
private final List<ByteBuffer> buffers;
32+
private final AtomicBoolean open;
33+
private final AtomicInteger refs;
2934

3035
private ResponseContentLifecycleHandle(List<ByteBuffer> buffers, @Nullable Closeable dispose) {
3136
this.dispose = dispose;
3237
this.buffers = buffers;
38+
this.open = new AtomicBoolean(true);
39+
this.refs = new AtomicInteger(1);
3340
}
3441

3542
static <Response> ResponseContentLifecycleHandle create(
@@ -40,6 +47,13 @@ static <Response> ResponseContentLifecycleHandle create(
4047
return new ResponseContentLifecycleHandle(buffers, dispose);
4148
}
4249

50+
ChildRef borrow() {
51+
Preconditions.checkState(open.get(), "only able to borrow when open");
52+
ChildRef childRef = new ChildRef();
53+
refs.incrementAndGet();
54+
return childRef;
55+
}
56+
4357
void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) {
4458
for (ByteBuffer b : buffers) {
4559
long copiedBytes = Buffers.copy(b, dsts, offset, length);
@@ -57,8 +71,28 @@ boolean hasRemaining() {
5771

5872
@Override
5973
public void close() throws IOException {
74+
if (open.getAndSet(false)) {
75+
int newCount = refs.decrementAndGet();
76+
if (newCount == 0) {
77+
dispose();
78+
}
79+
}
80+
}
81+
82+
private void dispose() throws IOException {
6083
if (dispose != null) {
6184
dispose.close();
6285
}
6386
}
87+
88+
final class ChildRef implements Closeable {
89+
90+
@Override
91+
public void close() throws IOException {
92+
int newCount = refs.decrementAndGet();
93+
if (newCount == 0) {
94+
ResponseContentLifecycleHandle.this.dispose();
95+
}
96+
}
97+
}
6498
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java

Lines changed: 145 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,13 +23,19 @@
2323
import static org.junit.Assert.assertTrue;
2424

2525
import com.google.cloud.storage.GrpcStorageOptions.ZeroCopyResponseMarshaller;
26+
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
27+
import com.google.cloud.storage.it.ChecksummedTestContent;
2628
import com.google.common.collect.ImmutableList;
2729
import com.google.common.hash.Hashing;
2830
import com.google.protobuf.ByteString;
31+
import com.google.storage.v2.ChecksummedData;
2932
import com.google.storage.v2.ContentRange;
3033
import com.google.storage.v2.Object;
3134
import com.google.storage.v2.ObjectChecksums;
3235
import com.google.storage.v2.ReadObjectResponse;
36+
import io.grpc.Detachable;
37+
import io.grpc.HasByteBuffer;
38+
import io.grpc.KnownLength;
3339
import io.grpc.StatusRuntimeException;
3440
import io.grpc.internal.ReadableBuffer;
3541
import io.grpc.internal.ReadableBuffers;
@@ -41,7 +47,9 @@
4147
import java.util.Arrays;
4248
import java.util.List;
4349
import java.util.concurrent.atomic.AtomicBoolean;
50+
import java.util.concurrent.atomic.AtomicInteger;
4451
import java.util.stream.Collectors;
52+
import javax.annotation.Nullable;
4553
import org.junit.Test;
4654

4755
public class ZeroCopyMarshallerTest {
@@ -71,10 +79,11 @@ private byte[] dropLastOneByte(byte[] bytes) {
7179
return Arrays.copyOfRange(bytes, 0, bytes.length - 1);
7280
}
7381

74-
private InputStream createInputStream(byte[] bytes, boolean isZeroCopyable) {
82+
private <IS extends InputStream & KnownLength & Detachable & HasByteBuffer> IS createInputStream(
83+
byte[] bytes, boolean isZeroCopyable) {
7584
ReadableBuffer buffer =
7685
isZeroCopyable ? ReadableBuffers.wrap(ByteBuffer.wrap(bytes)) : ReadableBuffers.wrap(bytes);
77-
return ReadableBuffers.openStream(buffer, true);
86+
return (IS) ReadableBuffers.openStream(buffer, true);
7887
}
7988

8089
@Test
@@ -205,16 +214,146 @@ void onClose() throws IOException {
205214
assertThat(messages).isEqualTo(ImmutableList.of("Kaboom stream2", "Kaboom stream3"));
206215
}
207216

208-
private static class CloseAuditingInputStream extends FilterInputStream {
217+
@Test
218+
public void refCounting_closingLastBorrowedChildRefShouldCloseHandleWhenHandlePreviouslyClosed()
219+
throws IOException {
220+
try (ZeroCopyResponseMarshaller<ChecksummedData> marshaller =
221+
new ZeroCopyResponseMarshaller<>(
222+
ChecksummedData.getDefaultInstance(),
223+
cd -> cd.getContent().asReadOnlyByteBufferList())) {
224+
225+
ChecksummedTestContent testContent =
226+
ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(17));
227+
228+
ChecksummedData orig = testContent.asChecksummedData();
229+
230+
// load our proto message into the marshaller
231+
byte[] serialized = orig.toByteArray();
232+
CloseAuditingInputStream<?> inputStream =
233+
CloseAuditingInputStream.of(createInputStream(serialized, true));
234+
235+
ChecksummedData parsed = marshaller.parse(inputStream);
236+
assertThat(inputStream.closed).isFalse();
237+
238+
// now get the lifecycle management handle for the parsed instance
239+
ResponseContentLifecycleHandle handle = marshaller.get(parsed);
240+
assertThat(inputStream.closed).isFalse();
241+
242+
ChildRef ref1 = handle.borrow();
243+
ChildRef ref2 = handle.borrow();
244+
ChildRef ref3 = handle.borrow();
245+
assertThat(inputStream.closed).isFalse();
246+
handle.close();
247+
assertThat(inputStream.closed).isFalse();
248+
ref1.close();
249+
assertThat(inputStream.closed).isFalse();
250+
ref2.close();
251+
assertThat(inputStream.closed).isFalse();
252+
ref3.close();
253+
assertThat(inputStream.closed).isTrue();
254+
}
255+
}
256+
257+
@Test
258+
public void refCounting_mustBeOpenToBorrow() throws IOException {
259+
try (ZeroCopyResponseMarshaller<ChecksummedData> marshaller =
260+
new ZeroCopyResponseMarshaller<>(
261+
ChecksummedData.getDefaultInstance(),
262+
cd -> cd.getContent().asReadOnlyByteBufferList())) {
263+
264+
ChecksummedTestContent testContent =
265+
ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(17));
266+
267+
ChecksummedData orig = testContent.asChecksummedData();
268+
269+
// load our proto message into the marshaller
270+
byte[] serialized = orig.toByteArray();
271+
CloseAuditingInputStream<?> inputStream =
272+
CloseAuditingInputStream.of(createInputStream(serialized, true));
273+
274+
ChecksummedData parsed = marshaller.parse(inputStream);
275+
assertThat(inputStream.closed).isFalse();
276+
277+
// now get the lifecycle management handle for the parsed instance
278+
ResponseContentLifecycleHandle handle = marshaller.get(parsed);
279+
handle.close();
280+
assertThat(inputStream.closed).isTrue();
281+
282+
assertThrows(IllegalStateException.class, handle::borrow);
283+
}
284+
}
285+
286+
@SuppressWarnings({"rawtypes", "unchecked"})
287+
@Test
288+
public void refCounting_handleCloseOnlyHappensIfOpen() throws IOException {
289+
try (ZeroCopyResponseMarshaller<ChecksummedData> marshaller =
290+
new ZeroCopyResponseMarshaller<>(
291+
ChecksummedData.getDefaultInstance(),
292+
cd -> cd.getContent().asReadOnlyByteBufferList())) {
293+
294+
AtomicInteger closeCount = new AtomicInteger(0);
295+
ChecksummedTestContent testContent =
296+
ChecksummedTestContent.of(DataGenerator.base64Characters().genBytes(17));
297+
298+
ChecksummedData orig = testContent.asChecksummedData();
299+
300+
// load our proto message into the marshaller
301+
byte[] serialized = orig.toByteArray();
302+
CloseAuditingInputStream inputStream =
303+
new CloseAuditingInputStream(createInputStream(serialized, true)) {
304+
@Override
305+
public void close() throws IOException {
306+
closeCount.getAndIncrement();
307+
super.close();
308+
}
309+
};
310+
311+
ChecksummedData parsed = marshaller.parse(inputStream);
312+
assertThat(inputStream.closed).isFalse();
313+
314+
// now get the lifecycle management handle for the parsed instance
315+
ResponseContentLifecycleHandle handle = marshaller.get(parsed);
316+
handle.close();
317+
assertThat(inputStream.closed).isTrue();
318+
handle.close();
319+
assertThat(closeCount.get()).isEqualTo(1);
320+
}
321+
}
322+
323+
// gRPC doesn't have a public InputStream subclass that implements all of these interfaces
324+
// use generics to constrain things using multiple inheritance notation. Then, our class
325+
// implements the same interfaces to allow use within zero-copy marshaller.
326+
private static class CloseAuditingInputStream<
327+
IS extends InputStream & KnownLength & Detachable & HasByteBuffer>
328+
extends FilterInputStream implements KnownLength, Detachable, HasByteBuffer {
209329

210330
private boolean closed = false;
331+
private final IS delegate;
211332

212-
private CloseAuditingInputStream(InputStream in) {
333+
private CloseAuditingInputStream(IS in) {
213334
super(in);
335+
this.delegate = in;
336+
}
337+
338+
@Override
339+
public InputStream detach() {
340+
return this;
341+
}
342+
343+
@Override
344+
public boolean byteBufferSupported() {
345+
return delegate.byteBufferSupported();
346+
}
347+
348+
@Nullable
349+
@Override
350+
public ByteBuffer getByteBuffer() {
351+
return delegate.getByteBuffer();
214352
}
215353

216-
public static CloseAuditingInputStream of(InputStream in) {
217-
return new CloseAuditingInputStream(in);
354+
public static <IS extends InputStream & KnownLength & Detachable & HasByteBuffer>
355+
CloseAuditingInputStream<IS> of(IS in) {
356+
return new CloseAuditingInputStream<>(in);
218357
}
219358

220359
@Override

google-cloud-storage/src/test/java/com/google/cloud/storage/it/ChecksummedTestContent.java

Lines changed: 8 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import com.google.common.primitives.Ints;
2323
import com.google.protobuf.ByteString;
2424
import com.google.protobuf.UnsafeByteOperations;
25+
import com.google.storage.v2.ChecksummedData;
2526
import java.io.ByteArrayInputStream;
2627
import java.nio.charset.StandardCharsets;
2728
import java.util.Arrays;
@@ -85,6 +86,13 @@ public ByteArrayInputStream bytesAsInputStream() {
8586
return new ByteArrayInputStream(bytes);
8687
}
8788

89+
public ChecksummedData asChecksummedData() {
90+
return ChecksummedData.newBuilder()
91+
.setContent(ByteString.copyFrom(bytes))
92+
.setCrc32C(crc32c)
93+
.build();
94+
}
95+
8896
@Override
8997
public String toString() {
9098
return MoreObjects.toStringHelper(this)

0 commit comments

Comments
 (0)