Skip to content

Commit 69d01d4

Browse files
committed
chore: add BlobDescriptor#readRangeAsChannel
1 parent fa0df98 commit 69d01d4

File tree

7 files changed

+665
-7
lines changed

7 files changed

+665
-7
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptor.java

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import com.google.protobuf.ByteString;
2121
import java.io.Closeable;
2222
import java.io.IOException;
23+
import java.nio.channels.ScatteringByteChannel;
2324

2425
/** Blob Descriptor is to blob, what File Descriptor is to a file */
2526
public interface BlobDescriptor extends AutoCloseable, Closeable {
@@ -28,6 +29,15 @@ public interface BlobDescriptor extends AutoCloseable, Closeable {
2829

2930
ApiFuture<byte[]> readRangeAsBytes(RangeSpec range);
3031

32+
/**
33+
* Read the provided range as a non-blocking Channel.
34+
*
35+
* <p>The returned channel will be non-blocking for all read calls. If bytes have not yet
36+
* asynchronously been delivered from gcs the method will return rather than waiting for the bytes
37+
* to arrive.
38+
*/
39+
ScatteringByteChannel readRangeAsChannel(RangeSpec range);
40+
3141
@Override
3242
void close() throws IOException;
3343

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -24,13 +24,15 @@
2424
import com.google.api.gax.grpc.GrpcCallContext;
2525
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2626
import com.google.cloud.storage.BlobDescriptorStreamRead.AccumulatingRead;
27+
import com.google.cloud.storage.BlobDescriptorStreamRead.StreamingRead;
2728
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2829
import com.google.cloud.storage.RetryContext.RetryContextProvider;
2930
import com.google.common.annotations.VisibleForTesting;
3031
import com.google.protobuf.ByteString;
3132
import com.google.storage.v2.BidiReadObjectRequest;
3233
import com.google.storage.v2.BidiReadObjectResponse;
3334
import java.io.IOException;
35+
import java.nio.channels.ScatteringByteChannel;
3436
import java.util.concurrent.ScheduledExecutorService;
3537

3638
final class BlobDescriptorImpl implements BlobDescriptor {
@@ -65,6 +67,20 @@ public ApiFuture<byte[]> readRangeAsBytes(RangeSpec range) {
6567
return future;
6668
}
6769

70+
@Override
71+
public ScatteringByteChannel readRangeAsChannel(RangeSpec range) {
72+
checkState(stream.isOpen(), "stream already closed");
73+
long readId = state.newReadId();
74+
StreamingRead read =
75+
BlobDescriptorStreamRead.streamingRead(readId, range, retryContextProvider.create());
76+
BidiReadObjectRequest request =
77+
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
78+
// todo: fork the stream and state
79+
state.putOutstandingRead(readId, read);
80+
stream.send(request);
81+
return read;
82+
}
83+
6884
public ApiFuture<DisposableByteString> readRangeAsByteString(RangeSpec range) {
6985
checkState(stream.isOpen(), "stream already closed");
7086
long readId = state.newReadId();

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 228 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,28 @@
1818

1919
import com.google.api.core.ApiFuture;
2020
import com.google.api.core.SettableApiFuture;
21+
import com.google.cloud.BaseServiceException;
2122
import com.google.cloud.storage.BlobDescriptor.ZeroCopySupport.DisposableByteString;
2223
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
2324
import com.google.cloud.storage.RetryContext.OnFailure;
2425
import com.google.cloud.storage.RetryContext.OnSuccess;
26+
import com.google.cloud.storage.UnbufferedReadableByteChannelSession.UnbufferedReadableByteChannel;
27+
import com.google.common.base.Preconditions;
2528
import com.google.protobuf.ByteString;
2629
import com.google.storage.v2.ReadRange;
2730
import java.io.Closeable;
2831
import java.io.IOException;
32+
import java.io.InterruptedIOException;
33+
import java.nio.ByteBuffer;
34+
import java.nio.channels.ClosedChannelException;
35+
import java.nio.channels.ReadableByteChannel;
2936
import java.util.ArrayList;
3037
import java.util.Collections;
3138
import java.util.List;
39+
import java.util.concurrent.ArrayBlockingQueue;
40+
import java.util.concurrent.BlockingQueue;
3241
import java.util.concurrent.atomic.AtomicLong;
42+
import org.checkerframework.checker.nullness.qual.Nullable;
3343

3444
abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
3545

@@ -116,6 +126,10 @@ static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRe
116126
return new ZeroCopyByteStringAccumulatingRead(readId, rangeSpec, retryContext, complete);
117127
}
118128

129+
static StreamingRead streamingRead(long readId, RangeSpec rangeSpec, RetryContext retryContext) {
130+
return new StreamingRead(readId, rangeSpec, retryContext, false);
131+
}
132+
119133
/** Base class of a read that will accumulate before completing by resolving a future */
120134
abstract static class AccumulatingRead<Result> extends BlobDescriptorStreamRead {
121135
protected final List<ChildRef> childRefs;
@@ -182,20 +196,227 @@ public void close() throws IOException {
182196

183197
/**
184198
* Base class of a read that will be processed in a streaming manner (e.g. {@link
185-
* java.nio.channels.ReadableByteChannel})
199+
* ReadableByteChannel})
186200
*/
187-
abstract static class StreamingRead extends BlobDescriptorStreamRead {
188-
private StreamingRead(long readId, RangeSpec range, RetryContext retryContext) {
189-
super(readId, range, retryContext);
201+
static class StreamingRead extends BlobDescriptorStreamRead
202+
implements UnbufferedReadableByteChannel {
203+
private final SettableApiFuture<Void> failFuture;
204+
private final BlockingQueue<Closeable> queue;
205+
206+
private boolean complete;
207+
@Nullable private ChildRefHelper leftovers;
208+
209+
private StreamingRead(
210+
long readId, RangeSpec rangeSpec, RetryContext retryContext, boolean closed) {
211+
this(
212+
readId,
213+
rangeSpec,
214+
new AtomicLong(rangeSpec.begin()),
215+
retryContext,
216+
closed,
217+
SettableApiFuture.create(),
218+
new ArrayBlockingQueue<>(2),
219+
false,
220+
null);
190221
}
191222

192223
private StreamingRead(
193-
long readId,
224+
long newReadId,
194225
RangeSpec rangeSpec,
195226
AtomicLong readOffset,
196227
RetryContext retryContext,
197-
boolean closed) {
198-
super(readId, rangeSpec, readOffset, retryContext, closed);
228+
boolean closed,
229+
SettableApiFuture<Void> failFuture,
230+
BlockingQueue<Closeable> queue,
231+
boolean complete,
232+
@Nullable ChildRefHelper leftovers) {
233+
super(newReadId, rangeSpec, readOffset, retryContext, closed);
234+
this.failFuture = failFuture;
235+
this.queue = queue;
236+
this.complete = complete;
237+
this.leftovers = leftovers;
238+
}
239+
240+
@Override
241+
boolean acceptingBytes() {
242+
return !closed && !tombstoned;
243+
}
244+
245+
@Override
246+
void accept(ChildRef childRef) throws IOException {
247+
retryContext.reset();
248+
int size = childRef.byteString().size();
249+
offer(childRef);
250+
readOffset.addAndGet(size);
251+
}
252+
253+
@Override
254+
void eof() throws IOException {
255+
retryContext.reset();
256+
offer(EofMarker.INSTANCE);
257+
}
258+
259+
@Override
260+
ApiFuture<?> fail(Throwable t) {
261+
try {
262+
offer(new SmuggledFailure(t));
263+
failFuture.set(null);
264+
} catch (InterruptedIOException e) {
265+
Thread.currentThread().interrupt();
266+
failFuture.setException(e);
267+
}
268+
return failFuture;
269+
}
270+
271+
@Override
272+
StreamingRead withNewReadId(long newReadId) {
273+
tombstoned = true;
274+
return new StreamingRead(
275+
newReadId,
276+
rangeSpec,
277+
readOffset,
278+
retryContext,
279+
closed,
280+
failFuture,
281+
queue,
282+
complete,
283+
leftovers);
284+
}
285+
286+
@Override
287+
public void close() throws IOException {
288+
if (!closed) {
289+
retryContext.reset();
290+
closed = true;
291+
if (leftovers != null) {
292+
leftovers.ref.close();
293+
}
294+
GrpcUtils.closeAll(queue);
295+
}
296+
}
297+
298+
@Override
299+
public boolean isOpen() {
300+
return !closed;
301+
}
302+
303+
@Override
304+
public long read(ByteBuffer[] dsts, int offset, int length) throws IOException {
305+
if (closed) {
306+
throw new ClosedChannelException();
307+
}
308+
if (complete) {
309+
close();
310+
return -1;
311+
}
312+
313+
long read = 0;
314+
long dstsRemaining = Buffers.totalRemaining(dsts, offset, length);
315+
if (leftovers != null) {
316+
read += leftovers.copy(dsts, offset, length);
317+
if (!leftovers.hasRemaining()) {
318+
leftovers.ref.close();
319+
leftovers = null;
320+
}
321+
}
322+
323+
java.lang.Object poll;
324+
while (read < dstsRemaining && (poll = queue.poll()) != null) {
325+
if (poll instanceof ChildRef) {
326+
ChildRefHelper ref = new ChildRefHelper((ChildRef) poll);
327+
read += ref.copy(dsts, offset, length);
328+
if (ref.hasRemaining()) {
329+
leftovers = ref;
330+
break;
331+
} else {
332+
ref.ref.close();
333+
}
334+
} else if (poll == EofMarker.INSTANCE) {
335+
complete = true;
336+
if (read == 0) {
337+
close();
338+
return -1;
339+
}
340+
break;
341+
} else if (poll instanceof SmuggledFailure) {
342+
SmuggledFailure throwable = (SmuggledFailure) poll;
343+
BaseServiceException coalesce = StorageException.coalesce(throwable.getSmuggled());
344+
throw new IOException(coalesce);
345+
} else {
346+
//noinspection DataFlowIssue
347+
Preconditions.checkState(
348+
false, "unhandled queue element type %s", poll.getClass().getName());
349+
}
350+
}
351+
352+
return read;
353+
}
354+
355+
private void offer(Closeable offer) throws InterruptedIOException {
356+
try {
357+
queue.put(offer);
358+
} catch (InterruptedException e) {
359+
Thread.currentThread().interrupt();
360+
throw new InterruptedIOException();
361+
}
362+
}
363+
364+
/**
365+
* The queue items are added to is a queue of {@link Closeable}. This class smuggles a Throwable
366+
* in a no-op Closable, such that the throwable can be in the queue.
367+
*
368+
* <p>Refer to {@link #fail(Throwable)} to see where this class is instantiated.
369+
*/
370+
static final class SmuggledFailure implements Closeable {
371+
private final Throwable smuggled;
372+
373+
private SmuggledFailure(Throwable smuggled) {
374+
this.smuggled = smuggled;
375+
}
376+
377+
Throwable getSmuggled() {
378+
return smuggled;
379+
}
380+
381+
@Override
382+
public void close() throws IOException {}
383+
}
384+
385+
static final class ChildRefHelper {
386+
private final ChildRef ref;
387+
388+
private final List<ByteBuffer> buffers;
389+
390+
private ChildRefHelper(ChildRef ref) {
391+
this.ref = ref;
392+
this.buffers = ref.byteString().asReadOnlyByteBufferList();
393+
}
394+
395+
long copy(ByteBuffer[] dsts, int offset, int length) {
396+
long copied = 0;
397+
for (ByteBuffer b : buffers) {
398+
long copiedBytes = Buffers.copy(b, dsts, offset, length);
399+
copied += copiedBytes;
400+
if (b.hasRemaining()) break;
401+
}
402+
return copied;
403+
}
404+
405+
boolean hasRemaining() {
406+
for (ByteBuffer b : buffers) {
407+
if (b.hasRemaining()) return true;
408+
}
409+
return false;
410+
}
411+
}
412+
413+
private static final class EofMarker implements Closeable {
414+
private static final EofMarker INSTANCE = new EofMarker();
415+
416+
private EofMarker() {}
417+
418+
@Override
419+
public void close() {}
199420
}
200421
}
201422

google-cloud-storage/src/main/java/com/google/cloud/storage/Buffers.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@
2020
import java.nio.Buffer;
2121
import java.nio.ByteBuffer;
2222
import java.nio.channels.ReadableByteChannel;
23+
import java.nio.channels.WritableByteChannel;
2324
import java.util.function.Consumer;
2425

2526
/**
@@ -168,4 +169,17 @@ static long totalRemaining(ByteBuffer[] buffers, int offset, int length) {
168169
}
169170
return totalRemaning;
170171
}
172+
173+
static long copyUsingBuffer(ByteBuffer buf, ReadableByteChannel r, WritableByteChannel w)
174+
throws IOException {
175+
long total = 0;
176+
while (r.read(buf) != -1) {
177+
buf.flip();
178+
while (buf.hasRemaining()) {
179+
total += w.write(buf);
180+
}
181+
buf.clear();
182+
}
183+
return total;
184+
}
171185
}

0 commit comments

Comments
 (0)