Skip to content

Commit a0535c5

Browse files
committed
chore: generify zero-copy marshaller
Zero-copy marshaller does not need to be ReadObjectResponse specific. This changes makes it generic over any subclass of `com.google.protobuf.Message`. ResponseContentLifecycleHandle has been updated to use a factory with a generic bounds rather than the constructor
1 parent be3be3f commit a0535c5

File tree

5 files changed

+78
-42
lines changed

5 files changed

+78
-42
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Lines changed: 28 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -63,6 +63,7 @@
6363
import com.google.protobuf.ByteString;
6464
import com.google.protobuf.CodedInputStream;
6565
import com.google.protobuf.InvalidProtocolBufferException;
66+
import com.google.protobuf.Message;
6667
import com.google.protobuf.MessageLite;
6768
import com.google.protobuf.Parser;
6869
import com.google.protobuf.UnsafeByteOperations;
@@ -101,6 +102,7 @@
101102
import java.util.Map.Entry;
102103
import java.util.Objects;
103104
import java.util.Set;
105+
import java.util.function.Function;
104106
import org.checkerframework.checker.nullness.qual.NonNull;
105107

106108
/** @since 2.14.0 */
@@ -925,7 +927,7 @@ public InternalZeroCopyGrpcStorageStub getStub() {
925927

926928
private static final class InternalZeroCopyGrpcStorageStub extends GrpcStorageStub
927929
implements AutoCloseable {
928-
private final ReadObjectResponseZeroCopyMessageMarshaller getObjectMediaResponseMarshaller;
930+
private final ZeroCopyResponseMarshaller<ReadObjectResponse> getObjectMediaResponseMarshaller;
929931

930932
private final ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse>
931933
serverStreamingCallable;
@@ -938,7 +940,9 @@ private InternalZeroCopyGrpcStorageStub(
938940
super(settings, clientContext, callableFactory);
939941

940942
this.getObjectMediaResponseMarshaller =
941-
new ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance());
943+
new ZeroCopyResponseMarshaller<>(
944+
ReadObjectResponse.getDefaultInstance(),
945+
StorageV2ProtoUtils.READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION);
942946

943947
MethodDescriptor<ReadObjectRequest, ReadObjectResponse> readObjectMethodDescriptor =
944948
MethodDescriptor.<ReadObjectRequest, ReadObjectResponse>newBuilder()
@@ -973,39 +977,41 @@ public ServerStreamingCallable<ReadObjectRequest, ReadObjectResponse> readObject
973977
}
974978

975979
@VisibleForTesting
976-
static class ReadObjectResponseZeroCopyMessageMarshaller
977-
implements MethodDescriptor.PrototypeMarshaller<ReadObjectResponse>,
978-
ResponseContentLifecycleManager,
980+
static class ZeroCopyResponseMarshaller<Response extends Message>
981+
implements MethodDescriptor.PrototypeMarshaller<Response>,
982+
ResponseContentLifecycleManager<Response>,
979983
Closeable {
980-
private final Map<ReadObjectResponse, InputStream> unclosedStreams;
981-
private final Parser<ReadObjectResponse> parser;
982-
private final MethodDescriptor.PrototypeMarshaller<ReadObjectResponse> baseMarshaller;
983-
984-
ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse defaultInstance) {
985-
parser = defaultInstance.getParserForType();
984+
private final Map<Response, InputStream> unclosedStreams;
985+
private final Parser<Response> parser;
986+
private final MethodDescriptor.PrototypeMarshaller<Response> baseMarshaller;
987+
private final Function<Response, List<ByteBuffer>> toByteBuffersFunction;
988+
989+
ZeroCopyResponseMarshaller(
990+
Response defaultInstance, Function<Response, List<ByteBuffer>> toByteBuffersFunction) {
991+
parser = (Parser<Response>) defaultInstance.getParserForType();
986992
baseMarshaller =
987-
(MethodDescriptor.PrototypeMarshaller<ReadObjectResponse>)
988-
ProtoUtils.marshaller(defaultInstance);
993+
(MethodDescriptor.PrototypeMarshaller<Response>) ProtoUtils.marshaller(defaultInstance);
994+
this.toByteBuffersFunction = toByteBuffersFunction;
989995
unclosedStreams = Collections.synchronizedMap(new IdentityHashMap<>());
990996
}
991997

992998
@Override
993-
public Class<ReadObjectResponse> getMessageClass() {
999+
public Class<Response> getMessageClass() {
9941000
return baseMarshaller.getMessageClass();
9951001
}
9961002

9971003
@Override
998-
public ReadObjectResponse getMessagePrototype() {
1004+
public Response getMessagePrototype() {
9991005
return baseMarshaller.getMessagePrototype();
10001006
}
10011007

10021008
@Override
1003-
public InputStream stream(ReadObjectResponse value) {
1009+
public InputStream stream(Response value) {
10041010
return baseMarshaller.stream(value);
10051011
}
10061012

10071013
@Override
1008-
public ReadObjectResponse parse(InputStream stream) {
1014+
public Response parse(InputStream stream) {
10091015
CodedInputStream cis = null;
10101016
try {
10111017
if (stream instanceof KnownLength
@@ -1036,7 +1042,7 @@ public ReadObjectResponse parse(InputStream stream) {
10361042
}
10371043
if (cis != null) {
10381044
// fast path (no memory copy)
1039-
ReadObjectResponse message;
1045+
Response message;
10401046
try {
10411047
message = parseFrom(cis);
10421048
} catch (InvalidProtocolBufferException ipbe) {
@@ -1053,9 +1059,8 @@ public ReadObjectResponse parse(InputStream stream) {
10531059
}
10541060
}
10551061

1056-
private ReadObjectResponse parseFrom(CodedInputStream stream)
1057-
throws InvalidProtocolBufferException {
1058-
ReadObjectResponse message = parser.parseFrom(stream);
1062+
private Response parseFrom(CodedInputStream stream) throws InvalidProtocolBufferException {
1063+
Response message = parser.parseFrom(stream);
10591064
try {
10601065
stream.checkLastTagWas(0);
10611066
return message;
@@ -1066,9 +1071,9 @@ private ReadObjectResponse parseFrom(CodedInputStream stream)
10661071
}
10671072

10681073
@Override
1069-
public ResponseContentLifecycleHandle get(ReadObjectResponse response) {
1074+
public ResponseContentLifecycleHandle get(Response response) {
10701075
InputStream stream = unclosedStreams.remove(response);
1071-
return new ResponseContentLifecycleHandle(response, stream);
1076+
return ResponseContentLifecycleHandle.create(response, toByteBuffersFunction, stream);
10721077
}
10731078

10741079
@Override

google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleHandle.java

Lines changed: 10 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -15,22 +15,29 @@
1515
*/
1616
package com.google.cloud.storage;
1717

18-
import com.google.storage.v2.ReadObjectResponse;
1918
import java.io.Closeable;
2019
import java.io.IOException;
2120
import java.nio.ByteBuffer;
2221
import java.util.List;
22+
import java.util.function.Function;
2323
import org.checkerframework.checker.nullness.qual.Nullable;
2424

2525
final class ResponseContentLifecycleHandle implements Closeable {
2626
@Nullable private final Closeable dispose;
2727

2828
private final List<ByteBuffer> buffers;
2929

30-
ResponseContentLifecycleHandle(ReadObjectResponse resp, @Nullable Closeable dispose) {
30+
private ResponseContentLifecycleHandle(List<ByteBuffer> buffers, @Nullable Closeable dispose) {
3131
this.dispose = dispose;
32+
this.buffers = buffers;
33+
}
3234

33-
this.buffers = resp.getChecksummedData().getContent().asReadOnlyByteBufferList();
35+
static <Response> ResponseContentLifecycleHandle create(
36+
Response response,
37+
Function<Response, List<ByteBuffer>> toBuffersFunction,
38+
@Nullable Closeable dispose) {
39+
List<ByteBuffer> buffers = toBuffersFunction.apply(response);
40+
return new ResponseContentLifecycleHandle(buffers, dispose);
3441
}
3542

3643
void copy(ReadCursor c, ByteBuffer[] dsts, int offset, int length) {

google-cloud-storage/src/main/java/com/google/cloud/storage/ResponseContentLifecycleManager.java

Lines changed: 5 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -19,16 +19,17 @@
1919
import java.io.Closeable;
2020
import java.io.IOException;
2121

22-
interface ResponseContentLifecycleManager extends Closeable {
23-
ResponseContentLifecycleHandle get(ReadObjectResponse response);
22+
interface ResponseContentLifecycleManager<Response> extends Closeable {
23+
ResponseContentLifecycleHandle get(Response response);
2424

2525
@Override
2626
default void close() throws IOException {}
2727

28-
static ResponseContentLifecycleManager noop() {
28+
static ResponseContentLifecycleManager<ReadObjectResponse> noop() {
2929
return response ->
30-
new ResponseContentLifecycleHandle(
30+
ResponseContentLifecycleHandle.create(
3131
response,
32+
StorageV2ProtoUtils.READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION,
3233
() -> {
3334
// no-op
3435
});

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageV2ProtoUtils.java

Lines changed: 17 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,18 +16,35 @@
1616

1717
package com.google.cloud.storage;
1818

19+
import com.google.common.collect.ImmutableList;
1920
import com.google.protobuf.InvalidProtocolBufferException;
2021
import com.google.protobuf.MessageOrBuilder;
2122
import com.google.protobuf.util.JsonFormat;
2223
import com.google.protobuf.util.JsonFormat.Printer;
2324
import com.google.storage.v2.BucketAccessControl;
25+
import com.google.storage.v2.ChecksummedData;
2426
import com.google.storage.v2.ObjectAccessControl;
2527
import com.google.storage.v2.ReadObjectRequest;
28+
import com.google.storage.v2.ReadObjectResponse;
29+
import java.nio.ByteBuffer;
30+
import java.util.List;
31+
import java.util.function.Function;
2632
import java.util.function.Predicate;
2733
import org.checkerframework.checker.nullness.qual.NonNull;
2834

2935
final class StorageV2ProtoUtils {
3036

37+
static final Function<ReadObjectResponse, List<ByteBuffer>>
38+
READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION =
39+
response -> {
40+
if (response.hasChecksummedData()) {
41+
ChecksummedData checksummedData = response.getChecksummedData();
42+
if (!checksummedData.getContent().isEmpty()) {
43+
return response.getChecksummedData().getContent().asReadOnlyByteBufferList();
44+
}
45+
}
46+
return ImmutableList.of();
47+
};
3148
private static final String VALIDATION_TEMPLATE =
3249
"offset >= 0 && limit >= 0 (%s >= 0 && %s >= 0)";
3350

google-cloud-storage/src/test/java/com/google/cloud/storage/ZeroCopyMarshallerTest.java

Lines changed: 18 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -22,7 +22,7 @@
2222
import static org.junit.Assert.assertThrows;
2323
import static org.junit.Assert.assertTrue;
2424

25-
import com.google.cloud.storage.GrpcStorageOptions.ReadObjectResponseZeroCopyMessageMarshaller;
25+
import com.google.cloud.storage.GrpcStorageOptions.ZeroCopyResponseMarshaller;
2626
import com.google.common.collect.ImmutableList;
2727
import com.google.common.hash.Hashing;
2828
import com.google.protobuf.ByteString;
@@ -61,8 +61,10 @@ public class ZeroCopyMarshallerTest {
6161
.setChecksummedData(getChecksummedData(data, Hasher.enabled()))
6262
.build();
6363

64-
private ReadObjectResponseZeroCopyMessageMarshaller createMarshaller() {
65-
return new ReadObjectResponseZeroCopyMessageMarshaller(ReadObjectResponse.getDefaultInstance());
64+
private ZeroCopyResponseMarshaller<ReadObjectResponse> createMarshaller() {
65+
return new ZeroCopyResponseMarshaller<>(
66+
ReadObjectResponse.getDefaultInstance(),
67+
StorageV2ProtoUtils.READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION);
6668
}
6769

6870
private byte[] dropLastOneByte(byte[] bytes) {
@@ -78,7 +80,7 @@ private InputStream createInputStream(byte[] bytes, boolean isZeroCopyable) {
7880
@Test
7981
public void testParseOnFastPath() throws IOException {
8082
InputStream stream = createInputStream(response.toByteArray(), true);
81-
ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller();
83+
ZeroCopyResponseMarshaller<ReadObjectResponse> marshaller = createMarshaller();
8284
ReadObjectResponse response = marshaller.parse(stream);
8385
assertEquals(response, this.response);
8486
ResponseContentLifecycleHandle stream2 = marshaller.get(response);
@@ -92,7 +94,7 @@ public void testParseOnFastPath() throws IOException {
9294
@Test
9395
public void testParseOnSlowPath() throws IOException {
9496
InputStream stream = createInputStream(response.toByteArray(), false);
95-
ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller();
97+
ZeroCopyResponseMarshaller<ReadObjectResponse> marshaller = createMarshaller();
9698
ReadObjectResponse response = marshaller.parse(stream);
9799
assertEquals(response, this.response);
98100
ResponseContentLifecycleHandle stream2 = marshaller.get(response);
@@ -103,7 +105,7 @@ public void testParseOnSlowPath() throws IOException {
103105
@Test
104106
public void testParseBrokenMessageOnFastPath() {
105107
InputStream stream = createInputStream(dropLastOneByte(response.toByteArray()), true);
106-
ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller();
108+
ZeroCopyResponseMarshaller<ReadObjectResponse> marshaller = createMarshaller();
107109
assertThrows(
108110
StatusRuntimeException.class,
109111
() -> {
@@ -114,7 +116,7 @@ public void testParseBrokenMessageOnFastPath() {
114116
@Test
115117
public void testParseBrokenMessageOnSlowPath() {
116118
InputStream stream = createInputStream(dropLastOneByte(response.toByteArray()), false);
117-
ReadObjectResponseZeroCopyMessageMarshaller marshaller = createMarshaller();
119+
ZeroCopyResponseMarshaller<ReadObjectResponse> marshaller = createMarshaller();
118120
assertThrows(
119121
StatusRuntimeException.class,
120122
() -> {
@@ -128,12 +130,17 @@ public void testResponseContentLifecycleHandle() throws IOException {
128130
Closeable verifyClosed = () -> wasClosedCalled.set(true);
129131

130132
ResponseContentLifecycleHandle handle =
131-
new ResponseContentLifecycleHandle(response, verifyClosed);
133+
ResponseContentLifecycleHandle.create(
134+
response,
135+
StorageV2ProtoUtils.READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION,
136+
verifyClosed);
132137
handle.close();
133138

134139
assertTrue(wasClosedCalled.get());
135140

136-
ResponseContentLifecycleHandle nullHandle = new ResponseContentLifecycleHandle(response, null);
141+
ResponseContentLifecycleHandle nullHandle =
142+
ResponseContentLifecycleHandle.create(
143+
response, StorageV2ProtoUtils.READ_OBJECT_RESPONSE_TO_BYTE_BUFFERS_FUNCTION, null);
137144
nullHandle.close();
138145
// No NullPointerException means test passes
139146
}
@@ -147,8 +154,7 @@ public void testMarshallerClose_clean() throws IOException {
147154
CloseAuditingInputStream stream3 =
148155
CloseAuditingInputStream.of(createInputStream(response.toByteArray(), true));
149156

150-
ReadObjectResponseZeroCopyMessageMarshaller.closeAllStreams(
151-
ImmutableList.of(stream1, stream2, stream3));
157+
ZeroCopyResponseMarshaller.closeAllStreams(ImmutableList.of(stream1, stream2, stream3));
152158

153159
assertThat(stream1.closed).isTrue();
154160
assertThat(stream2.closed).isTrue();
@@ -184,7 +190,7 @@ void onClose() throws IOException {
184190
assertThrows(
185191
IOException.class,
186192
() ->
187-
ReadObjectResponseZeroCopyMessageMarshaller.closeAllStreams(
193+
ZeroCopyResponseMarshaller.closeAllStreams(
188194
ImmutableList.of(stream1, stream2, stream3)));
189195

190196
assertThat(stream1.closed).isTrue();

0 commit comments

Comments
 (0)