2525import com .google .api .gax .rpc .ClientStream ;
2626import com .google .api .gax .rpc .ResponseObserver ;
2727import com .google .api .gax .rpc .StreamController ;
28+ import com .google .cloud .storage .BlobDescriptorState .OpenArguments ;
2829import com .google .cloud .storage .GrpcUtils .ZeroCopyBidiStreamingCallable ;
2930import com .google .cloud .storage .Hasher .UncheckedChecksumMismatchException ;
3031import com .google .cloud .storage .ResponseContentLifecycleHandle .ChildRef ;
3132import com .google .cloud .storage .RetryContext .OnSuccess ;
3233import com .google .common .base .Preconditions ;
3334import com .google .protobuf .ByteString ;
3435import com .google .rpc .Status ;
35- import com .google .storage .v2 .BidiReadHandle ;
3636import com .google .storage .v2 .BidiReadObjectError ;
3737import com .google .storage .v2 .BidiReadObjectRedirectedError ;
3838import com .google .storage .v2 .BidiReadObjectRequest ;
3939import com .google .storage .v2 .BidiReadObjectResponse ;
4040import com .google .storage .v2 .ChecksummedData ;
41- import com .google .storage .v2 .Object ;
4241import com .google .storage .v2 .ObjectRangeData ;
4342import com .google .storage .v2 .ReadRange ;
4443import com .google .storage .v2 .ReadRangeError ;
5150import java .util .concurrent .TimeUnit ;
5251import java .util .concurrent .TimeoutException ;
5352import java .util .concurrent .atomic .AtomicInteger ;
53+ import org .checkerframework .checker .nullness .qual .Nullable ;
5454
5555final class BlobDescriptorStream
5656 implements ClientStream <BidiReadObjectRequest >, ApiFuture <Void >, AutoCloseable {
@@ -61,7 +61,6 @@ final class BlobDescriptorStream
6161 private final ScheduledExecutorService executor ;
6262 private final ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse >
6363 callable ;
64- private final GrpcCallContext context ;
6564 private final int maxRedirectsAllowed ;
6665
6766 private volatile boolean open ;
@@ -75,19 +74,18 @@ private BlobDescriptorStream(
7574 BlobDescriptorState state ,
7675 ScheduledExecutorService executor ,
7776 ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
78- GrpcCallContext context ,
7977 int maxRedirectsAllowed ) {
8078 this .state = state ;
8179 this .executor = executor ;
8280 this .callable = callable ;
83- this .context = context ;
8481 this .blobDescriptorResolveFuture = SettableApiFuture .create ();
8582 this .open = true ;
8683 this .redirectCounter = new AtomicInteger ();
8784 this .maxRedirectsAllowed = maxRedirectsAllowed ;
8885 }
8986
90- public ClientStream <BidiReadObjectRequest > getRequestStream () {
87+ // TODO: make this more elegant
88+ private ClientStream <BidiReadObjectRequest > getRequestStream (@ Nullable GrpcCallContext context ) {
9189 if (requestStream != null ) {
9290 return requestStream ;
9391 } else {
@@ -127,30 +125,31 @@ public void close() throws IOException {
127125 public void send (BidiReadObjectRequest request ) {
128126 checkOpen ();
129127 if (requestStream == null ) {
128+ OpenArguments openArguments = state .getOpenArguments ();
130129 BidiReadObjectRequest merged =
131- state . getOpenRequest ().toBuilder ().clearReadRanges ().mergeFrom (request ).build ();
132- getRequestStream ().send (merged );
130+ openArguments . getReq ().toBuilder ().clearReadRanges ().mergeFrom (request ).build ();
131+ getRequestStream (openArguments . getCtx () ).send (merged );
133132 } else {
134- getRequestStream ().send (request );
133+ getRequestStream (null ).send (request );
135134 }
136135 }
137136
138137 @ Override
139138 public void closeSendWithError (Throwable t ) {
140139 checkOpen ();
141- getRequestStream ().closeSendWithError (t );
140+ getRequestStream (null ).closeSendWithError (t );
142141 }
143142
144143 @ Override
145144 public void closeSend () {
146145 checkOpen ();
147- getRequestStream ().closeSend ();
146+ getRequestStream (null ).closeSend ();
148147 }
149148
150149 @ Override
151150 public boolean isSendReady () {
152151 checkOpen ();
153- return getRequestStream ().isSendReady ();
152+ return getRequestStream (null ).isSendReady ();
154153 }
155154
156155 @ Override
@@ -191,30 +190,9 @@ private void checkOpen() {
191190 private void restart () {
192191 reset ();
193192
194- BidiReadObjectRequest openRequest = state .getOpenRequest ();
195- BidiReadObjectRequest .Builder b = openRequest .toBuilder ().clearReadRanges ();
196-
197- String routingToken = state .getRoutingToken ();
198- if (routingToken != null ) {
199- b .getReadObjectSpecBuilder ().setRoutingToken (routingToken );
200- }
201-
202- BidiReadHandle bidiReadHandle = state .getBidiReadHandle ();
203- if (bidiReadHandle != null ) {
204- b .getReadObjectSpecBuilder ().setReadHandle (bidiReadHandle );
205- }
206-
207- b .addAllReadRanges (state .getOutstandingReads ());
208- if (openRequest .getReadObjectSpec ().getGeneration () <= 0 ) {
209- Object metadata = state .getMetadata ();
210- if (metadata != null ) {
211- b .getReadObjectSpecBuilder ().setGeneration (metadata .getGeneration ());
212- }
213- }
214-
215- BidiReadObjectRequest restartRequest = b .build ();
216- ClientStream <BidiReadObjectRequest > requestStream1 = getRequestStream ();
217- requestStream1 .send (restartRequest );
193+ OpenArguments openArguments = state .getOpenArguments ();
194+ ClientStream <BidiReadObjectRequest > requestStream1 = getRequestStream (openArguments .getCtx ());
195+ requestStream1 .send (openArguments .getReq ());
218196 }
219197
220198 private void reset () {
@@ -484,11 +462,10 @@ public void onComplete() {
484462 static BlobDescriptorStream create (
485463 ScheduledExecutorService executor ,
486464 ZeroCopyBidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
487- GrpcCallContext context ,
488465 BlobDescriptorState state ) {
489466
490467 int maxRedirectsAllowed = 3 ; // TODO: make this configurable in the ultimate public surface
491- return new BlobDescriptorStream (state , executor , callable , context , maxRedirectsAllowed );
468+ return new BlobDescriptorStream (state , executor , callable , maxRedirectsAllowed );
492469 }
493470
494471 static final class MaxRedirectsExceededException extends RuntimeException {
0 commit comments