Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 9026e4e

Browse files
authored
Merge 71bfe4a into 36322fb
2 parents 36322fb + 71bfe4a commit 9026e4e

2 files changed

Lines changed: 38 additions & 0 deletions

File tree

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -91,6 +91,8 @@ public class StreamWriter implements AutoCloseable {
9191
private final String streamName;
9292
private final String tableName;
9393

94+
private final String traceId;
95+
9496
private final BatchingSettings batchingSettings;
9597
private final RetrySettings retrySettings;
9698
private BigQueryWriteSettings stubSettings;
@@ -151,6 +153,7 @@ private StreamWriter(Builder builder)
151153
tableName = matcher.group(1);
152154
}
153155

156+
this.traceId = builder.traceId;
154157
this.batchingSettings = builder.batchingSettings;
155158
this.retrySettings = builder.retrySettings;
156159
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
@@ -477,6 +480,11 @@ private AppendRowsRequest getMergedRequest() throws IllegalStateException {
477480
"The first message on the connection must have writer schema set");
478481
}
479482
requestBuilder.setWriteStream(streamName);
483+
if (!inflightRequests.get(0).message.getTraceId().isEmpty()) {
484+
requestBuilder.setTraceId(inflightRequests.get(0).message.getTraceId());
485+
} else if (streamWriter.traceId != null) {
486+
requestBuilder.setTraceId(streamWriter.traceId);
487+
}
480488
}
481489
return requestBuilder.setProtoRows(data.build()).build();
482490
}
@@ -660,6 +668,8 @@ public static final class Builder {
660668
private String streamOrTableName;
661669
private String endpoint = BigQueryWriteSettings.getDefaultEndpoint();
662670

671+
private String traceId;
672+
663673
private BigQueryWriteClient client = null;
664674

665675
// Batching options
@@ -814,6 +824,12 @@ public Builder createDefaultStream() {
814824
return this;
815825
}
816826

827+
/** Mark the request as coming from Dataflow. */
828+
public Builder setDataflowTraceId() {
829+
this.traceId = "Dataflow";
830+
return this;
831+
}
832+
817833
/** Builds the {@code StreamWriter}. */
818834
public StreamWriter build() throws IllegalArgumentException, IOException, InterruptedException {
819835
return new StreamWriter(this);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriterTest.java

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -993,4 +993,26 @@ public void testFlushAllFailed() throws Exception {
993993

994994
writer.close();
995995
}
996+
997+
@Test
998+
public void testDatasetTraceId() throws Exception {
999+
StreamWriter writer =
1000+
getTestStreamWriterBuilder()
1001+
.setBatchingSettings(
1002+
StreamWriter.Builder.DEFAULT_BATCHING_SETTINGS
1003+
.toBuilder()
1004+
.setElementCountThreshold(1L)
1005+
.build())
1006+
.setDataflowTraceId()
1007+
.build();
1008+
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
1009+
testBigQueryWrite.addResponse(AppendRowsResponse.newBuilder().build());
1010+
1011+
ApiFuture<AppendRowsResponse> appendFuture1 = sendTestMessage(writer, new String[] {"A"});
1012+
ApiFuture<AppendRowsResponse> appendFuture2 = sendTestMessage(writer, new String[] {"B"});
1013+
appendFuture1.get();
1014+
appendFuture2.get();
1015+
assertEquals("Dataflow", testBigQueryWrite.getAppendRequests().get(0).getTraceId());
1016+
assertEquals("", testBigQueryWrite.getAppendRequests().get(1).getTraceId());
1017+
}
9961018
}

0 commit comments

Comments
 (0)