Skip to content

Commit 264683e

Browse files
committed
chore: update OutstandingReadToArray to track read progress
1 parent c653bb6 commit 264683e

File tree

5 files changed

+105
-29
lines changed

5 files changed

+105
-29
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 15 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -61,14 +61,7 @@ public ApiFuture<byte[]> readRangeAsBytes(ByteRangeSpec range) {
6161
OutstandingReadToArray value =
6262
new OutstandingReadToArray(readId, range.beginOffset(), range.length(), future);
6363
BidiReadObjectRequest request =
64-
BidiReadObjectRequest.newBuilder()
65-
.addReadRanges(
66-
ReadRange.newBuilder()
67-
.setReadId(readId)
68-
.setReadOffset(range.beginOffset())
69-
.setReadLength(range.length())
70-
.build())
71-
.build();
64+
BidiReadObjectRequest.newBuilder().addReadRanges(value.makeReadRange()).build();
7265
state.outstandingReads.put(readId, value);
7366
stream.requestStream.send(request);
7467
return future;
@@ -173,10 +166,11 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
173166
ChildRef childRef = handle.borrow();
174167
read.accept(childRef, content);
175168
if (d.getRangeEnd()) {
176-
state.outstandingReads.remove(id);
177169
// invoke eof on exec, the resolving future could have a downstream callback
178170
// that we don't want to block this grpc thread
179171
exec.execute(read::eof);
172+
// don't remove the outstanding read until the future has been resolved
173+
state.outstandingReads.remove(id);
180174
}
181175
}
182176
} catch (IOException e) {
@@ -199,30 +193,38 @@ protected void onCompleteImpl() {
199193
@VisibleForTesting
200194
static final class OutstandingReadToArray {
201195
private final long readId;
202-
private final long readOffset;
203-
private final long readLimit;
196+
private final ReadCursor readCursor;
204197
private final ByteArrayOutputStream bytes;
205198
private final SettableApiFuture<byte[]> complete;
206199

207200
@VisibleForTesting
208201
OutstandingReadToArray(
209202
long readId, long readOffset, long readLimit, SettableApiFuture<byte[]> complete) {
210203
this.readId = readId;
211-
this.readOffset = readOffset;
212-
this.readLimit = readLimit;
204+
this.readCursor = new ReadCursor(readOffset, readOffset + readLimit);
213205
this.bytes = new ByteArrayOutputStream();
214206
this.complete = complete;
215207
}
216208

217209
public void accept(ChildRef childRef, ByteString bytes) throws IOException {
218210
try (ChildRef autoclose = childRef) {
211+
int size = bytes.size();
219212
bytes.writeTo(this.bytes);
213+
readCursor.advance(size);
220214
}
221215
}
222216

223217
public void eof() {
224218
complete.set(bytes.toByteArray());
225219
}
220+
221+
public ReadRange makeReadRange() {
222+
return ReadRange.newBuilder()
223+
.setReadId(readId)
224+
.setReadOffset(readCursor.position())
225+
.setReadLength(readCursor.remaining())
226+
.build();
227+
}
226228
}
227229

228230
private static final class BlobDescriptorState {

google-cloud-storage/src/main/java/com/google/cloud/storage/GrpcStorageOptions.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -924,8 +924,10 @@ public void shutdownNow() {
924924
// GrpcStorageStub#close() is final and we can't override it
925925
// instead hook in here to close out the zero-copy marshaller
926926
//noinspection EmptyTryBlock
927-
try (ZeroCopyResponseMarshaller<ReadObjectResponse> ignore1 = getStub().readObjectResponseMarshaller;
928-
ZeroCopyResponseMarshaller<BidiReadObjectResponse> ignore2 = getStub().bidiReadObjectResponseMarshaller) {
927+
try (ZeroCopyResponseMarshaller<ReadObjectResponse> ignore1 =
928+
getStub().readObjectResponseMarshaller;
929+
ZeroCopyResponseMarshaller<BidiReadObjectResponse> ignore2 =
930+
getStub().bidiReadObjectResponseMarshaller) {
929931
// use try-with to do the close dance for us
930932
}
931933
} catch (IOException e) {

google-cloud-storage/src/main/java/com/google/cloud/storage/ReadCursor.java

Lines changed: 28 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -20,36 +20,52 @@
2020
import java.util.Locale;
2121

2222
/**
23-
* Shrink wraps a beginning, offset and limit for tracking state of an individual invocation of
23+
* Shrink wraps a beginning, offset and ending for tracking state of an individual invocation of
2424
* {@link #read}
2525
*/
2626
final class ReadCursor {
27-
private final long beginning;
28-
private long offset;
29-
private final long limit;
27+
private final long begin;
28+
private long position;
29+
private final long end;
3030

31-
ReadCursor(long beginning, long limit) {
32-
this.limit = limit;
33-
this.beginning = beginning;
34-
this.offset = beginning;
31+
ReadCursor(long begin, long end) {
32+
this.end = end;
33+
this.begin = begin;
34+
this.position = begin;
3535
}
3636

3737
public boolean hasRemaining() {
38-
return limit - offset > 0;
38+
return remaining() > 0;
39+
}
40+
41+
public long remaining() {
42+
return end - position;
3943
}
4044

4145
public void advance(long incr) {
4246
checkArgument(incr >= 0);
43-
offset += incr;
47+
position += incr;
4448
}
4549

4650
public long read() {
47-
return offset - beginning;
51+
return position - begin;
52+
}
53+
54+
public long begin() {
55+
return begin;
56+
}
57+
58+
public long position() {
59+
return position;
60+
}
61+
62+
public long end() {
63+
return end;
4864
}
4965

5066
@Override
5167
public String toString() {
5268
return String.format(
53-
Locale.US, "ReadCursor{begin=%d, offset=%d, limit=%d}", beginning, offset, limit);
69+
Locale.US, "ReadCursor{begin=%d, position=%d, end=%d}", begin, position, end);
5470
}
5571
}

google-cloud-storage/src/test/java/com/google/cloud/storage/BlobDescriptorTest.java

Lines changed: 54 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,14 +23,17 @@
2323
import com.google.api.core.SettableApiFuture;
2424
import com.google.cloud.storage.BlobDescriptorImpl.OutstandingReadToArray;
2525
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
26+
import com.google.common.collect.ImmutableList;
2627
import com.google.protobuf.ByteString;
2728
import com.google.protobuf.UnsafeByteOperations;
29+
import com.google.storage.v2.ReadRange;
2830
import java.io.Closeable;
2931
import java.io.IOException;
3032
import java.util.concurrent.ExecutionException;
3133
import java.util.concurrent.TimeUnit;
3234
import java.util.concurrent.TimeoutException;
3335
import java.util.concurrent.atomic.AtomicBoolean;
36+
import java.util.function.Function;
3437
import org.junit.Test;
3538

3639
public final class BlobDescriptorTest {
@@ -83,6 +86,57 @@ public void outstandingReadToArray_childRef_close_ioException_propagated() throw
8386
assertThat(ioException).hasCauseThat().isInstanceOf(Kaboom.class);
8487
}
8588

89+
@Test
90+
public void outstandingReadToArray_producesAnAccurateReadRange()
91+
throws IOException, ExecutionException, InterruptedException, TimeoutException {
92+
SettableApiFuture<byte[]> complete = SettableApiFuture.create();
93+
int readId = 1;
94+
OutstandingReadToArray read = new OutstandingReadToArray(readId, 0, 137, complete);
95+
96+
ReadRange readRange1 = read.makeReadRange();
97+
ReadRange expectedReadRange1 =
98+
ReadRange.newBuilder().setReadId(readId).setReadOffset(0).setReadLength(137).build();
99+
assertThat(readRange1).isEqualTo(expectedReadRange1);
100+
101+
try (ResponseContentLifecycleHandle handle = noopContentHandle()) {
102+
ByteString bytes = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(64));
103+
read.accept(handle.borrow(), bytes);
104+
}
105+
106+
ReadRange readRange2 = read.makeReadRange();
107+
ReadRange expectedReadRange2 =
108+
ReadRange.newBuilder().setReadId(readId).setReadOffset(64).setReadLength(73).build();
109+
assertThat(readRange2).isEqualTo(expectedReadRange2);
110+
111+
try (ResponseContentLifecycleHandle handle = noopContentHandle()) {
112+
ByteString bytes = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(64));
113+
read.accept(handle.borrow(), bytes);
114+
}
115+
116+
ReadRange readRange3 = read.makeReadRange();
117+
ReadRange expectedReadRange3 =
118+
ReadRange.newBuilder().setReadId(readId).setReadOffset(128).setReadLength(9).build();
119+
assertThat(readRange3).isEqualTo(expectedReadRange3);
120+
121+
try (ResponseContentLifecycleHandle handle = noopContentHandle()) {
122+
ByteString bytes = ByteString.copyFrom(DataGenerator.base64Characters().genBytes(9));
123+
read.accept(handle.borrow(), bytes);
124+
read.eof();
125+
}
126+
127+
ReadRange readRange4 = read.makeReadRange();
128+
ReadRange expectedReadRange4 =
129+
ReadRange.newBuilder().setReadId(readId).setReadOffset(137).setReadLength(0).build();
130+
assertThat(readRange4).isEqualTo(expectedReadRange4);
131+
132+
byte[] actualBytes = complete.get(1, TimeUnit.SECONDS);
133+
assertThat(xxd(actualBytes)).isEqualTo(xxd(DataGenerator.base64Characters().genBytes(137)));
134+
}
135+
136+
private static ResponseContentLifecycleHandle noopContentHandle() {
137+
return ResponseContentLifecycleHandle.create(ImmutableList.of(), Function.identity(), () -> {});
138+
}
139+
86140
private static final class Kaboom extends RuntimeException {
87141
private Kaboom() {
88142
super("Kaboom!!!");

google-cloud-storage/src/test/java/com/google/cloud/storage/TransportCompatibilityTest.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -40,10 +40,12 @@ public void verifyUnsupportedMethodsGenerateMeaningfulException() {
4040
@SuppressWarnings("resource")
4141
Storage s =
4242
new GrpcStorageImpl(
43-
options, null,
43+
options,
44+
null,
4445
ResponseContentLifecycleManager.noop(),
4546
ResponseContentLifecycleManager.noopBidiReadObjectResponse(),
46-
null, Opts.empty());
47+
null,
48+
Opts.empty());
4749
ImmutableList<String> messages =
4850
Stream.<Supplier<?>>of(
4951
s::batch,

0 commit comments

Comments
 (0)