Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit c54bcfe

Browse files
authored
feat: StreamWriterV2 will handle schema/streamName attachment (#877)
* feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones. * feat: StreamWriterV2 will attach stream name in the first request in the connection and remove stream name and schema in the following ones
1 parent c2796be commit c54bcfe

File tree

2 files changed

+32
-1
lines changed

2 files changed

+32
-1
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2.java

Lines changed: 18 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -296,6 +296,7 @@ public void close() {
296296
* It takes requests from waiting queue and sends them to server.
297297
*/
298298
private void appendLoop() {
299+
boolean isFirstRequestInConnection = true;
299300
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();
300301
while (!waitingQueueDrained()) {
301302
this.lock.lock();
@@ -322,7 +323,11 @@ private void appendLoop() {
322323

323324
// TODO: Add reconnection here.
324325
while (!localQueue.isEmpty()) {
325-
this.streamConnection.send(localQueue.pollFirst().message);
326+
AppendRowsRequest preparedRequest =
327+
prepareRequestBasedOnPosition(
328+
localQueue.pollFirst().message, isFirstRequestInConnection);
329+
this.streamConnection.send(preparedRequest);
330+
isFirstRequestInConnection = false;
326331
}
327332
}
328333

@@ -371,6 +376,18 @@ private void waitForDoneCallback() {
371376
}
372377
}
373378

379+
private AppendRowsRequest prepareRequestBasedOnPosition(
380+
AppendRowsRequest original, boolean isFirstRequest) {
381+
AppendRowsRequest.Builder requestBuilder = original.toBuilder();
382+
if (isFirstRequest) {
383+
requestBuilder.setWriteStream(this.streamName);
384+
} else {
385+
requestBuilder.clearWriteStream();
386+
requestBuilder.getProtoRowsBuilder().clearWriterSchema();
387+
}
388+
return requestBuilder.build();
389+
}
390+
374391
private void cleanupInflightRequests() {
375392
Throwable finalStatus;
376393
Deque<AppendRequestAndResponse> localQueue = new LinkedList<AppendRequestAndResponse>();

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterV2Test.java

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@
1616
package com.google.cloud.bigquery.storage.v1beta2;
1717

1818
import static org.junit.Assert.assertEquals;
19+
import static org.junit.Assert.assertFalse;
1920
import static org.junit.Assert.assertThrows;
2021
import static org.junit.Assert.assertTrue;
2122

@@ -198,6 +199,19 @@ public void testAppendSuccess() throws Exception {
198199
}
199200
assertEquals(appendCount, testBigQueryWrite.getAppendRequests().size());
200201

202+
for (int i = 0; i < appendCount; i++) {
203+
AppendRowsRequest serverRequest = testBigQueryWrite.getAppendRequests().get(i);
204+
if (i == 0) {
205+
// First request received by server should have schema and stream name.
206+
assertTrue(serverRequest.getProtoRows().hasWriterSchema());
207+
assertEquals(serverRequest.getWriteStream(), TEST_STREAM);
208+
} else {
209+
// Following request should not have schema and stream name.
210+
assertFalse(serverRequest.getProtoRows().hasWriterSchema());
211+
assertEquals(serverRequest.getWriteStream(), "");
212+
}
213+
}
214+
201215
writer.close();
202216
}
203217

0 commit comments

Comments
 (0)