Skip to content

Commit af52279

Browse files
committed
chore: update BlobDescriptorState to own both GrpcCallContext and BidiReadObjectRequest building for opening
Add a test to ensure BlobDescriptorState correctly builds the request and context.
1 parent 72491e7 commit af52279

File tree

6 files changed

+195
-71
lines changed

6 files changed

+195
-71
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorImpl.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -124,9 +124,9 @@ static ApiFuture<BlobDescriptor> create(
124124
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
125125
ScheduledExecutorService executor,
126126
RetryContextProvider retryContextProvider) {
127-
BlobDescriptorState state = new BlobDescriptorState(openRequest);
127+
BlobDescriptorState state = new BlobDescriptorState(context, openRequest);
128128

129-
BlobDescriptorStream stream = BlobDescriptorStream.create(executor, callable, context, state);
129+
BlobDescriptorStream stream = BlobDescriptorStream.create(executor, callable, state);
130130

131131
ApiFuture<BlobDescriptor> blobDescriptorFuture =
132132
ApiFutures.transform(

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorState.java

Lines changed: 57 additions & 30 deletions
Original file line numberDiff line numberDiff line change
@@ -18,32 +18,38 @@
1818

1919
import static com.google.common.base.Preconditions.checkState;
2020

21+
import com.google.api.gax.grpc.GrpcCallContext;
2122
import com.google.cloud.storage.RetryContext.OnFailure;
22-
import com.google.common.collect.ImmutableList;
2323
import com.google.storage.v2.BidiReadHandle;
2424
import com.google.storage.v2.BidiReadObjectRequest;
2525
import com.google.storage.v2.Object;
26-
import com.google.storage.v2.ReadRange;
2726
import java.util.HashMap;
28-
import java.util.List;
2927
import java.util.Map;
3028
import java.util.concurrent.atomic.AtomicLong;
3129
import java.util.concurrent.atomic.AtomicReference;
3230
import java.util.concurrent.locks.ReentrantLock;
31+
import org.checkerframework.checker.lock.qual.GuardedBy;
3332
import org.checkerframework.checker.nullness.qual.MonotonicNonNull;
33+
import org.checkerframework.checker.nullness.qual.NonNull;
3434
import org.checkerframework.checker.nullness.qual.Nullable;
3535

3636
final class BlobDescriptorState {
3737

38+
private final GrpcCallContext baseContext;
3839
private final BidiReadObjectRequest openRequest;
3940
private final AtomicReference<@Nullable BidiReadHandle> bidiReadHandle;
4041
private final AtomicReference<@Nullable String> routingToken;
4142
private final AtomicReference<@MonotonicNonNull Object> metadata;
4243
private final AtomicLong readIdSeq;
44+
45+
@GuardedBy("this.lock") // https://errorprone.info/bugpattern/GuardedBy
4346
private final Map<Long, BlobDescriptorStreamRead> outstandingReads;
47+
4448
private final ReentrantLock lock;
4549

46-
BlobDescriptorState(BidiReadObjectRequest openRequest) {
50+
BlobDescriptorState(
51+
@NonNull GrpcCallContext baseContext, @NonNull BidiReadObjectRequest openRequest) {
52+
this.baseContext = baseContext;
4753
this.openRequest = openRequest;
4854
this.bidiReadHandle = new AtomicReference<>();
4955
this.routingToken = new AtomicReference<>();
@@ -53,19 +59,35 @@ final class BlobDescriptorState {
5359
this.lock = new ReentrantLock();
5460
}
5561

56-
BidiReadObjectRequest getOpenRequest() {
57-
Object obj = metadata.get();
58-
if (obj != null && obj.getGeneration() != openRequest.getReadObjectSpec().getGeneration()) {
59-
BidiReadObjectRequest.Builder b = openRequest.toBuilder();
60-
b.getReadObjectSpecBuilder().setGeneration(obj.getGeneration());
61-
return b.build();
62-
}
63-
return openRequest;
64-
}
62+
OpenArguments getOpenArguments() {
63+
lock.lock();
64+
try {
65+
BidiReadObjectRequest.Builder b = openRequest.toBuilder().clearReadRanges();
6566

66-
@Nullable
67-
BidiReadHandle getBidiReadHandle() {
68-
return bidiReadHandle.get();
67+
Object obj = metadata.get();
68+
if (obj != null && obj.getGeneration() != openRequest.getReadObjectSpec().getGeneration()) {
69+
b.getReadObjectSpecBuilder().setGeneration(obj.getGeneration());
70+
}
71+
72+
String routingToken = this.routingToken.get();
73+
if (routingToken != null) {
74+
b.getReadObjectSpecBuilder().setRoutingToken(routingToken);
75+
}
76+
77+
BidiReadHandle bidiReadHandle = this.bidiReadHandle.get();
78+
if (bidiReadHandle != null) {
79+
b.getReadObjectSpecBuilder().setReadHandle(bidiReadHandle);
80+
}
81+
82+
outstandingReads.values().stream()
83+
.filter(BlobDescriptorStreamRead::readyToSend)
84+
.map(BlobDescriptorStreamRead::makeReadRange)
85+
.forEach(b::addReadRanges);
86+
87+
return OpenArguments.of(baseContext, b.build());
88+
} finally {
89+
lock.unlock();
90+
}
6991
}
7092

7193
void setBidiReadHandle(BidiReadHandle newValue) {
@@ -123,11 +145,6 @@ void setRoutingToken(String routingToken) {
123145
this.routingToken.set(routingToken);
124146
}
125147

126-
@Nullable
127-
String getRoutingToken() {
128-
return this.routingToken.get();
129-
}
130-
131148
BlobDescriptorStreamRead assignNewReadId(long oldReadId) {
132149
lock.lock();
133150
try {
@@ -142,15 +159,25 @@ BlobDescriptorStreamRead assignNewReadId(long oldReadId) {
142159
}
143160
}
144161

145-
List<ReadRange> getOutstandingReads() {
146-
lock.lock();
147-
try {
148-
return outstandingReads.values().stream()
149-
.filter(BlobDescriptorStreamRead::readyToSend)
150-
.map(BlobDescriptorStreamRead::makeReadRange)
151-
.collect(ImmutableList.toImmutableList());
152-
} finally {
153-
lock.unlock();
162+
static final class OpenArguments {
163+
private final GrpcCallContext ctx;
164+
private final BidiReadObjectRequest req;
165+
166+
private OpenArguments(GrpcCallContext ctx, BidiReadObjectRequest req) {
167+
this.ctx = ctx;
168+
this.req = req;
169+
}
170+
171+
public GrpcCallContext getCtx() {
172+
return ctx;
173+
}
174+
175+
public BidiReadObjectRequest getReq() {
176+
return req;
177+
}
178+
179+
public static OpenArguments of(GrpcCallContext ctx, BidiReadObjectRequest req) {
180+
return new OpenArguments(ctx, req);
154181
}
155182
}
156183
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStream.java

Lines changed: 15 additions & 38 deletions
Original file line numberDiff line numberDiff line change
@@ -25,20 +25,19 @@
2525
import com.google.api.gax.rpc.ClientStream;
2626
import com.google.api.gax.rpc.ResponseObserver;
2727
import com.google.api.gax.rpc.StreamController;
28+
import com.google.cloud.storage.BlobDescriptorState.OpenArguments;
2829
import com.google.cloud.storage.GrpcUtils.ZeroCopyBidiStreamingCallable;
2930
import com.google.cloud.storage.Hasher.UncheckedChecksumMismatchException;
3031
import com.google.cloud.storage.ResponseContentLifecycleHandle.ChildRef;
3132
import com.google.cloud.storage.RetryContext.OnSuccess;
3233
import com.google.common.base.Preconditions;
3334
import com.google.protobuf.ByteString;
3435
import com.google.rpc.Status;
35-
import com.google.storage.v2.BidiReadHandle;
3636
import com.google.storage.v2.BidiReadObjectError;
3737
import com.google.storage.v2.BidiReadObjectRedirectedError;
3838
import com.google.storage.v2.BidiReadObjectRequest;
3939
import com.google.storage.v2.BidiReadObjectResponse;
4040
import com.google.storage.v2.ChecksummedData;
41-
import com.google.storage.v2.Object;
4241
import com.google.storage.v2.ObjectRangeData;
4342
import com.google.storage.v2.ReadRange;
4443
import com.google.storage.v2.ReadRangeError;
@@ -51,6 +50,7 @@
5150
import java.util.concurrent.TimeUnit;
5251
import java.util.concurrent.TimeoutException;
5352
import java.util.concurrent.atomic.AtomicInteger;
53+
import org.checkerframework.checker.nullness.qual.Nullable;
5454

5555
final class BlobDescriptorStream
5656
implements ClientStream<BidiReadObjectRequest>, ApiFuture<Void>, AutoCloseable {
@@ -61,7 +61,6 @@ final class BlobDescriptorStream
6161
private final ScheduledExecutorService executor;
6262
private final ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse>
6363
callable;
64-
private final GrpcCallContext context;
6564
private final int maxRedirectsAllowed;
6665

6766
private volatile boolean open;
@@ -75,19 +74,18 @@ private BlobDescriptorStream(
7574
BlobDescriptorState state,
7675
ScheduledExecutorService executor,
7776
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
78-
GrpcCallContext context,
7977
int maxRedirectsAllowed) {
8078
this.state = state;
8179
this.executor = executor;
8280
this.callable = callable;
83-
this.context = context;
8481
this.blobDescriptorResolveFuture = SettableApiFuture.create();
8582
this.open = true;
8683
this.redirectCounter = new AtomicInteger();
8784
this.maxRedirectsAllowed = maxRedirectsAllowed;
8885
}
8986

90-
public ClientStream<BidiReadObjectRequest> getRequestStream() {
87+
// TODO: make this more elegant
88+
private ClientStream<BidiReadObjectRequest> getRequestStream(@Nullable GrpcCallContext context) {
9189
if (requestStream != null) {
9290
return requestStream;
9391
} else {
@@ -127,30 +125,31 @@ public void close() throws IOException {
127125
public void send(BidiReadObjectRequest request) {
128126
checkOpen();
129127
if (requestStream == null) {
128+
OpenArguments openArguments = state.getOpenArguments();
130129
BidiReadObjectRequest merged =
131-
state.getOpenRequest().toBuilder().clearReadRanges().mergeFrom(request).build();
132-
getRequestStream().send(merged);
130+
openArguments.getReq().toBuilder().clearReadRanges().mergeFrom(request).build();
131+
getRequestStream(openArguments.getCtx()).send(merged);
133132
} else {
134-
getRequestStream().send(request);
133+
getRequestStream(null).send(request);
135134
}
136135
}
137136

138137
@Override
139138
public void closeSendWithError(Throwable t) {
140139
checkOpen();
141-
getRequestStream().closeSendWithError(t);
140+
getRequestStream(null).closeSendWithError(t);
142141
}
143142

144143
@Override
145144
public void closeSend() {
146145
checkOpen();
147-
getRequestStream().closeSend();
146+
getRequestStream(null).closeSend();
148147
}
149148

150149
@Override
151150
public boolean isSendReady() {
152151
checkOpen();
153-
return getRequestStream().isSendReady();
152+
return getRequestStream(null).isSendReady();
154153
}
155154

156155
@Override
@@ -191,30 +190,9 @@ private void checkOpen() {
191190
private void restart() {
192191
reset();
193192

194-
BidiReadObjectRequest openRequest = state.getOpenRequest();
195-
BidiReadObjectRequest.Builder b = openRequest.toBuilder().clearReadRanges();
196-
197-
String routingToken = state.getRoutingToken();
198-
if (routingToken != null) {
199-
b.getReadObjectSpecBuilder().setRoutingToken(routingToken);
200-
}
201-
202-
BidiReadHandle bidiReadHandle = state.getBidiReadHandle();
203-
if (bidiReadHandle != null) {
204-
b.getReadObjectSpecBuilder().setReadHandle(bidiReadHandle);
205-
}
206-
207-
b.addAllReadRanges(state.getOutstandingReads());
208-
if (openRequest.getReadObjectSpec().getGeneration() <= 0) {
209-
Object metadata = state.getMetadata();
210-
if (metadata != null) {
211-
b.getReadObjectSpecBuilder().setGeneration(metadata.getGeneration());
212-
}
213-
}
214-
215-
BidiReadObjectRequest restartRequest = b.build();
216-
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream();
217-
requestStream1.send(restartRequest);
193+
OpenArguments openArguments = state.getOpenArguments();
194+
ClientStream<BidiReadObjectRequest> requestStream1 = getRequestStream(openArguments.getCtx());
195+
requestStream1.send(openArguments.getReq());
218196
}
219197

220198
private void reset() {
@@ -484,11 +462,10 @@ public void onComplete() {
484462
static BlobDescriptorStream create(
485463
ScheduledExecutorService executor,
486464
ZeroCopyBidiStreamingCallable<BidiReadObjectRequest, BidiReadObjectResponse> callable,
487-
GrpcCallContext context,
488465
BlobDescriptorState state) {
489466

490467
int maxRedirectsAllowed = 3; // TODO: make this configurable in the ultimate public surface
491-
return new BlobDescriptorStream(state, executor, callable, context, maxRedirectsAllowed);
468+
return new BlobDescriptorStream(state, executor, callable, maxRedirectsAllowed);
492469
}
493470

494471
static final class MaxRedirectsExceededException extends RuntimeException {

google-cloud-storage/src/test/java/com/google/cloud/storage/BlobDescriptorImplTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -107,7 +107,7 @@ private static RangeSpec spec(@Nullable Long begin, @Nullable Long length) {
107107
}
108108

109109
private static BlobDescriptorState stateWithObjectSize(long objectSize) {
110-
BlobDescriptorState state = new BlobDescriptorState(null);
110+
BlobDescriptorState state = new BlobDescriptorState(null, null);
111111
state.setMetadata(
112112
Object.newBuilder()
113113
.setBucket("projects/_/buckets/b")

0 commit comments

Comments
 (0)