@@ -37,17 +37,23 @@ abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
3737 protected final long readId ;
3838 protected final ReadCursor readCursor ;
3939 protected final List <ChildRef > childRefs ;
40+ protected final RetryContext retryContext ;
4041 protected boolean closed ;
4142
42- private BlobDescriptorStreamRead (long readId , ReadCursor readCursor ) {
43- this (readId , readCursor , Collections .synchronizedList (new ArrayList <>()), false );
43+ private BlobDescriptorStreamRead (long readId , ReadCursor readCursor , RetryContext retryContext ) {
44+ this (readId , readCursor , Collections .synchronizedList (new ArrayList <>()), retryContext , false );
4445 }
4546
4647 private BlobDescriptorStreamRead (
47- long readId , ReadCursor readCursor , List <ChildRef > childRefs , boolean closed ) {
48+ long readId ,
49+ ReadCursor readCursor ,
50+ List <ChildRef > childRefs ,
51+ RetryContext retryContext ,
52+ boolean closed ) {
4853 this .readId = readId ;
4954 this .readCursor = readCursor ;
5055 this .childRefs = childRefs ;
56+ this .retryContext = retryContext ;
5157 this .closed = closed ;
5258 }
5359
@@ -61,7 +67,18 @@ ReadCursor getReadCursor() {
6167
6268 abstract void eof () throws IOException ;
6369
64- abstract void fail (Status status ) throws IOException ;
70+ final void fail (Status status ) throws IOException {
71+ io .grpc .Status grpcStatus = io .grpc .Status .fromCodeValue (status .getCode ());
72+ if (!status .getMessage ().isEmpty ()) {
73+ grpcStatus = grpcStatus .withDescription (status .getMessage ());
74+ }
75+ StatusRuntimeException cause = grpcStatus .asRuntimeException ();
76+ ApiException apiException =
77+ ApiExceptionFactory .createException (cause , GrpcStatusCode .of (grpcStatus .getCode ()), false );
78+ fail (apiException );
79+ }
80+
81+ abstract void fail (Throwable t ) throws IOException ;
6582
6683 abstract BlobDescriptorStreamRead withNewReadId (long newReadId );
6784
@@ -76,38 +93,51 @@ final ReadRange makeReadRange() {
7693 @ Override
7794 public void close () throws IOException {
7895 if (!closed ) {
96+ retryContext .reset ();
7997 closed = true ;
8098 GrpcUtils .closeAll (childRefs );
8199 }
82100 }
83101
84102 static AccumulatingRead <byte []> createByteArrayAccumulatingRead (
85- long readId , ReadCursor readCursor , SettableApiFuture <byte []> complete ) {
86- return new ByteArrayAccumulatingRead (readId , readCursor , complete );
103+ long readId ,
104+ ReadCursor readCursor ,
105+ RetryContext retryContext ,
106+ SettableApiFuture <byte []> complete ) {
107+ return new ByteArrayAccumulatingRead (readId , readCursor , retryContext , complete );
87108 }
88109
89110 static ZeroCopyByteStringAccumulatingRead createZeroCopyByteStringAccumulatingRead (
90- long readId , ReadCursor readCursor , SettableApiFuture <DisposableByteString > complete ) {
91- return new ZeroCopyByteStringAccumulatingRead (readId , readCursor , complete );
111+ long readId ,
112+ ReadCursor readCursor ,
113+ SettableApiFuture <DisposableByteString > complete ,
114+ RetryContext retryContext ) {
115+ return new ZeroCopyByteStringAccumulatingRead (readId , readCursor , retryContext , complete );
92116 }
93117
118+ public abstract void recordError (Throwable e );
119+
94120 /** Base class of a read that will accumulate before completing by resolving a future */
95121 abstract static class AccumulatingRead <Result > extends BlobDescriptorStreamRead {
96122 protected final SettableApiFuture <Result > complete ;
97123
98124 private AccumulatingRead (
99- long readId , ReadCursor readCursor , SettableApiFuture <Result > complete ) {
100- super (readId , readCursor );
125+ long readId ,
126+ ReadCursor readCursor ,
127+ RetryContext retryContext ,
128+ SettableApiFuture <Result > complete ) {
129+ super (readId , readCursor , retryContext );
101130 this .complete = complete ;
102131 }
103132
104133 private AccumulatingRead (
105134 long readId ,
106135 ReadCursor readCursor ,
107136 List <ChildRef > childRefs ,
137+ RetryContext retryContext ,
108138 boolean closed ,
109139 SettableApiFuture <Result > complete ) {
110- super (readId , readCursor , childRefs , closed );
140+ super (readId , readCursor , childRefs , retryContext , closed );
111141 this .complete = complete ;
112142 }
113143
@@ -117,16 +147,17 @@ boolean acceptingBytes() {
117147 }
118148
119149 @ Override
120- void fail (Status status ) throws IOException {
121- io .grpc .Status grpcStatus = io .grpc .Status .fromCodeValue (status .getCode ());
122- if (!status .getMessage ().isEmpty ()) {
123- grpcStatus = grpcStatus .withDescription (status .getMessage ());
150+ void fail (Throwable t ) throws IOException {
151+ try {
152+ complete .setException (t );
153+ } finally {
154+ close ();
124155 }
125- StatusRuntimeException cause = grpcStatus . asRuntimeException ();
126- ApiException apiException =
127- ApiExceptionFactory . createException (
128- cause , GrpcStatusCode . of ( grpcStatus . getCode ()), false );
129- complete . setException ( apiException );
156+ }
157+
158+ @ Override
159+ public void recordError ( Throwable e ) {
160+ retryContext . recordError ( e );
130161 }
131162 }
132163
@@ -135,41 +166,51 @@ void fail(Status status) throws IOException {
135166 * java.nio.channels.ReadableByteChannel})
136167 */
137168 abstract static class StreamingRead extends BlobDescriptorStreamRead {
138- private StreamingRead (long readId , long readOffset , long readLimit ) {
139- super (readId , new ReadCursor (readOffset , readOffset + readLimit ));
169+ private StreamingRead (long readId , long readOffset , long readLimit , RetryContext retryContext ) {
170+ super (readId , new ReadCursor (readOffset , readOffset + readLimit ), retryContext );
140171 }
141172
142- public StreamingRead (
143- long readId , ReadCursor readCursor , List <ChildRef > childRefs , boolean closed ) {
144- super (readId , readCursor , childRefs , closed );
173+ private StreamingRead (
174+ long readId ,
175+ ReadCursor readCursor ,
176+ List <ChildRef > childRefs ,
177+ RetryContext retryContext ,
178+ boolean closed ) {
179+ super (readId , readCursor , childRefs , retryContext , closed );
145180 }
146181 }
147182
148183 static final class ByteArrayAccumulatingRead extends AccumulatingRead <byte []> {
149184
150185 private ByteArrayAccumulatingRead (
151- long readId , ReadCursor readCursor , SettableApiFuture <byte []> complete ) {
152- super (readId , readCursor , complete );
186+ long readId ,
187+ ReadCursor readCursor ,
188+ RetryContext retryContext ,
189+ SettableApiFuture <byte []> complete ) {
190+ super (readId , readCursor , retryContext , complete );
153191 }
154192
155193 private ByteArrayAccumulatingRead (
156194 long readId ,
157195 ReadCursor readCursor ,
158196 List <ChildRef > childRefs ,
197+ RetryContext retryContext ,
159198 boolean closed ,
160199 SettableApiFuture <byte []> complete ) {
161- super (readId , readCursor , childRefs , closed , complete );
200+ super (readId , readCursor , childRefs , retryContext , closed , complete );
162201 }
163202
164203 @ Override
165204 void accept (ChildRef childRef ) throws IOException {
205+ retryContext .reset ();
166206 int size = childRef .byteString ().size ();
167207 childRefs .add (childRef );
168208 readCursor .advance (size );
169209 }
170210
171211 @ Override
172212 void eof () throws IOException {
213+ retryContext .reset ();
173214 try {
174215 ByteString base = ByteString .empty ();
175216 for (ChildRef ref : childRefs ) {
@@ -183,7 +224,8 @@ void eof() throws IOException {
183224
184225 @ Override
185226 ByteArrayAccumulatingRead withNewReadId (long newReadId ) {
186- return new ByteArrayAccumulatingRead (newReadId , readCursor , childRefs , closed , complete );
227+ return new ByteArrayAccumulatingRead (
228+ newReadId , readCursor , childRefs , retryContext , closed , complete );
187229 }
188230 }
189231
@@ -193,8 +235,23 @@ static final class ZeroCopyByteStringAccumulatingRead
193235 private volatile ByteString byteString ;
194236
195237 private ZeroCopyByteStringAccumulatingRead (
196- long readId , ReadCursor readCursor , SettableApiFuture <DisposableByteString > complete ) {
197- super (readId , readCursor , complete );
238+ long readId ,
239+ ReadCursor readCursor ,
240+ RetryContext retryContext ,
241+ SettableApiFuture <DisposableByteString > complete ) {
242+ super (readId , readCursor , retryContext , complete );
243+ }
244+
245+ public ZeroCopyByteStringAccumulatingRead (
246+ long readId ,
247+ ReadCursor readCursor ,
248+ List <ChildRef > childRefs ,
249+ RetryContext retryContext ,
250+ boolean closed ,
251+ SettableApiFuture <DisposableByteString > complete ,
252+ ByteString byteString ) {
253+ super (readId , readCursor , childRefs , retryContext , closed , complete );
254+ this .byteString = byteString ;
198255 }
199256
200257 @ Override
@@ -204,13 +261,15 @@ public ByteString byteString() {
204261
205262 @ Override
206263 void accept (ChildRef childRef ) throws IOException {
264+ retryContext .reset ();
207265 int size = childRef .byteString ().size ();
208266 childRefs .add (childRef );
209267 readCursor .advance (size );
210268 }
211269
212270 @ Override
213271 void eof () throws IOException {
272+ retryContext .reset ();
214273 ByteString base = ByteString .empty ();
215274 for (ChildRef ref : childRefs ) {
216275 base = base .concat (ref .byteString ());
@@ -221,7 +280,8 @@ void eof() throws IOException {
221280
222281 @ Override
223282 ZeroCopyByteStringAccumulatingRead withNewReadId (long newReadId ) {
224- return new ZeroCopyByteStringAccumulatingRead (newReadId , readCursor , complete );
283+ return new ZeroCopyByteStringAccumulatingRead (
284+ newReadId , readCursor , childRefs , retryContext , closed , complete , byteString );
225285 }
226286 }
227287}
0 commit comments