Skip to content

Commit 4170803

Browse files
committed
chore: allow a fast opened ObjectReadSession to be used for additional reads
Currently, a FastOpenObjectReadSession will be closed when it's resource is closed. This change will allow for the underlying ObjectReadSession to be held open even when the initial projection is closed. This allows for the same stream to be used in the case when a "seek" might need to be performed.
1 parent f8e54b5 commit 4170803

File tree

3 files changed

+179
-19
lines changed

3 files changed

+179
-19
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionStream.java

Lines changed: 22 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -31,6 +31,7 @@
3131
import com.google.cloud.storage.ObjectReadSessionState.OpenArguments;
3232
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
3333
import com.google.cloud.storage.RetryContext.OnSuccess;
34+
import com.google.cloud.storage.StorageDataClient.Borrowable;
3435
import com.google.common.annotations.VisibleForTesting;
3536
import com.google.common.base.Preconditions;
3637
import com.google.protobuf.ByteString;
@@ -58,7 +59,7 @@
5859
import org.checkerframework.checker.nullness.qual.Nullable;
5960

6061
final class ObjectReadSessionStream
61-
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void>, IOAutoCloseable {
62+
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void>, IOAutoCloseable, Borrowable {
6263

6364
private final SettableApiFuture<Void> objectReadSessionResolveFuture;
6465

@@ -69,7 +70,7 @@ final class ObjectReadSessionStream
6970
private final RetryContext streamRetryContext;
7071
private final int maxRedirectsAllowed;
7172

72-
private volatile boolean open;
73+
private final AtomicInteger openLeases;
7374
private volatile MonitoringResponseObserver monitoringResponseObserver;
7475
private volatile ResponseObserver<BidiReadObjectResponse> responseObserver;
7576
private volatile ClientStream<BidiReadObjectRequest> requestStream;
@@ -88,7 +89,7 @@ private ObjectReadSessionStream(
8889
this.streamRetryContext = backoff;
8990
this.objectReadSessionResolveFuture = SettableApiFuture.create();
9091
this.maxRedirectsAllowed = maxRedirectsAllowed;
91-
this.open = true;
92+
this.openLeases = new AtomicInteger(1);
9293
this.redirectCounter = new AtomicInteger();
9394
}
9495

@@ -118,14 +119,17 @@ public void close() {
118119
}
119120

120121
public ApiFuture<Void> closeAsync() {
121-
if (!open) {
122+
if (!isOpen()) {
123+
return ApiFutures.immediateFuture(null);
124+
}
125+
int updatedLeaseCount = openLeases.decrementAndGet();
126+
if (updatedLeaseCount == 0) {
127+
AsynchronousCloseException cause = new AsynchronousCloseException();
128+
ApiFuture<?> f = failAll(() -> new StorageException(0, "Parent stream shutdown", cause));
129+
return ApiFutures.transformAsync(f, ignore -> ApiFutures.immediateFuture(null), executor);
130+
} else {
122131
return ApiFutures.immediateFuture(null);
123132
}
124-
open = false;
125-
cleanUp();
126-
AsynchronousCloseException cause = new AsynchronousCloseException();
127-
ApiFuture<?> f = failAll(() -> new StorageException(0, "Parent stream shutdown", cause));
128-
return ApiFutures.transformAsync(f, ignore -> ApiFutures.immediateFuture(null), executor);
129133
}
130134

131135
private void cleanUp() {
@@ -200,11 +204,16 @@ public boolean isDone() {
200204
}
201205

202206
boolean isOpen() {
203-
return open;
207+
return openLeases.get() > 0;
208+
}
209+
210+
public void borrow() {
211+
checkOpen();
212+
openLeases.incrementAndGet();
204213
}
205214

206215
private void checkOpen() {
207-
Preconditions.checkState(open, "Stream closed");
216+
Preconditions.checkState(isOpen(), "Stream closed");
208217
}
209218

210219
@VisibleForTesting
@@ -221,7 +230,7 @@ void restart() {
221230
}
222231

223232
private void failAll(Throwable terminalFailure) {
224-
open = false;
233+
openLeases.set(0);
225234
try {
226235
objectReadSessionResolveFuture.setException(terminalFailure);
227236
state.failAll(executor, () -> terminalFailure);
@@ -231,7 +240,7 @@ private void failAll(Throwable terminalFailure) {
231240
}
232241

233242
private ApiFuture<?> failAll(Supplier<Throwable> terminalFailure) {
234-
open = false;
243+
openLeases.set(0);
235244
try {
236245
objectReadSessionResolveFuture.setException(terminalFailure.get());
237246
return state.failAll(executor, terminalFailure);

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageDataClient.java

Lines changed: 22 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -95,7 +95,8 @@ <Projection> ApiFuture<FastOpenObjectReadSession<Projection>> fastOpenReadSessio
9595
new FastOpenObjectReadSession<>(
9696
new ObjectReadSessionImpl(
9797
executor, bidiReadObject, stream, state, retryContextProvider),
98-
read),
98+
read,
99+
stream),
99100
executor);
100101
OpenArguments openArguments = state.getOpenArguments();
101102
BidiReadObjectRequest req = openArguments.getReq();
@@ -120,17 +121,32 @@ static StorageDataClient create(
120121
return new StorageDataClient(executor, read, retryContextProvider, onClose);
121122
}
122123

124+
@FunctionalInterface
125+
interface Borrowable {
126+
void borrow();
127+
}
128+
123129
static final class FastOpenObjectReadSession<Projection> implements IOAutoCloseable {
124130
private final ObjectReadSession session;
125131
private final ObjectReadSessionStreamRead<Projection, ?> read;
132+
private final Borrowable borrowable;
133+
private boolean sessionLeased;
126134

127135
private FastOpenObjectReadSession(
128-
ObjectReadSession session, ObjectReadSessionStreamRead<Projection, ?> read) {
136+
ObjectReadSession session,
137+
ObjectReadSessionStreamRead<Projection, ?> read,
138+
Borrowable borrowable) {
129139
this.session = session;
130140
this.read = read;
141+
this.borrowable = borrowable;
142+
this.sessionLeased = false;
131143
}
132144

133145
ObjectReadSession getSession() {
146+
if (!sessionLeased) {
147+
sessionLeased = true;
148+
borrowable.borrow();
149+
}
134150
return session;
135151
}
136152

@@ -152,8 +168,10 @@ public void close() throws IOException {
152168
}
153169

154170
public static <Projection> FastOpenObjectReadSession<Projection> of(
155-
ObjectReadSession session, ObjectReadSessionStreamRead<Projection, ?> read) {
156-
return new FastOpenObjectReadSession<>(session, read);
171+
ObjectReadSession session,
172+
ObjectReadSessionStreamRead<Projection, ?> read,
173+
Borrowable borrowable) {
174+
return new FastOpenObjectReadSession<>(session, read, borrowable);
157175
}
158176
}
159177
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java

Lines changed: 135 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -28,6 +28,7 @@
2828

2929
import com.google.api.core.ApiFuture;
3030
import com.google.api.core.ApiFutures;
31+
import com.google.api.gax.grpc.GrpcCallContext;
3132
import com.google.api.gax.retrying.RetrySettings;
3233
import com.google.api.gax.rpc.AbortedException;
3334
import com.google.api.gax.rpc.ApiException;
@@ -37,6 +38,8 @@
3738
import com.google.cloud.storage.Crc32cValue.Crc32cLengthKnown;
3839
import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException;
3940
import com.google.cloud.storage.Storage.BlobSourceOption;
41+
import com.google.cloud.storage.StorageDataClient.FastOpenObjectReadSession;
42+
import com.google.cloud.storage.ZeroCopySupport.DisposableByteString;
4043
import com.google.cloud.storage.it.ChecksummedTestContent;
4144
import com.google.cloud.storage.it.GrpcPlainRequestLoggingInterceptor;
4245
import com.google.cloud.storage.it.GrpcRequestAuditing;
@@ -49,6 +52,7 @@
4952
import com.google.common.io.ByteStreams;
5053
import com.google.protobuf.Any;
5154
import com.google.protobuf.ByteString;
55+
import com.google.protobuf.TextFormat;
5256
import com.google.storage.v2.BidiReadHandle;
5357
import com.google.storage.v2.BidiReadObjectError;
5458
import com.google.storage.v2.BidiReadObjectRedirectedError;
@@ -986,6 +990,9 @@ public void onNext(BidiReadObjectRequest request) {
986990
ApiFuture<byte[]> f3 =
987991
bd.readRange(RangeSpec.of(3, 3), RangeProjectionConfigs.asFutureBytes());
988992

993+
// make sure the first read succeeded
994+
byte[] actual = TestUtils.await(f1, 5, TimeUnit.SECONDS);
995+
989996
// close the "parent"
990997
bd.close();
991998

@@ -1005,8 +1012,6 @@ public void onNext(BidiReadObjectRequest request) {
10051012
assertThat(readRanges).isEqualTo(expected);
10061013
},
10071014
() -> {
1008-
// make sure the first read succeeded
1009-
byte[] actual = TestUtils.await(f1, 5, TimeUnit.SECONDS);
10101015
assertThat(ByteString.copyFrom(actual)).isEqualTo(ByteString.copyFromUtf8("A"));
10111016
},
10121017
// make sure the other two pending reads fail
@@ -1305,6 +1310,134 @@ public void onCompleted() {
13051310
}
13061311
}
13071312

1313+
@Test
1314+
public void gettingSessionFromFastOpenKeepsTheSessionOpenUntilClosed() throws Exception {
1315+
ChecksummedTestContent expected = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 30);
1316+
1317+
ChecksummedTestContent content1 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 10, 10);
1318+
ChecksummedTestContent content2 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 20, 10);
1319+
ChecksummedTestContent content3 = ChecksummedTestContent.of(ALL_OBJECT_BYTES, 30, 10);
1320+
BidiReadObjectRequest req1 =
1321+
BidiReadObjectRequest.newBuilder()
1322+
.setReadObjectSpec(
1323+
BidiReadObjectSpec.newBuilder()
1324+
.setBucket(METADATA.getBucket())
1325+
.setObject(METADATA.getName())
1326+
.build())
1327+
.addReadRanges(getReadRange(1, 10, 10))
1328+
.build();
1329+
BidiReadObjectResponse res1 =
1330+
BidiReadObjectResponse.newBuilder()
1331+
.setMetadata(METADATA)
1332+
.addObjectDataRanges(
1333+
ObjectRangeData.newBuilder()
1334+
.setReadRange(getReadRange(1, 10, content1))
1335+
.setChecksummedData(content1.asChecksummedData())
1336+
.setRangeEnd(true)
1337+
.build())
1338+
.build();
1339+
1340+
BidiReadObjectRequest req2 = read(2, 20, 10);
1341+
BidiReadObjectResponse res2 =
1342+
BidiReadObjectResponse.newBuilder()
1343+
.addObjectDataRanges(
1344+
ObjectRangeData.newBuilder()
1345+
.setReadRange(getReadRange(2, 20, content2))
1346+
.setChecksummedData(content2.asChecksummedData())
1347+
.setRangeEnd(true)
1348+
.build())
1349+
.build();
1350+
BidiReadObjectRequest req3 = read(3, 30, 10);
1351+
BidiReadObjectResponse res3 =
1352+
BidiReadObjectResponse.newBuilder()
1353+
.addObjectDataRanges(
1354+
ObjectRangeData.newBuilder()
1355+
.setReadRange(getReadRange(3, 30, content3))
1356+
.setChecksummedData(content3.asChecksummedData())
1357+
.setRangeEnd(true)
1358+
.build())
1359+
.build();
1360+
1361+
System.out.println("req1 = " + TextFormat.printer().shortDebugString(req1));
1362+
System.out.println("req2 = " + TextFormat.printer().shortDebugString(req2));
1363+
System.out.println("req3 = " + TextFormat.printer().shortDebugString(req3));
1364+
ImmutableMap<BidiReadObjectRequest, BidiReadObjectResponse> db =
1365+
ImmutableMap.<BidiReadObjectRequest, BidiReadObjectResponse>builder()
1366+
.put(req1, res1)
1367+
.put(req2, res2)
1368+
.put(req3, res3)
1369+
.buildOrThrow();
1370+
1371+
FakeStorage fakeStorage = FakeStorage.from(db);
1372+
1373+
try (FakeServer fakeServer = FakeServer.of(fakeStorage);
1374+
GrpcStorageImpl storage =
1375+
(GrpcStorageImpl)
1376+
fakeServer
1377+
.getGrpcStorageOptions()
1378+
.toBuilder()
1379+
.setRetrySettings(RetrySettings.newBuilder().setMaxAttempts(1).build())
1380+
.build()
1381+
.getService()) {
1382+
StorageDataClient dataClient = storage.storageDataClient;
1383+
1384+
BidiReadObjectRequest req =
1385+
BidiReadObjectRequest.newBuilder()
1386+
.setReadObjectSpec(
1387+
BidiReadObjectSpec.newBuilder()
1388+
.setBucket(METADATA.getBucket())
1389+
.setObject(METADATA.getName())
1390+
.build())
1391+
.build();
1392+
1393+
ApiFuture<FastOpenObjectReadSession<ApiFuture<DisposableByteString>>> future =
1394+
dataClient.fastOpenReadSession(
1395+
req,
1396+
GrpcCallContext.createDefault(),
1397+
RangeSpec.of(10, 10),
1398+
RangeProjectionConfigs.asFutureByteString());
1399+
1400+
ByteString bytes = ByteString.empty();
1401+
Exception caught = null;
1402+
1403+
try (FastOpenObjectReadSession<ApiFuture<DisposableByteString>> fastOpenChannel =
1404+
future.get(5, TimeUnit.SECONDS);
1405+
ObjectReadSession session = fastOpenChannel.getSession()) {
1406+
ApiFuture<DisposableByteString> futureBytes1 = fastOpenChannel.getProjection();
1407+
try (DisposableByteString disposableByteString = futureBytes1.get()) {
1408+
bytes = bytes.concat(disposableByteString.byteString());
1409+
}
1410+
1411+
ApiFuture<DisposableByteString> futureBytes2 =
1412+
session.readRange(RangeSpec.of(20, 10), RangeProjectionConfigs.asFutureByteString());
1413+
try (DisposableByteString disposableByteString = futureBytes2.get()) {
1414+
bytes = bytes.concat(disposableByteString.byteString());
1415+
}
1416+
1417+
ApiFuture<DisposableByteString> futureBytes3 =
1418+
session.readRange(RangeSpec.of(30, 10), RangeProjectionConfigs.asFutureByteString());
1419+
try (DisposableByteString disposableByteString = futureBytes3.get()) {
1420+
bytes = bytes.concat(disposableByteString.byteString());
1421+
}
1422+
1423+
} catch (Exception e) {
1424+
// stash off any runtime failure so we can still do our assertions to help determine
1425+
// the true failure
1426+
caught = e;
1427+
} finally {
1428+
final ByteString finalBytes = bytes;
1429+
final Exception finalCaught = caught;
1430+
assertAll(
1431+
() -> assertThat(xxd(finalBytes)).isEqualTo(xxd(expected.getBytes())),
1432+
() -> {
1433+
if (finalCaught != null) {
1434+
throw new Exception("exception during test", finalCaught);
1435+
}
1436+
});
1437+
}
1438+
}
1439+
}
1440+
13081441
private static void runTestAgainstFakeServer(
13091442
FakeStorage fakeStorage, RangeSpec range, ChecksummedTestContent expected) throws Exception {
13101443

0 commit comments

Comments
 (0)