|
47 | 47 | import java.util.concurrent.TimeoutException; |
48 | 48 | import java.util.concurrent.atomic.AtomicLong; |
49 | 49 | import org.checkerframework.checker.nullness.qual.NonNull; |
| 50 | +import org.checkerframework.checker.nullness.qual.Nullable; |
50 | 51 |
|
51 | 52 | final class GapicUnbufferedReadableByteChannel |
52 | 53 | implements UnbufferedReadableByteChannel, ScatteringByteChannel { |
@@ -258,6 +259,11 @@ ApiFuture<Object> getResult() { |
258 | 259 |
|
259 | 260 | private void ensureStreamOpen() { |
260 | 261 | if (readObjectObserver == null) { |
| 262 | + java.lang.Object peek = queue.peek(); |
| 263 | + if (peek instanceof Throwable || peek == EOF_MARKER) { |
| 264 | + // If our queue has an error or EOF, do not send another request |
| 265 | + return; |
| 266 | + } |
261 | 267 | readObjectObserver = |
262 | 268 | Retrying.run( |
263 | 269 | retryingDeps, |
@@ -326,13 +332,15 @@ protected void onResponseImpl(ReadObjectResponse response) { |
326 | 332 |
|
327 | 333 | @Override |
328 | 334 | protected void onErrorImpl(Throwable t) { |
329 | | - open.setException(t); |
330 | | - if (!alg.shouldRetry(t, null)) { |
331 | | - result.setException(StorageException.coalesce(t)); |
332 | | - } |
333 | 335 | if (t instanceof CancellationException) { |
334 | 336 | cancellation.set(t); |
335 | 337 | } |
| 338 | + if (!open.isDone()) { |
| 339 | + open.setException(t); |
| 340 | + if (!alg.shouldRetry(t, null)) { |
| 341 | + result.setException(StorageException.coalesce(t)); |
| 342 | + } |
| 343 | + } |
336 | 344 | try { |
337 | 345 | queue.offer(t); |
338 | 346 | } catch (InterruptedException e) { |
@@ -369,6 +377,11 @@ public boolean nonEmpty() { |
369 | 377 | return !queue.isEmpty(); |
370 | 378 | } |
371 | 379 |
|
| 380 | + @Nullable |
| 381 | + public T peek() { |
| 382 | + return queue.peek(); |
| 383 | + } |
| 384 | + |
372 | 385 | @NonNull |
373 | 386 | public T poll() throws InterruptedException { |
374 | 387 | return queue.take(); |
|
0 commit comments