Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 4677d7b

Browse files
authored
feat: StreamWriterV2 sets exception for response with error (#884)
1 parent 8e2ab01 commit 4677d7b

File tree

2 files changed

+38
-2
lines changed

2 files changed

+38
-2
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -401,7 +401,15 @@ private void requestCallback(AppendRowsResponse response) {
401401
} finally {
402402
this.lock.unlock();
403403
}
404-
requestWrapper.appendResult.set(response);
404+
if (response.hasError()) {
405+
StatusRuntimeException exception =
406+
new StatusRuntimeException(
407+
Status.fromCodeValue(response.getError().getCode())
408+
.withDescription(response.getError().getMessage()));
409+
requestWrapper.appendResult.setException(exception);
410+
} else {
411+
requestWrapper.appendResult.set(response);
412+
}
405413
}
406414

407415
private void doneCallback(Throwable finalStatus) {

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java

Lines changed: 29 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -122,6 +122,12 @@ private AppendRowsResponse createAppendResponse(long offset) {
122122
.build();
123123
}
124124

125+
private AppendRowsResponse createAppendResponseWithError(Status.Code code, String message) {
126+
return AppendRowsResponse.newBuilder()
127+
.setError(com.google.rpc.Status.newBuilder().setCode(code.value()).setMessage(message))
128+
.build();
129+
}
130+
125131
private ApiFuture<AppendRowsResponse> sendTestMessage(StreamWriterV2 writer, String[] messages) {
126132
return writer.append(createAppendRequest(messages, -1));
127133
}
@@ -196,7 +202,7 @@ public void testAppendSuccess() throws Exception {
196202
}
197203

198204
@Test
199-
public void testAppendSuccessAndError() throws Exception {
205+
public void testAppendSuccessAndConnectionError() throws Exception {
200206
StreamWriterV2 writer = getTestStreamWriterV2();
201207
testBigQueryWrite.addResponse(createAppendResponse(0));
202208
testBigQueryWrite.addException(Status.INTERNAL.asException());
@@ -211,6 +217,28 @@ public void testAppendSuccessAndError() throws Exception {
211217
writer.close();
212218
}
213219

220+
@Test
221+
public void testAppendSuccessAndInStreamError() throws Exception {
222+
StreamWriterV2 writer = getTestStreamWriterV2();
223+
testBigQueryWrite.addResponse(createAppendResponse(0));
224+
testBigQueryWrite.addResponse(
225+
createAppendResponseWithError(Status.INVALID_ARGUMENT.getCode(), "test message"));
226+
testBigQueryWrite.addResponse(createAppendResponse(1));
227+
228+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
229+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
230+
ApiFuture<AppendRowsResponse> appendFuture3 = sendTestMessage(writer, new String[] {"C"});
231+
232+
assertEquals(0, appendFuture1.get().getAppendResult().getOffset().getValue());
233+
StatusRuntimeException actualError =
234+
assertFutureException(StatusRuntimeException.class, appendFuture2);
235+
assertEquals(Status.Code.INVALID_ARGUMENT, actualError.getStatus().getCode());
236+
assertEquals("test message", actualError.getStatus().getDescription());
237+
assertEquals(1, appendFuture3.get().getAppendResult().getOffset().getValue());
238+
239+
writer.close();
240+
}
241+
214242
@Test
215243
public void longIdleBetweenAppends() throws Exception {
216244
StreamWriterV2 writer = getTestStreamWriterV2();

0 commit comments

Comments
 (0)