2222import com .google .api .gax .grpc .GrpcCallContext ;
2323import com .google .api .gax .rpc .BidiStreamingCallable ;
2424import com .google .api .gax .rpc .ClientStream ;
25+ import com .google .api .gax .rpc .ResponseObserver ;
2526import com .google .api .gax .rpc .StateCheckingResponseObserver ;
2627import com .google .api .gax .rpc .StreamController ;
2728import com .google .cloud .storage .ResponseContentLifecycleHandle .ChildRef ;
3839import java .util .List ;
3940import java .util .Map ;
4041import java .util .concurrent .ConcurrentHashMap ;
42+ import java .util .concurrent .ExecutionException ;
4143import java .util .concurrent .Executor ;
44+ import java .util .concurrent .TimeUnit ;
45+ import java .util .concurrent .TimeoutException ;
4246import java .util .concurrent .atomic .AtomicLong ;
4347import java .util .concurrent .atomic .AtomicReference ;
4448
4549final class BlobDescriptorImpl implements BlobDescriptor {
4650
47- private final BlobDescriptorStreamPair stream ;
51+ private final BlobDescriptorStream stream ;
4852 private final BlobDescriptorState state ;
4953 private final BlobInfo info ;
5054
51- private BlobDescriptorImpl (BlobDescriptorStreamPair stream , BlobDescriptorState state ) {
55+ private BlobDescriptorImpl (BlobDescriptorStream stream , BlobDescriptorState state ) {
5256 this .stream = stream ;
5357 this .state = state ;
5458 this .info = Conversions .grpc ().blobInfo ().decode (state .metadata .get ());
@@ -63,7 +67,7 @@ public ApiFuture<byte[]> readRangeAsBytes(ByteRangeSpec range) {
6367 BidiReadObjectRequest request =
6468 BidiReadObjectRequest .newBuilder ().addReadRanges (value .makeReadRange ()).build ();
6569 state .outstandingReads .put (readId , value );
66- stream .requestStream . send (request );
70+ stream .send (request );
6771 return future ;
6872 }
6973
@@ -78,34 +82,136 @@ static ApiFuture<BlobDescriptor> create(
7882 BidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
7983 ResponseContentLifecycleManager <BidiReadObjectResponse > bidiResponseContentLifecycleManager ,
8084 Executor executor ) {
81- SettableApiFuture <Void > pendingOpen = SettableApiFuture .create ();
8285 BlobDescriptorState state = new BlobDescriptorState (openRequest );
86+
8387 BlobDescriptorResponseObserver responseObserver =
84- new BlobDescriptorResponseObserver (
85- pendingOpen , state , executor , bidiResponseContentLifecycleManager );
86- ClientStream <BidiReadObjectRequest > requestStream =
87- callable .splitCall (responseObserver , context );
88- BlobDescriptorStreamPair stream = new BlobDescriptorStreamPair (requestStream , responseObserver );
88+ new BlobDescriptorResponseObserver (state , executor , bidiResponseContentLifecycleManager );
89+
90+ BlobDescriptorStream stream = new BlobDescriptorStream (callable , context , responseObserver );
91+
8992 ApiFuture <BlobDescriptor > blobDescriptorFuture =
90- ApiFutures .transform (
91- pendingOpen , nowOpen -> new BlobDescriptorImpl (stream , state ), executor );
92- stream .getRequestStream ().send (openRequest );
93+ ApiFutures .transform (stream , nowOpen -> new BlobDescriptorImpl (stream , state ), executor );
94+ stream .send (openRequest );
9395 return StorageException .coalesceAsync (blobDescriptorFuture );
9496 }
9597
96- private static final class BlobDescriptorStreamPair {
97- private final ClientStream <BidiReadObjectRequest > requestStream ;
98- private final BlobDescriptorResponseObserver responseObserver ;
98+ private static final class BlobDescriptorStream
99+ implements ClientStream <BidiReadObjectRequest >, ApiFuture < Void > {
100+ private final SettableApiFuture < Void > openSignal ;
99101
100- BlobDescriptorStreamPair (
101- ClientStream <BidiReadObjectRequest > requestStream ,
102+ private final BidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ;
103+ private final GrpcCallContext context ;
104+ private final ResponseObserver <BidiReadObjectResponse > responseObserver ;
105+ private final OpenMonitorResponseObserver openMonitorResponseObserver ;
106+
107+ private volatile ClientStream <BidiReadObjectRequest > requestStream ;
108+
109+ public BlobDescriptorStream (
110+ BidiStreamingCallable <BidiReadObjectRequest , BidiReadObjectResponse > callable ,
111+ GrpcCallContext context ,
102112 BlobDescriptorResponseObserver responseObserver ) {
103- this .requestStream = requestStream ;
113+ this .callable = callable ;
114+ this .context = context ;
104115 this .responseObserver = responseObserver ;
116+ this .openMonitorResponseObserver = new OpenMonitorResponseObserver (responseObserver );
117+ this .openSignal = SettableApiFuture .create ();
105118 }
106119
107120 public ClientStream <BidiReadObjectRequest > getRequestStream () {
108- return requestStream ;
121+ if (requestStream != null ) {
122+ return requestStream ;
123+ } else {
124+ synchronized (this ) {
125+ if (requestStream == null ) {
126+ requestStream = callable .splitCall (openMonitorResponseObserver , context );
127+ }
128+ return requestStream ;
129+ }
130+ }
131+ }
132+
133+ @ Override
134+ public void send (BidiReadObjectRequest request ) {
135+ getRequestStream ().send (request );
136+ }
137+
138+ @ Override
139+ public void closeSendWithError (Throwable t ) {
140+ getRequestStream ().closeSendWithError (t );
141+ }
142+
143+ @ Override
144+ public void closeSend () {
145+ getRequestStream ().closeSend ();
146+ }
147+
148+ @ Override
149+ public boolean isSendReady () {
150+ return getRequestStream ().isSendReady ();
151+ }
152+
153+ @ Override
154+ public void addListener (Runnable listener , Executor executor ) {
155+ openSignal .addListener (listener , executor );
156+ }
157+
158+ @ Override
159+ public boolean cancel (boolean mayInterruptIfRunning ) {
160+ return openSignal .cancel (mayInterruptIfRunning );
161+ }
162+
163+ @ Override
164+ public Void get () throws InterruptedException , ExecutionException {
165+ return openSignal .get ();
166+ }
167+
168+ @ Override
169+ public Void get (long timeout , TimeUnit unit )
170+ throws InterruptedException , ExecutionException , TimeoutException {
171+ return openSignal .get (timeout , unit );
172+ }
173+
174+ @ Override
175+ public boolean isCancelled () {
176+ return openSignal .isCancelled ();
177+ }
178+
179+ @ Override
180+ public boolean isDone () {
181+ return openSignal .isDone ();
182+ }
183+
184+ private class OpenMonitorResponseObserver
185+ extends StateCheckingResponseObserver <BidiReadObjectResponse > {
186+
187+ private final BlobDescriptorResponseObserver responseObserver ;
188+
189+ private OpenMonitorResponseObserver (BlobDescriptorResponseObserver responseObserver ) {
190+ this .responseObserver = responseObserver ;
191+ }
192+
193+ @ Override
194+ protected void onStartImpl (StreamController controller ) {
195+ responseObserver .onStartImpl (controller );
196+ }
197+
198+ @ Override
199+ protected void onResponseImpl (BidiReadObjectResponse response ) {
200+ responseObserver .onResponseImpl (response );
201+ openSignal .set (null );
202+ }
203+
204+ @ Override
205+ protected void onErrorImpl (Throwable t ) {
206+ responseObserver .onErrorImpl (t );
207+ openSignal .setException (t );
208+ }
209+
210+ @ Override
211+ protected void onCompleteImpl () {
212+ responseObserver .onCompleteImpl ();
213+ openSignal .set (null );
214+ }
109215 }
110216 }
111217
@@ -118,15 +224,11 @@ private static final class BlobDescriptorResponseObserver
118224 private final ResponseContentLifecycleManager <BidiReadObjectResponse >
119225 bidiResponseContentLifecycleManager ;
120226
121- private final SettableApiFuture <Void > openSignal ;
122-
123- public BlobDescriptorResponseObserver (
124- SettableApiFuture <Void > openSignal ,
227+ private BlobDescriptorResponseObserver (
125228 BlobDescriptorState state ,
126229 Executor exec ,
127230 ResponseContentLifecycleManager <BidiReadObjectResponse >
128231 bidiResponseContentLifecycleManager ) {
129- this .openSignal = openSignal ;
130232 this .state = state ;
131233 this .exec = exec ;
132234 this .bidiResponseContentLifecycleManager = bidiResponseContentLifecycleManager ;
@@ -146,11 +248,9 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
146248 bidiResponseContentLifecycleManager .get (response )) {
147249 if (response .hasMetadata ()) {
148250 state .metadata .set (response .getMetadata ());
149- openSignal .set (null );
150251 }
151252 if (response .hasReadHandle ()) {
152253 state .ref .set (response .getReadHandle ());
153- openSignal .set (null );
154254 }
155255 List <ObjectRangeData > rangeData = response .getObjectDataRangesList ();
156256 if (rangeData .isEmpty ()) {
@@ -180,14 +280,10 @@ protected void onResponseImpl(BidiReadObjectResponse response) {
180280 }
181281
182282 @ Override
183- protected void onErrorImpl (Throwable t ) {
184- openSignal .setException (t );
185- }
283+ protected void onErrorImpl (Throwable t ) {}
186284
187285 @ Override
188- protected void onCompleteImpl () {
189- openSignal .set (null );
190- }
286+ protected void onCompleteImpl () {}
191287 }
192288
193289 @ VisibleForTesting
0 commit comments