Skip to content
This repository was archived by the owner on Feb 24, 2026. It is now read-only.

Commit 37db867

Browse files
authored
Merge 31af7be into d211c76
2 parents d211c76 + 31af7be commit 37db867

File tree

5 files changed

+723
-87
lines changed

5 files changed

+723
-87
lines changed

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/StreamWriter.java

Lines changed: 120 additions & 62 deletions
Original file line numberDiff line numberDiff line change
@@ -50,12 +50,14 @@
5050
import java.util.concurrent.ScheduledFuture;
5151
import java.util.concurrent.TimeUnit;
5252
import java.util.concurrent.atomic.AtomicBoolean;
53+
import java.util.concurrent.atomic.AtomicReference;
5354
import java.util.concurrent.locks.Lock;
5455
import java.util.concurrent.locks.ReentrantLock;
5556
import java.util.logging.Level;
5657
import java.util.logging.Logger;
5758
import java.util.regex.Matcher;
5859
import java.util.regex.Pattern;
60+
import javax.annotation.concurrent.GuardedBy;
5961
import org.threeten.bp.Duration;
6062

6163
/**
@@ -100,25 +102,34 @@ public class StreamWriter implements AutoCloseable {
100102

101103
private final Lock messagesBatchLock;
102104
private final Lock appendAndRefreshAppendLock;
105+
106+
@GuardedBy("appendAndRefreshAppendLock")
103107
private final MessagesBatch messagesBatch;
104108

105109
// Indicates if a stream has some non recoverable exception happened.
106-
private final Lock exceptionLock;
107-
private Throwable streamException;
110+
private AtomicReference<Throwable> streamException;
108111

109112
private BackgroundResource backgroundResources;
110113
private List<BackgroundResource> backgroundResourceList;
111114

112115
private BigQueryWriteClient stub;
113116
BidiStreamingCallable<AppendRowsRequest, AppendRowsResponse> bidiStreamingCallable;
117+
118+
@GuardedBy("appendAndRefreshAppendLock")
114119
ClientStream<AppendRowsRequest> clientStream;
120+
115121
private final AppendResponseObserver responseObserver;
116122

117123
private final ScheduledExecutorService executor;
118124

119-
private final AtomicBoolean shutdown;
125+
@GuardedBy("appendAndRefreshAppendLock")
126+
private boolean shutdown;
127+
120128
private final Waiter messagesWaiter;
121-
private final AtomicBoolean activeAlarm;
129+
130+
@GuardedBy("appendAndRefreshAppendLock")
131+
private boolean activeAlarm;
132+
122133
private ScheduledFuture<?> currentAlarmFuture;
123134

124135
private Integer currentRetries = 0;
@@ -160,9 +171,8 @@ private StreamWriter(Builder builder)
160171
this.messagesBatch = new MessagesBatch(batchingSettings, this.streamName, this);
161172
messagesBatchLock = new ReentrantLock();
162173
appendAndRefreshAppendLock = new ReentrantLock();
163-
activeAlarm = new AtomicBoolean(false);
164-
this.exceptionLock = new ReentrantLock();
165-
this.streamException = null;
174+
activeAlarm = false;
175+
this.streamException = new AtomicReference<Throwable>(null);
166176

167177
executor = builder.executorProvider.getExecutor();
168178
backgroundResourceList = new ArrayList<>();
@@ -185,7 +195,7 @@ private StreamWriter(Builder builder)
185195
stub = builder.client;
186196
}
187197
backgroundResources = new BackgroundResourceAggregation(backgroundResourceList);
188-
shutdown = new AtomicBoolean(false);
198+
shutdown = false;
189199
if (builder.onSchemaUpdateRunnable != null) {
190200
this.onSchemaUpdateRunnable = builder.onSchemaUpdateRunnable;
191201
this.onSchemaUpdateRunnable.setStreamWriter(this);
@@ -216,14 +226,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
216226
return this.onSchemaUpdateRunnable;
217227
}
218228

219-
private void setException(Throwable t) {
220-
exceptionLock.lock();
221-
if (this.streamException == null) {
222-
this.streamException = t;
223-
}
224-
exceptionLock.unlock();
225-
}
226-
227229
/**
228230
* Schedules the writing of a message. The write of the message may occur immediately or be
229231
* delayed based on the writer batching options.
@@ -253,27 +255,27 @@ private void setException(Throwable t) {
253255
*/
254256
public ApiFuture<AppendRowsResponse> append(AppendRowsRequest message) {
255257
appendAndRefreshAppendLock.lock();
256-
Preconditions.checkState(!shutdown.get(), "Cannot append on a shut-down writer.");
257-
Preconditions.checkNotNull(message, "Message is null.");
258-
final AppendRequestAndFutureResponse outstandingAppend =
259-
new AppendRequestAndFutureResponse(message);
260-
List<InflightBatch> batchesToSend;
261-
messagesBatchLock.lock();
258+
262259
try {
260+
Preconditions.checkState(!shutdown, "Cannot append on a shut-down writer.");
261+
Preconditions.checkNotNull(message, "Message is null.");
262+
Preconditions.checkState(streamException.get() == null, "Stream already failed.");
263+
final AppendRequestAndFutureResponse outstandingAppend =
264+
new AppendRequestAndFutureResponse(message);
265+
List<InflightBatch> batchesToSend;
263266
batchesToSend = messagesBatch.add(outstandingAppend);
264267
// Setup the next duration based delivery alarm if there are messages batched.
265268
setupAlarm();
266269
if (!batchesToSend.isEmpty()) {
267270
for (final InflightBatch batch : batchesToSend) {
268-
LOG.fine("Scheduling a batch for immediate sending.");
271+
LOG.fine("Scheduling a batch for immediate sending");
269272
writeBatch(batch);
270273
}
271274
}
275+
return outstandingAppend.appendResult;
272276
} finally {
273-
messagesBatchLock.unlock();
274277
appendAndRefreshAppendLock.unlock();
275278
}
276-
return outstandingAppend.appendResult;
277279
}
278280

279281
/**
@@ -285,9 +287,10 @@ public void refreshAppend() throws InterruptedException {
285287
throw new UnimplementedException(null, GrpcStatusCode.of(Status.Code.UNIMPLEMENTED), false);
286288
}
287289

290+
@GuardedBy("appendAndRefreshAppendLock")
288291
private void setupAlarm() {
289292
if (!messagesBatch.isEmpty()) {
290-
if (!activeAlarm.getAndSet(true)) {
293+
if (!activeAlarm) {
291294
long delayThresholdMs = getBatchingSettings().getDelayThreshold().toMillis();
292295
LOG.log(Level.FINE, "Setting up alarm for the next {0} ms.", delayThresholdMs);
293296
currentAlarmFuture =
@@ -296,12 +299,12 @@ private void setupAlarm() {
296299
@Override
297300
public void run() {
298301
LOG.fine("Sending messages based on schedule");
299-
activeAlarm.getAndSet(false);
300-
messagesBatchLock.lock();
302+
appendAndRefreshAppendLock.lock();
303+
activeAlarm = false;
301304
try {
302305
writeBatch(messagesBatch.popBatch());
303306
} finally {
304-
messagesBatchLock.unlock();
307+
appendAndRefreshAppendLock.unlock();
305308
}
306309
}
307310
},
@@ -310,9 +313,8 @@ public void run() {
310313
}
311314
} else if (currentAlarmFuture != null) {
312315
LOG.log(Level.FINER, "Cancelling alarm, no more messages");
313-
if (activeAlarm.getAndSet(false)) {
314-
currentAlarmFuture.cancel(false);
315-
}
316+
currentAlarmFuture.cancel(false);
317+
activeAlarm = false;
316318
}
317319
}
318320

@@ -321,27 +323,41 @@ public void run() {
321323
* wait for the send operations to complete. To wait for messages to send, call {@code get} on the
322324
* futures returned from {@code append}.
323325
*/
326+
@GuardedBy("appendAndRefreshAppendLock")
324327
public void writeAllOutstanding() {
325328
InflightBatch unorderedOutstandingBatch = null;
326-
messagesBatchLock.lock();
327-
try {
328-
if (!messagesBatch.isEmpty()) {
329-
writeBatch(messagesBatch.popBatch());
330-
}
331-
messagesBatch.reset();
332-
} finally {
333-
messagesBatchLock.unlock();
329+
if (!messagesBatch.isEmpty()) {
330+
writeBatch(messagesBatch.popBatch());
334331
}
332+
messagesBatch.reset();
335333
}
336334

335+
@GuardedBy("appendAndRefreshAppendLock")
337336
private void writeBatch(final InflightBatch inflightBatch) {
338337
if (inflightBatch != null) {
339338
AppendRowsRequest request = inflightBatch.getMergedRequest();
340339
try {
340+
appendAndRefreshAppendLock.unlock();
341341
messagesWaiter.acquire(inflightBatch.getByteSize());
342+
appendAndRefreshAppendLock.lock();
343+
if (shutdown || streamException.get() != null) {
344+
appendAndRefreshAppendLock.unlock();
345+
messagesWaiter.release(inflightBatch.getByteSize());
346+
appendAndRefreshAppendLock.lock();
347+
inflightBatch.onFailure(
348+
new AbortedException(
349+
shutdown
350+
? "Stream closed, abort append."
351+
: "Stream has previous errors, abort append.",
352+
null,
353+
GrpcStatusCode.of(Status.Code.ABORTED),
354+
true));
355+
return;
356+
}
342357
responseObserver.addInflightBatch(inflightBatch);
343358
clientStream.send(request);
344359
} catch (FlowController.FlowControlException ex) {
360+
appendAndRefreshAppendLock.lock();
345361
inflightBatch.onFailure(ex);
346362
}
347363
}
@@ -447,9 +463,6 @@ private void onFailure(Throwable t) {
447463
// Error has been set already.
448464
LOG.warning("Ignore " + t.toString() + " since error has already been set");
449465
return;
450-
} else {
451-
LOG.info("Setting " + t.toString() + " on response");
452-
this.streamWriter.setException(t);
453466
}
454467

455468
for (AppendRequestAndFutureResponse request : inflightRequests) {
@@ -511,26 +524,68 @@ public RetrySettings getRetrySettings() {
511524
* pending messages are lost.
512525
*/
513526
protected void shutdown() {
514-
if (shutdown.getAndSet(true)) {
515-
LOG.fine("Already shutdown.");
516-
return;
517-
}
518-
LOG.fine("Shutdown called on writer");
519-
if (currentAlarmFuture != null && activeAlarm.getAndSet(false)) {
520-
currentAlarmFuture.cancel(false);
521-
}
522-
writeAllOutstanding();
527+
appendAndRefreshAppendLock.lock();
523528
try {
524-
synchronized (messagesWaiter) {
529+
if (shutdown) {
530+
LOG.fine("Already shutdown.");
531+
return;
532+
}
533+
shutdown = true;
534+
LOG.info("Shutdown called on writer: " + streamName);
535+
if (currentAlarmFuture != null && activeAlarm) {
536+
currentAlarmFuture.cancel(false);
537+
activeAlarm = false;
538+
}
539+
// Wait for current inflight to drain.
540+
try {
541+
appendAndRefreshAppendLock.unlock();
525542
messagesWaiter.waitComplete(0);
543+
} catch (InterruptedException e) {
544+
LOG.warning("Failed to wait for messages to return " + e.toString());
526545
}
527-
} catch (InterruptedException e) {
528-
LOG.warning("Failed to wait for messages to return " + e.toString());
529-
}
530-
if (clientStream.isSendReady()) {
531-
clientStream.closeSend();
546+
appendAndRefreshAppendLock.lock();
547+
// Try to send out what's left in batch.
548+
if (!messagesBatch.isEmpty()) {
549+
InflightBatch inflightBatch = messagesBatch.popBatch();
550+
AppendRowsRequest request = inflightBatch.getMergedRequest();
551+
if (streamException.get() != null) {
552+
inflightBatch.onFailure(
553+
new AbortedException(
554+
shutdown
555+
? "Stream closed, abort append."
556+
: "Stream has previous errors, abort append.",
557+
null,
558+
GrpcStatusCode.of(Status.Code.ABORTED),
559+
true));
560+
} else {
561+
try {
562+
appendAndRefreshAppendLock.unlock();
563+
messagesWaiter.acquire(inflightBatch.getByteSize());
564+
appendAndRefreshAppendLock.lock();
565+
responseObserver.addInflightBatch(inflightBatch);
566+
clientStream.send(request);
567+
} catch (FlowController.FlowControlException ex) {
568+
appendAndRefreshAppendLock.lock();
569+
LOG.warning(
570+
"Unexpected flow control exception when sending batch leftover: " + ex.toString());
571+
}
572+
}
573+
}
574+
// Close the stream.
575+
try {
576+
appendAndRefreshAppendLock.unlock();
577+
messagesWaiter.waitComplete(0);
578+
} catch (InterruptedException e) {
579+
LOG.warning("Failed to wait for messages to return " + e.toString());
580+
}
581+
appendAndRefreshAppendLock.lock();
582+
if (clientStream.isSendReady()) {
583+
clientStream.closeSend();
584+
}
585+
backgroundResources.shutdown();
586+
} finally {
587+
appendAndRefreshAppendLock.unlock();
532588
}
533-
backgroundResources.shutdown();
534589
}
535590

536591
/**
@@ -815,11 +870,12 @@ public void onStart(StreamController controller) {
815870
}
816871

817872
private void abortInflightRequests(Throwable t) {
873+
LOG.fine("Aborting all inflight requests");
818874
synchronized (this.inflightBatches) {
819875
boolean first_error = true;
820876
while (!this.inflightBatches.isEmpty()) {
821877
InflightBatch inflightBatch = this.inflightBatches.poll();
822-
if (first_error) {
878+
if (first_error || t.getCause().getClass() == AbortedException.class) {
823879
inflightBatch.onFailure(t);
824880
first_error = false;
825881
} else {
@@ -894,7 +950,8 @@ public void onComplete() {
894950

895951
@Override
896952
public void onError(Throwable t) {
897-
LOG.fine("OnError called");
953+
LOG.info("OnError called: " + t.toString());
954+
streamWriter.streamException.set(t);
898955
abortInflightRequests(t);
899956
}
900957
};
@@ -917,6 +974,7 @@ private MessagesBatch(
917974
}
918975

919976
// Get all the messages out in a batch.
977+
@GuardedBy("appendAndRefreshAppendLock")
920978
private InflightBatch popBatch() {
921979
InflightBatch batch =
922980
new InflightBatch(
@@ -958,6 +1016,7 @@ private long getMaxBatchBytes() {
9581016
// The message batch returned could contain the previous batch of messages plus the current
9591017
// message.
9601018
// if the message is too large.
1019+
@GuardedBy("appendAndRefreshAppendLock")
9611020
private List<InflightBatch> add(AppendRequestAndFutureResponse outstandingAppend) {
9621021
List<InflightBatch> batchesToSend = new ArrayList<>();
9631022
// Check if the next message makes the current batch exceed the max batch byte size.
@@ -978,7 +1037,6 @@ && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) {
9781037
|| getMessagesCount() == batchingSettings.getElementCountThreshold()) {
9791038
batchesToSend.add(popBatch());
9801039
}
981-
9821040
return batchesToSend;
9831041
}
9841042
}

google-cloud-bigquerystorage/src/main/java/com/google/cloud/bigquery/storage/v1beta2/Waiter.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -66,6 +66,7 @@ private void notifyNextAcquires() {
6666

6767
public synchronized void release(long messageSize) throws IllegalStateException {
6868
lock.lock();
69+
LOG.fine("release: " + pendingCount + " to " + (pendingCount - 1));
6970
--pendingCount;
7071
if (pendingCount < 0) {
7172
throw new IllegalStateException("pendingCount cannot be less than 0");
@@ -82,6 +83,7 @@ public synchronized void release(long messageSize) throws IllegalStateException
8283
public void acquire(long messageSize) throws FlowController.FlowControlException {
8384
lock.lock();
8485
try {
86+
LOG.fine("acquire " + pendingCount + " to " + (pendingCount + 1));
8587
if (pendingCount >= countLimit
8688
&& behavior == FlowController.LimitExceededBehavior.ThrowException) {
8789
throw new FlowController.MaxOutstandingElementCountReachedException(countLimit);

google-cloud-bigquerystorage/src/test/java/com/google/cloud/bigquery/storage/v1beta2/FakeBigQueryWrite.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -39,6 +39,10 @@ public List<AbstractMessage> getRequests() {
3939
return new LinkedList<AbstractMessage>(serviceImpl.getCapturedRequests());
4040
}
4141

42+
public void waitForResponseScheduled() throws InterruptedException {
43+
serviceImpl.waitForResponseScheduled();
44+
}
45+
4246
public List<AppendRowsRequest> getAppendRequests() {
4347
return serviceImpl.getCapturedRequests();
4448
}

0 commit comments

Comments
 (0)