Skip to content

Commit c653bb6

Browse files
committed
chore: plumb zero-copy for BidiReadObject
1 parent 24be97d commit c653bb6

File tree

9 files changed

+258
-64
lines changed

9 files changed

+258
-64
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 53 additions & 42 deletions
Original file line numberDiff line numberDiff line change
@@ -24,6 +24,8 @@
2424
import com.google.api.gax.rpc.ClientStream;
2525
import com.google.api.gax.rpc.StateCheckingResponseObserver;
2626
import com.google.api.gax.rpc.StreamController;
27+
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
28+
import com.google.common.annotations.VisibleForTesting;
2729
import com.google.protobuf.ByteString;
2830
import com.google.storage.v2.BidiReadHandle;
2931
import com.google.storage.v2.BidiReadObjectRequest;
@@ -57,8 +59,7 @@ public ApiFuture<byte[]> readRangeAsBytes(ByteRangeSpec range) {
5759
long readId = state.readIdSeq.getAndIncrement();
5860
SettableApiFuture<byte[]> future = SettableApiFuture.create();
5961
OutstandingReadToArray value =
60-
new OutstandingReadToArray(
61-
readId, range.beginOffset(), range.length(), new ByteArrayOutputStream(), future);
62+
new OutstandingReadToArray(readId, range.beginOffset(), range.length(), future);
6263
BidiReadObjectRequest request =
6364
BidiReadObjectRequest.newBuilder()
6465
.addReadRanges(
@@ -82,11 +83,13 @@ static ApiFuture<BlobDescriptor> create(
8283
BidiReadObjectRequest openRequest,
8384
GrpcCallContext context,
8485
BidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
86+
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
8587
Executor executor) {
8688
SettableApiFuture<Void> pendingOpen = SettableApiFuture.create();
8789
BlobDescriptorState state = new BlobDescriptorState(openRequest);
8890
BlobDescriptorResponseObserver responseObserver =
89-
new BlobDescriptorResponseObserver(pendingOpen, state, executor);
91+
new BlobDescriptorResponseObserver(
92+
pendingOpen, state, executor, bidiResponseContentLifecycleManager);
9093
ClientStream<BidiReadObjectRequest> requestStream =
9194
callable.splitCall(responseObserver, context);
9295
BlobDescriptorStreamPair stream = new BlobDescriptorStreamPair(requestStream, responseObserver);
@@ -119,14 +122,21 @@ private static final class BlobDescriptorResponseObserver
119122
private StreamController controller;
120123
private final BlobDescriptorState state;
121124
private final Executor exec;
125+
private final ResponseContentLifecycleManager<BidiReadObjectResponse>
126+
bidiResponseContentLifecycleManager;
122127

123128
private final SettableApiFuture<Void> openSignal;
124129

125130
public BlobDescriptorResponseObserver(
126-
SettableApiFuture<Void> openSignal, BlobDescriptorState state, Executor exec) {
131+
SettableApiFuture<Void> openSignal,
132+
BlobDescriptorState state,
133+
Executor exec,
134+
ResponseContentLifecycleManager<BidiReadObjectResponse>
135+
bidiResponseContentLifecycleManager) {
127136
this.openSignal = openSignal;
128137
this.state = state;
129138
this.exec = exec;
139+
this.bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager;
130140
}
131141

132142
@Override
@@ -139,32 +149,39 @@ protected void onStartImpl(StreamController controller) {
139149
@Override
140150
protected void onResponseImpl(BidiReadObjectResponse response) {
141151
controller.request(1);
142-
if (response.hasMetadata()) {
143-
state.metadata.set(response.getMetadata());
144-
openSignal.set(null);
145-
}
146-
if (response.hasReadHandle()) {
147-
state.ref.set(response.getReadHandle());
148-
openSignal.set(null);
149-
}
150-
List<ObjectRangeData> rangeData = response.getObjectDataRangesList();
151-
if (rangeData.isEmpty()) {
152-
return;
153-
}
154-
for (ObjectRangeData d : rangeData) {
155-
long id = d.getReadRange().getReadId();
156-
OutstandingReadToArray read = state.outstandingReads.get(id);
157-
if (read == null) {
158-
continue;
152+
try (ResponseContentLifecycleHandle handle =
153+
bidiResponseContentLifecycleManager.get(response)) {
154+
if (response.hasMetadata()) {
155+
state.metadata.set(response.getMetadata());
156+
openSignal.set(null);
157+
}
158+
if (response.hasReadHandle()) {
159+
state.ref.set(response.getReadHandle());
160+
openSignal.set(null);
159161
}
160-
ByteString content = d.getChecksummedData().getContent();
161-
read.accept(content);
162-
if (d.getRangeEnd()) {
163-
state.outstandingReads.remove(id);
164-
// invoke eof on exec, the resolving future could have a downstream callback
165-
// that we don't want to block this grpc thread
166-
exec.execute(read::eof);
162+
List<ObjectRangeData> rangeData = response.getObjectDataRangesList();
163+
if (rangeData.isEmpty()) {
164+
return;
167165
}
166+
for (ObjectRangeData d : rangeData) {
167+
long id = d.getReadRange().getReadId();
168+
OutstandingReadToArray read = state.outstandingReads.get(id);
169+
if (read == null) {
170+
continue;
171+
}
172+
ByteString content = d.getChecksummedData().getContent();
173+
ChildRef childRef = handle.borrow();
174+
read.accept(childRef, content);
175+
if (d.getRangeEnd()) {
176+
state.outstandingReads.remove(id);
177+
// invoke eof on exec, the resolving future could have a downstream callback
178+
// that we don't want to block this grpc thread
179+
exec.execute(read::eof);
180+
}
181+
}
182+
} catch (IOException e) {
183+
// TODO: sync this up with stream restarts when the time comes
184+
throw StorageException.coalesce(e);
168185
}
169186
}
170187

@@ -179,33 +196,27 @@ protected void onCompleteImpl() {
179196
}
180197
}
181198

182-
// todo: handle zero-copy lifecycle integration
183-
private static final class OutstandingReadToArray {
199+
@VisibleForTesting
200+
static final class OutstandingReadToArray {
184201
private final long readId;
185202
private final long readOffset;
186203
private final long readLimit;
187204
private final ByteArrayOutputStream bytes;
188205
private final SettableApiFuture<byte[]> complete;
189206

190-
private OutstandingReadToArray(
191-
long readId,
192-
long readOffset,
193-
long readLimit,
194-
ByteArrayOutputStream bytes,
195-
SettableApiFuture<byte[]> complete) {
207+
@VisibleForTesting
208+
OutstandingReadToArray(
209+
long readId, long readOffset, long readLimit, SettableApiFuture<byte[]> complete) {
196210
this.readId = readId;
197211
this.readOffset = readOffset;
198212
this.readLimit = readLimit;
199-
this.bytes = bytes;
213+
this.bytes = new ByteArrayOutputStream();
200214
this.complete = complete;
201215
}
202216

203-
public void accept(ByteString bytes) {
204-
try {
217+
public void accept(ChildRef childRef, ByteString bytes) throws IOException {
218+
try (ChildRef autoclose = childRef) {
205219
bytes.writeTo(this.bytes);
206-
} catch (IOException e) {
207-
// ByteArrayOutputStream doesn't throw IOException
208-
throw new RuntimeException(e);
209220
}
210221
}
211222

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageImpl.java

Lines changed: 8 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,6 +99,7 @@
9999
import com.google.storage.v2.Object;
100100
import com.google.storage.v2.ObjectAccessControl;
101101
import com.google.storage.v2.ReadObjectRequest;
102+
import com.google.storage.v2.ReadObjectResponse;
102103
import com.google.storage.v2.RestoreObjectRequest;
103104
import com.google.storage.v2.RewriteObjectRequest;
104105
import com.google.storage.v2.RewriteResponse;
@@ -167,7 +168,8 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
167168
.collect(ImmutableSet.toImmutableSet())));
168169

169170
final StorageClient storageClient;
170-
final ResponseContentLifecycleManager responseContentLifecycleManager;
171+
final ResponseContentLifecycleManager<ReadObjectResponse> responseContentLifecycleManager;
172+
final ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager;
171173
final WriterFactory writerFactory;
172174
final GrpcConversions codecs;
173175
final GrpcRetryAlgorithmManager retryAlgorithmManager;
@@ -181,12 +183,14 @@ final class GrpcStorageImpl extends BaseService<StorageOptions>
181183
GrpcStorageImpl(
182184
GrpcStorageOptions options,
183185
StorageClient storageClient,
184-
ResponseContentLifecycleManager responseContentLifecycleManager,
186+
ResponseContentLifecycleManager<ReadObjectResponse> responseContentLifecycleManager,
187+
ResponseContentLifecycleManager<BidiReadObjectResponse> bidiResponseContentLifecycleManager,
185188
WriterFactory writerFactory,
186189
Opts<UserProject> defaultOpts) {
187190
super(options);
188191
this.storageClient = storageClient;
189192
this.responseContentLifecycleManager = responseContentLifecycleManager;
193+
this.bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager;
190194
this.writerFactory = writerFactory;
191195
this.defaultOpts = defaultOpts;
192196
this.codecs = Conversions.grpc();
@@ -1477,7 +1481,8 @@ public ApiFuture<BlobDescriptor> getBlobDescriptor(BlobId id, BlobSourceOption..
14771481
GrpcCallContext context =
14781482
GrpcUtils.contextWithBucketName(object.getBucket(), GrpcCallContext.createDefault());
14791483

1480-
return BlobDescriptorImpl.create(req, context, callable, executor);
1484+
return BlobDescriptorImpl.create(
1485+
req, context, callable, bidiResponseContentLifecycleManager, executor);
14811486
}
14821487

14831488
@Override

0 commit comments

Comments
 (0)