Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit d1c2ecf

Browse files
authored
Merge a04598a into 94c7848
2 parents 94c7848 + a04598a commit d1c2ecf

File tree

2 files changed

+35
-0
lines changed

2 files changed

+35
-0
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -932,6 +932,7 @@ public void onComplete() {
932932
public void onError(Throwable t) {
933933
LOG.fine("OnError called");
934934
if (streamWriter.shutdown.get()) {
935+
abortInflightRequests(t);
935936
return;
936937
}
937938
InflightBatch inflightBatch = null;

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1015,4 +1015,38 @@ public void testDatasetTraceId() throws Exception {
10151015
assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
10161016
assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId());
10171017
}
1018+
1019+
@Test
1020+
public void testShutdownWithConnectionError() throws Exception {
1021+
StreamWriter writer =
1022+
getTestStreamWriterBuilder()
1023+
.setBatchingSettings(
1024+
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
1025+
.toBuilder()
1026+
.setElementCountThreshold(1L)
1027+
.build())
1028+
.build();
1029+
testBigQueryWrite.addResponse(
1030+
AppendRowsResponse.newBuilder()
1031+
.setAppendResult(
1032+
AppendRowsResponse.AppendResult.newBuilder().setOffset(Int64Value.of(1)).build())
1033+
.build());
1034+
testBigQueryWrite.addException(Status.DATA_LOSS.asException());
1035+
testBigQueryWrite.setResponseDelay(Duration.ofSeconds(10));
1036+
1037+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
1038+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
1039+
Thread.sleep(5000L);
1040+
// Move the needle for responses to be sent.
1041+
fakeExecutor.advanceTime(Duration.ofSeconds(20));
1042+
// Shutdown writer immediately and there will be some error happened when flushing the queue.
1043+
writer.shutdown();
1044+
assertEquals(1, appendFuture1.get().getAppendResult().getOffset().getValue());
1045+
try {
1046+
appendFuture2.get();
1047+
fail("Should fail with exception");
1048+
} catch (java.util.concurrent.ExecutionException e) {
1049+
assertEquals("Request aborted due to previous failures", e.getCause().getMessage());
1050+
}
1051+
}
10181052
}

0 commit comments

Comments
 (0)