Skip to content

Commit f8e54b5

Browse files
committed
chore: implement StorageDataClient#fastOpen
To allow opening an ObjectReadSession along with a read range. This saves the round trip of opening the session then sending the read.
1 parent 85049b9 commit f8e54b5

File tree

4 files changed

+260
-42
lines changed

4 files changed

+260
-42
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java

Lines changed: 2 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -20,7 +20,6 @@
2020

2121
import com.google.api.core.ApiFuture;
2222
import com.google.api.core.ApiFutures;
23-
import com.google.api.gax.grpc.GrpcCallContext;
2423
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2524
import com.google.cloud.storage.RetryContext.RetryContextProvider;
2625
import com.google.common.annotations.VisibleForTesting;
@@ -56,7 +55,7 @@ final class ObjectReadSessionImpl implements ObjectReadSession {
5655
@GuardedBy("this.lock")
5756
private volatile boolean open;
5857

59-
private ObjectReadSessionImpl(
58+
ObjectReadSessionImpl(
6059
ScheduledExecutorService executor,
6160
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
6261
ObjectReadSessionStream stream,
@@ -120,7 +119,7 @@ public void close() throws IOException {
120119
}
121120
}
122121

123-
private void registerReadInState(long readId, ObjectReadSessionStreamRead read) {
122+
private void registerReadInState(long readId, ObjectReadSessionStreamRead<?, ?> read) {
124123
BidiReadObjectRequest request =
125124
BidiReadObjectRequest.newBuilder().addReadRanges(read.makeReadRange()).build();
126125
if (state.canHandleNewRead(read)) {
@@ -140,25 +139,4 @@ private void registerReadInState(long readId, ObjectReadSessionStreamRead read)
140139
newStream.send(request);
141140
}
142141
}
143-
144-
static ApiFuture<ObjectReadSession> create(
145-
BidiReadObjectRequest openRequest,
146-
GrpcCallContext context,
147-
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
148-
ScheduledExecutorService executor,
149-
RetryContextProvider retryContextProvider) {
150-
ObjectReadSessionState state = new ObjectReadSessionState(context, openRequest);
151-
152-
ObjectReadSessionStream stream =
153-
ObjectReadSessionStream.create(executor, callable, state, retryContextProvider.create());
154-
155-
ApiFuture<ObjectReadSession> objectReadSessionFuture =
156-
ApiFutures.transform(
157-
stream,
158-
nowOpen ->
159-
new ObjectReadSessionImpl(executor, callable, stream, state, retryContextProvider),
160-
executor);
161-
stream.send(openRequest);
162-
return objectReadSessionFuture;
163-
}
164142
}

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageDataClient.java

Lines changed: 94 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -19,8 +19,10 @@
1919
import static com.google.common.base.Preconditions.checkArgument;
2020

2121
import com.google.api.core.ApiFuture;
22+
import com.google.api.core.ApiFutures;
2223
import com.google.api.gax.grpc.GrpcCallContext;
2324
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
25+
import com.google.cloud.storage.ObjectReadSessionState.OpenArguments;
2426
import com.google.cloud.storage.RetryContext.RetryContextProvider;
2527
import com.google.storage.v2.BidiReadObjectRequest;
2628
import com.google.storage.v2.BidiReadObjectResponse;
@@ -30,26 +32,76 @@
3032
final class StorageDataClient implements IOAutoCloseable {
3133

3234
private final ScheduledExecutorService executor;
33-
private final ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> read;
35+
private final ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse>
36+
bidiReadObject;
3437
private final RetryContextProvider retryContextProvider;
3538
private final IOAutoCloseable onClose;
3639

3740
private StorageDataClient(
3841
ScheduledExecutorService executor,
39-
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> read,
42+
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> bidiReadObject,
4043
RetryContextProvider retryContextProvider,
4144
IOAutoCloseable onClose) {
4245
this.executor = executor;
43-
this.read = read;
46+
this.bidiReadObject = bidiReadObject;
4447
this.retryContextProvider = retryContextProvider;
4548
this.onClose = onClose;
4649
}
4750

4851
ApiFuture<ObjectReadSession> readSession(BidiReadObjectRequest req, GrpcCallContext ctx) {
4952
checkArgument(
5053
req.getReadRangesList().isEmpty(),
51-
"ranged included in the initial request are not supported");
52-
return ObjectReadSessionImpl.create(req, ctx, read, executor, retryContextProvider);
54+
"ranges included in the initial request are not supported");
55+
ObjectReadSessionState state = new ObjectReadSessionState(ctx, req);
56+
57+
ObjectReadSessionStream stream =
58+
ObjectReadSessionStream.create(
59+
executor, bidiReadObject, state, retryContextProvider.create());
60+
61+
ApiFuture<ObjectReadSession> objectReadSessionFuture =
62+
ApiFutures.transform(
63+
stream,
64+
nowOpen ->
65+
new ObjectReadSessionImpl(
66+
executor, bidiReadObject, stream, state, retryContextProvider),
67+
executor);
68+
stream.send(req);
69+
return objectReadSessionFuture;
70+
}
71+
72+
<Projection> ApiFuture<FastOpenObjectReadSession<Projection>> fastOpenReadSession(
73+
BidiReadObjectRequest openRequest,
74+
GrpcCallContext ctx,
75+
RangeSpec range,
76+
RangeProjectionConfig<Projection> config) {
77+
checkArgument(
78+
openRequest.getReadRangesList().isEmpty(),
79+
"ranges included in the initial request are not supported");
80+
ObjectReadSessionState state = new ObjectReadSessionState(ctx, openRequest);
81+
82+
ObjectReadSessionStream stream =
83+
ObjectReadSessionStream.create(
84+
executor, bidiReadObject, state, retryContextProvider.create());
85+
86+
long readId = state.newReadId();
87+
ObjectReadSessionStreamRead<Projection, ?> read =
88+
config.cast().newRead(readId, range, retryContextProvider.create());
89+
state.putOutstandingRead(readId, read);
90+
91+
ApiFuture<FastOpenObjectReadSession<Projection>> objectReadSessionFuture =
92+
ApiFutures.transform(
93+
stream,
94+
nowOpen ->
95+
new FastOpenObjectReadSession<>(
96+
new ObjectReadSessionImpl(
97+
executor, bidiReadObject, stream, state, retryContextProvider),
98+
read),
99+
executor);
100+
OpenArguments openArguments = state.getOpenArguments();
101+
BidiReadObjectRequest req = openArguments.getReq();
102+
stream.send(req);
103+
read.setOnCloseCallback(stream);
104+
return objectReadSessionFuture;
53105
}
54106

55107
@Override
@@ -67,4 +119,41 @@ static StorageDataClient create(
67119
IOAutoCloseable onClose) {
68120
return new StorageDataClient(executor, read, retryContextProvider, onClose);
69121
}
122+
123+
static final class FastOpenObjectReadSession<Projection> implements IOAutoCloseable {
124+
private final ObjectReadSession session;
125+
private final ObjectReadSessionStreamRead<Projection, ?> read;
126+
127+
private FastOpenObjectReadSession(
128+
ObjectReadSession session, ObjectReadSessionStreamRead<Projection, ?> read) {
129+
this.session = session;
130+
this.read = read;
131+
}
132+
133+
ObjectReadSession getSession() {
134+
return session;
135+
}
136+
137+
ObjectReadSessionStreamRead<Projection, ?> getRead() {
138+
return read;
139+
}
140+
141+
Projection getProjection() {
142+
return read.project();
143+
}
144+
145+
@Override
146+
public void close() throws IOException {
147+
//noinspection EmptyTryBlock
148+
try (IOAutoCloseable ignore1 = session;
149+
IOAutoCloseable ignore2 = read) {
150+
// use try-with to ensure full cleanup
151+
}
152+
}
153+
154+
public static <Projection> FastOpenObjectReadSession<Projection> of(
155+
ObjectReadSession session, ObjectReadSessionStreamRead<Projection, ?> read) {
156+
return new FastOpenObjectReadSession<>(session, read);
157+
}
158+
}
70159
}

google-cloud-storage/src/test/java/com/google/cloud/storage/ITObjectReadSessionFakeTest.java

Lines changed: 12 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -1332,38 +1332,37 @@ private static void runTestAgainstFakeServer(
13321332
}
13331333
}
13341334

1335-
private static BidiReadObjectRequest read(int readId, int readOffset, int readLimit) {
1335+
static BidiReadObjectRequest read(int readId, int readOffset, int readLength) {
13361336
return BidiReadObjectRequest.newBuilder()
1337-
.addReadRanges(getReadRange(readId, readOffset, readLimit))
1337+
.addReadRanges(getReadRange(readId, readOffset, readLength))
13381338
.build();
13391339
}
13401340

1341-
private static ReadRange getReadRange(
1342-
int readId, int readOffset, ChecksummedTestContent content) {
1341+
static ReadRange getReadRange(int readId, int readOffset, ChecksummedTestContent content) {
13431342
return getReadRange(readId, readOffset, content.asChecksummedData().getContent().size());
13441343
}
13451344

1346-
private static ReadRange getReadRange(int readId, int readOffset, int readLimit) {
1345+
static ReadRange getReadRange(int readId, int readOffset, int readLength) {
13471346
return ReadRange.newBuilder()
13481347
.setReadId(readId)
13491348
.setReadOffset(readOffset)
1350-
.setReadLength(readLimit)
1349+
.setReadLength(readLength)
13511350
.build();
13521351
}
13531352

1354-
private static ThrowingRunnable assert503(ApiFuture<?> f) {
1353+
static ThrowingRunnable assert503(ApiFuture<?> f) {
13551354
return assertStatusCodeIs(f, 503);
13561355
}
13571356

1358-
private static ThrowingRunnable assertStatusCodeIs(ApiFuture<?> f, int expected) {
1357+
static ThrowingRunnable assertStatusCodeIs(ApiFuture<?> f, int expected) {
13591358
return () -> {
13601359
StorageException se =
13611360
assertThrows(StorageException.class, () -> TestUtils.await(f, 5, TimeUnit.SECONDS));
13621361
assertThat(se.getCode()).isEqualTo(expected);
13631362
};
13641363
}
13651364

1366-
private static String fmt(ReadRange r) {
1365+
static String fmt(ReadRange r) {
13671366
return String.format(
13681367
"ReadRange{id: %d, offset: %d, length: %d}",
13691368
r.getReadId(), r.getReadOffset(), r.getReadLength());
@@ -1384,7 +1383,7 @@ static ObjectReadSessionImpl getObjectReadSessionImpl(BlobReadSession bd) {
13841383
return orsi;
13851384
}
13861385

1387-
private static final class FakeStorage extends StorageImplBase {
1386+
static final class FakeStorage extends StorageImplBase {
13881387

13891388
private final Map<BidiReadObjectRequest, Consumer<StreamObserver<BidiReadObjectResponse>>> db;
13901389

@@ -1408,17 +1407,17 @@ public void onNext(BidiReadObjectRequest req) {
14081407
};
14091408
}
14101409

1411-
private static FakeStorage of(
1410+
static FakeStorage of(
14121411
Map<BidiReadObjectRequest, Consumer<StreamObserver<BidiReadObjectResponse>>> db) {
14131412
return new FakeStorage(db);
14141413
}
14151414

1416-
private static FakeStorage from(Map<BidiReadObjectRequest, BidiReadObjectResponse> db) {
1415+
static FakeStorage from(Map<BidiReadObjectRequest, BidiReadObjectResponse> db) {
14171416
return new FakeStorage(Maps.transformValues(db, resp -> (respond) -> respond.onNext(resp)));
14181417
}
14191418
}
14201419

1421-
private abstract static class AbstractObserver implements StreamObserver<BidiReadObjectRequest> {
1420+
abstract static class AbstractObserver implements StreamObserver<BidiReadObjectRequest> {
14221421

14231422
protected final StreamObserver<BidiReadObjectResponse> respond;
14241423

0 commit comments

Comments
 (0)