Skip to content

Commit f437c0a

Browse files
committed
chore: update BlobDescriptorStreamRead to track whether it is ready to be sent as a request
Also update state tracking to track if tombstoned, like when withNewReadId is called
1 parent ba70fd8 commit f437c0a

File tree

2 files changed

+21
-6
lines changed

2 files changed

+21
-6
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorState.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -116,6 +116,7 @@ BlobDescriptorStreamRead assignNewReadId(long oldReadId) {
116116
List<ReadRange> getOutstandingReads() {
117117
synchronized (this) {
118118
return outstandingReads.values().stream()
119+
.filter(BlobDescriptorStreamRead::readyToSend)
119120
.map(BlobDescriptorStreamRead::makeReadRange)
120121
.collect(ImmutableList.toImmutableList());
121122
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobDescriptorStreamRead.java

Lines changed: 20 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -41,6 +41,7 @@ abstract class BlobDescriptorStreamRead implements AutoCloseable, Closeable {
4141
protected final List<ChildRef> childRefs;
4242
protected final RetryContext retryContext;
4343
protected boolean closed;
44+
protected boolean tombstoned;
4445

4546
private BlobDescriptorStreamRead(long readId, ReadCursor readCursor, RetryContext retryContext) {
4647
this(readId, readCursor, Collections.synchronizedList(new ArrayList<>()), retryContext, false);
@@ -57,6 +58,7 @@ private BlobDescriptorStreamRead(
5758
this.childRefs = childRefs;
5859
this.retryContext = retryContext;
5960
this.closed = closed;
61+
this.tombstoned = false;
6062
}
6163

6264
ReadCursor getReadCursor() {
@@ -110,7 +112,9 @@ public void close() throws IOException {
110112
}
111113
}
112114

113-
abstract void recordError(Throwable t, OnSuccess onSuccess, OnFailure onFailure);
115+
abstract <T extends Throwable> void recordError(T t, OnSuccess onSuccess, OnFailure<T> onFailure);
116+
117+
public abstract boolean readyToSend();
114118

115119
static AccumulatingRead<byte[]> createByteArrayAccumulatingRead(
116120
long readId,
@@ -154,22 +158,30 @@ private AccumulatingRead(
154158

155159
@Override
156160
boolean acceptingBytes() {
157-
return !complete.isDone() && readCursor.hasRemaining();
161+
return !complete.isDone() && !tombstoned && readCursor.hasRemaining();
158162
}
159163

160164
@Override
161-
void fail(Throwable t) throws IOException {
165+
void fail(Throwable t) {
162166
try {
163-
complete.setException(t);
164-
} finally {
167+
tombstoned = true;
165168
close();
169+
} catch (IOException e) {
170+
t.addSuppressed(t);
171+
} finally {
172+
complete.setException(t);
166173
}
167174
}
168175

169176
@Override
170-
void recordError(Throwable t, OnSuccess onSuccess, OnFailure onFailure) {
177+
<T extends Throwable> void recordError(T t, OnSuccess onSuccess, OnFailure<T> onFailure) {
171178
retryContext.recordError(t, onSuccess, onFailure);
172179
}
180+
181+
@Override
182+
public boolean readyToSend() {
183+
return !tombstoned && !retryContext.inBackoff();
184+
}
173185
}
174186

175187
/**
@@ -235,6 +247,7 @@ void eof() throws IOException {
235247

236248
@Override
237249
ByteArrayAccumulatingRead withNewReadId(long newReadId) {
250+
this.tombstoned = true;
238251
return new ByteArrayAccumulatingRead(
239252
newReadId, readCursor, childRefs, retryContext, closed, complete);
240253
}
@@ -291,6 +304,7 @@ void eof() throws IOException {
291304

292305
@Override
293306
ZeroCopyByteStringAccumulatingRead withNewReadId(long newReadId) {
307+
this.tombstoned = true;
294308
return new ZeroCopyByteStringAccumulatingRead(
295309
newReadId, readCursor, childRefs, retryContext, closed, complete, byteString);
296310
}

0 commit comments

Comments
 (0)