@@ -41,6 +41,7 @@ abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
4141 protected final List <ChildRef > childRefs ;
4242 protected final RetryContext retryContext ;
4343 protected boolean closed ;
44+ protected boolean tombstoned ;
4445
4546 private BlobDescriptorStreamRead (long readId , ReadCursor readCursor , RetryContext retryContext ) {
4647 this (readId , readCursor , Collections .synchronizedList (new ArrayList <>()), retryContext , false );
@@ -57,6 +58,7 @@ private BlobDescriptorStreamRead(
5758 this .childRefs = childRefs ;
5859 this .retryContext = retryContext ;
5960 this .closed = closed ;
61+ this .tombstoned = false ;
6062 }
6163
6264 ReadCursor getReadCursor () {
@@ -110,7 +112,9 @@ public void close() throws IOException {
110112 }
111113 }
112114
113- abstract void recordError (Throwable t , OnSuccess onSuccess , OnFailure onFailure );
115+ abstract <T extends Throwable > void recordError (T t , OnSuccess onSuccess , OnFailure <T > onFailure );
116+
117+ public abstract boolean readyToSend ();
114118
115119 static AccumulatingRead <byte []> createByteArrayAccumulatingRead (
116120 long readId ,
@@ -154,22 +158,30 @@ private AccumulatingRead(
154158
155159 @ Override
156160 boolean acceptingBytes () {
157- return !complete .isDone () && readCursor .hasRemaining ();
161+ return !complete .isDone () && ! tombstoned && readCursor .hasRemaining ();
158162 }
159163
160164 @ Override
161- void fail (Throwable t ) throws IOException {
165+ void fail (Throwable t ) {
162166 try {
163- complete .setException (t );
164- } finally {
167+ tombstoned = true ;
165168 close ();
169+ } catch (IOException e ) {
170+ t .addSuppressed (t );
171+ } finally {
172+ complete .setException (t );
166173 }
167174 }
168175
169176 @ Override
170- void recordError (Throwable t , OnSuccess onSuccess , OnFailure onFailure ) {
177+ < T extends Throwable > void recordError (T t , OnSuccess onSuccess , OnFailure < T > onFailure ) {
171178 retryContext .recordError (t , onSuccess , onFailure );
172179 }
180+
181+ @ Override
182+ public boolean readyToSend () {
183+ return !tombstoned && !retryContext .inBackoff ();
184+ }
173185 }
174186
175187 /**
@@ -235,6 +247,7 @@ void eof() throws IOException {
235247
236248 @ Override
237249 ByteArrayAccumulatingRead withNewReadId (long newReadId ) {
250+ this .tombstoned = true ;
238251 return new ByteArrayAccumulatingRead (
239252 newReadId , readCursor , childRefs , retryContext , closed , complete );
240253 }
@@ -291,6 +304,7 @@ void eof() throws IOException {
291304
292305 @ Override
293306 ZeroCopyByteStringAccumulatingRead withNewReadId (long newReadId ) {
307+ this .tombstoned = true ;
294308 return new ZeroCopyByteStringAccumulatingRead (
295309 newReadId , readCursor , childRefs , retryContext , closed , complete , byteString );
296310 }
0 commit comments