Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,7 @@ protected void onError(Status status, Metadata trailers) {
String channelId = ChannelPool.extractIdentifier(trailers);
// Non retry scenario
if (!retryOptions.enableRetries()
|| !retryOptions.isRetryable(code)
|| !isStatusRetryable(status)
// Unauthenticated is special because the request never made it to
// to the server, so all requests are retryable
|| !(isRequestRetryable() || code == Code.UNAUTHENTICATED || code == Code.UNAVAILABLE)) {
Expand Down Expand Up @@ -263,6 +263,10 @@ protected boolean isRequestRetryable() {
return rpc.isRetryable(getRetryRequest());
}

protected boolean isStatusRetryable(Status status) {
return retryOptions.isRetryable(status.getCode());
}

protected void setException(Exception exception) {
completionFuture.setException(exception);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@
import com.google.protobuf.ByteString;
import io.grpc.Metadata;
import io.grpc.Status;
import io.grpc.Status.Code;
import io.grpc.stub.ClientResponseObserver;
import io.grpc.stub.StreamObserver;
import io.opencensus.trace.AttributeValue;
Expand Down Expand Up @@ -243,6 +244,24 @@ protected boolean isRequestRetryable() {
return true;
}

/** Read rows requests are retryable if the status is a rst stream error. */
@Override
protected boolean isStatusRetryable(Status status) {
return retryOptions.isRetryable(status.getCode()) || isRstStream(status);
}

private boolean isRstStream(Status status) {
if (status.getCode() == Code.INTERNAL) {
String description = status.getDescription();
if (description != null) {
return description.contains("Received Rst stream")
|| description.contains("RST_STREAM closed stream")
|| description.contains("Received RST_STREAM");
}
}
return false;
}

/** {@inheritDoc} */
@Override
protected boolean onOK(Metadata trailers) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -513,6 +513,26 @@ public Void answer(InvocationOnMock invocation) throws Throwable {
Assert.assertTrue(underTest.getRowMerger().isComplete());
}

@Test
public void testRetryRstStream() throws Exception {
RetryingReadRowsOperation underTest = createOperation();
start(underTest);

ByteString key1 = ByteString.copyFrom("SomeKey1", "UTF-8");
ByteString key2 = ByteString.copyFrom("SomeKey2", "UTF-8");
underTest.onMessage(buildResponse(key1));
underTest.onClose(
Status.INTERNAL.withDescription("HTTP/2 error code: INTERNAL_ERROR\nReceived Rst stream"),
null);
Assert.assertFalse(underTest.getRowMerger().isComplete());
underTest.onMessage(buildResponse(key2));
verify(mockFlatRowObserver, times(2)).onNext(any(FlatRow.class));
checkRetryRequest(underTest, key2, 8);
verify(mockClientCall, times(4)).request(eq(1));

finishOK(underTest, 1);
}

protected void performTimeout(RetryingReadRowsOperation underTest) {
underTest.onClose(
Status.CANCELLED.withCause(
Expand Down