5050import java .util .concurrent .ScheduledFuture ;
5151import java .util .concurrent .TimeUnit ;
5252import java .util .concurrent .atomic .AtomicBoolean ;
53+ import java .util .concurrent .atomic .AtomicReference ;
5354import java .util .concurrent .locks .Lock ;
5455import java .util .concurrent .locks .ReentrantLock ;
5556import java .util .logging .Level ;
5657import java .util .logging .Logger ;
5758import java .util .regex .Matcher ;
5859import java .util .regex .Pattern ;
60+ import javax .annotation .concurrent .GuardedBy ;
5961import org .threeten .bp .Duration ;
6062
6163/**
@@ -100,25 +102,34 @@ public class StreamWriter implements AutoCloseable {
100102
101103 private final Lock messagesBatchLock ;
102104 private final Lock appendAndRefreshAppendLock ;
105+
106+ @ GuardedBy ("appendAndRefreshAppendLock" )
103107 private final MessagesBatch messagesBatch ;
104108
105109 // Indicates if a stream has some non recoverable exception happened.
106- private final Lock exceptionLock ;
107- private Throwable streamException ;
110+ private AtomicReference <Throwable > streamException ;
108111
109112 private BackgroundResource backgroundResources ;
110113 private List <BackgroundResource > backgroundResourceList ;
111114
112115 private BigQueryWriteClient stub ;
113116 BidiStreamingCallable <AppendRowsRequest , AppendRowsResponse > bidiStreamingCallable ;
117+
118+ @ GuardedBy ("appendAndRefreshAppendLock" )
114119 ClientStream <AppendRowsRequest > clientStream ;
120+
115121 private final AppendResponseObserver responseObserver ;
116122
117123 private final ScheduledExecutorService executor ;
118124
119- private final AtomicBoolean shutdown ;
125+ @ GuardedBy ("appendAndRefreshAppendLock" )
126+ private boolean shutdown ;
127+
120128 private final Waiter messagesWaiter ;
121- private final AtomicBoolean activeAlarm ;
129+
130+ @ GuardedBy ("appendAndRefreshAppendLock" )
131+ private boolean activeAlarm ;
132+
122133 private ScheduledFuture <?> currentAlarmFuture ;
123134
124135 private Integer currentRetries = 0 ;
@@ -160,9 +171,8 @@ private StreamWriter(Builder builder)
160171 this .messagesBatch = new MessagesBatch (batchingSettings , this .streamName , this );
161172 messagesBatchLock = new ReentrantLock ();
162173 appendAndRefreshAppendLock = new ReentrantLock ();
163- activeAlarm = new AtomicBoolean (false );
164- this .exceptionLock = new ReentrantLock ();
165- this .streamException = null ;
174+ activeAlarm = false ;
175+ this .streamException = new AtomicReference <Throwable >(null );
166176
167177 executor = builder .executorProvider .getExecutor ();
168178 backgroundResourceList = new ArrayList <>();
@@ -185,7 +195,7 @@ private StreamWriter(Builder builder)
185195 stub = builder .client ;
186196 }
187197 backgroundResources = new BackgroundResourceAggregation (backgroundResourceList );
188- shutdown = new AtomicBoolean ( false ) ;
198+ shutdown = false ;
189199 if (builder .onSchemaUpdateRunnable != null ) {
190200 this .onSchemaUpdateRunnable = builder .onSchemaUpdateRunnable ;
191201 this .onSchemaUpdateRunnable .setStreamWriter (this );
@@ -216,14 +226,6 @@ OnSchemaUpdateRunnable getOnSchemaUpdateRunnable() {
216226 return this .onSchemaUpdateRunnable ;
217227 }
218228
219- private void setException (Throwable t ) {
220- exceptionLock .lock ();
221- if (this .streamException == null ) {
222- this .streamException = t ;
223- }
224- exceptionLock .unlock ();
225- }
226-
227229 /**
228230 * Schedules the writing of a message. The write of the message may occur immediately or be
229231 * delayed based on the writer batching options.
@@ -253,27 +255,27 @@ private void setException(Throwable t) {
253255 */
254256 public ApiFuture <AppendRowsResponse > append (AppendRowsRequest message ) {
255257 appendAndRefreshAppendLock .lock ();
256- Preconditions .checkState (!shutdown .get (), "Cannot append on a shut-down writer." );
257- Preconditions .checkNotNull (message , "Message is null." );
258- final AppendRequestAndFutureResponse outstandingAppend =
259- new AppendRequestAndFutureResponse (message );
260- List <InflightBatch > batchesToSend ;
261- messagesBatchLock .lock ();
258+
262259 try {
260+ Preconditions .checkState (!shutdown , "Cannot append on a shut-down writer." );
261+ Preconditions .checkNotNull (message , "Message is null." );
262+ Preconditions .checkState (streamException .get () == null , "Stream already failed." );
263+ final AppendRequestAndFutureResponse outstandingAppend =
264+ new AppendRequestAndFutureResponse (message );
265+ List <InflightBatch > batchesToSend ;
263266 batchesToSend = messagesBatch .add (outstandingAppend );
264267 // Setup the next duration based delivery alarm if there are messages batched.
265268 setupAlarm ();
266269 if (!batchesToSend .isEmpty ()) {
267270 for (final InflightBatch batch : batchesToSend ) {
268- LOG .fine ("Scheduling a batch for immediate sending. " );
271+ LOG .fine ("Scheduling a batch for immediate sending" );
269272 writeBatch (batch );
270273 }
271274 }
275+ return outstandingAppend .appendResult ;
272276 } finally {
273- messagesBatchLock .unlock ();
274277 appendAndRefreshAppendLock .unlock ();
275278 }
276- return outstandingAppend .appendResult ;
277279 }
278280
279281 /**
@@ -285,9 +287,10 @@ public void refreshAppend() throws InterruptedException {
285287 throw new UnimplementedException (null , GrpcStatusCode .of (Status .Code .UNIMPLEMENTED ), false );
286288 }
287289
290+ @ GuardedBy ("appendAndRefreshAppendLock" )
288291 private void setupAlarm () {
289292 if (!messagesBatch .isEmpty ()) {
290- if (!activeAlarm . getAndSet ( true ) ) {
293+ if (!activeAlarm ) {
291294 long delayThresholdMs = getBatchingSettings ().getDelayThreshold ().toMillis ();
292295 LOG .log (Level .FINE , "Setting up alarm for the next {0} ms." , delayThresholdMs );
293296 currentAlarmFuture =
@@ -296,12 +299,12 @@ private void setupAlarm() {
296299 @ Override
297300 public void run () {
298301 LOG .fine ("Sending messages based on schedule" );
299- activeAlarm . getAndSet ( false );
300- messagesBatchLock . lock () ;
302+ appendAndRefreshAppendLock . lock ( );
303+ activeAlarm = false ;
301304 try {
302305 writeBatch (messagesBatch .popBatch ());
303306 } finally {
304- messagesBatchLock .unlock ();
307+ appendAndRefreshAppendLock .unlock ();
305308 }
306309 }
307310 },
@@ -310,9 +313,8 @@ public void run() {
310313 }
311314 } else if (currentAlarmFuture != null ) {
312315 LOG .log (Level .FINER , "Cancelling alarm, no more messages" );
313- if (activeAlarm .getAndSet (false )) {
314- currentAlarmFuture .cancel (false );
315- }
316+ currentAlarmFuture .cancel (false );
317+ activeAlarm = false ;
316318 }
317319 }
318320
@@ -321,27 +323,41 @@ public void run() {
321323 * wait for the send operations to complete. To wait for messages to send, call {@code get} on the
322324 * futures returned from {@code append}.
323325 */
326+ @ GuardedBy ("appendAndRefreshAppendLock" )
324327 public void writeAllOutstanding () {
325328 InflightBatch unorderedOutstandingBatch = null ;
326- messagesBatchLock .lock ();
327- try {
328- if (!messagesBatch .isEmpty ()) {
329- writeBatch (messagesBatch .popBatch ());
330- }
331- messagesBatch .reset ();
332- } finally {
333- messagesBatchLock .unlock ();
329+ if (!messagesBatch .isEmpty ()) {
330+ writeBatch (messagesBatch .popBatch ());
334331 }
332+ messagesBatch .reset ();
335333 }
336334
335+ @ GuardedBy ("appendAndRefreshAppendLock" )
337336 private void writeBatch (final InflightBatch inflightBatch ) {
338337 if (inflightBatch != null ) {
339338 AppendRowsRequest request = inflightBatch .getMergedRequest ();
340339 try {
340+ appendAndRefreshAppendLock .unlock ();
341341 messagesWaiter .acquire (inflightBatch .getByteSize ());
342+ appendAndRefreshAppendLock .lock ();
343+ if (shutdown || streamException .get () != null ) {
344+ appendAndRefreshAppendLock .unlock ();
345+ messagesWaiter .release (inflightBatch .getByteSize ());
346+ appendAndRefreshAppendLock .lock ();
347+ inflightBatch .onFailure (
348+ new AbortedException (
349+ shutdown
350+ ? "Stream closed, abort append."
351+ : "Stream has previous errors, abort append." ,
352+ null ,
353+ GrpcStatusCode .of (Status .Code .ABORTED ),
354+ true ));
355+ return ;
356+ }
342357 responseObserver .addInflightBatch (inflightBatch );
343358 clientStream .send (request );
344359 } catch (FlowController .FlowControlException ex ) {
360+ appendAndRefreshAppendLock .lock ();
345361 inflightBatch .onFailure (ex );
346362 }
347363 }
@@ -447,9 +463,6 @@ private void onFailure(Throwable t) {
447463 // Error has been set already.
448464 LOG .warning ("Ignore " + t .toString () + " since error has already been set" );
449465 return ;
450- } else {
451- LOG .info ("Setting " + t .toString () + " on response" );
452- this .streamWriter .setException (t );
453466 }
454467
455468 for (AppendRequestAndFutureResponse request : inflightRequests ) {
@@ -511,26 +524,68 @@ public RetrySettings getRetrySettings() {
511524 * pending messages are lost.
512525 */
513526 protected void shutdown () {
514- if (shutdown .getAndSet (true )) {
515- LOG .fine ("Already shutdown." );
516- return ;
517- }
518- LOG .fine ("Shutdown called on writer" );
519- if (currentAlarmFuture != null && activeAlarm .getAndSet (false )) {
520- currentAlarmFuture .cancel (false );
521- }
522- writeAllOutstanding ();
527+ appendAndRefreshAppendLock .lock ();
523528 try {
524- synchronized (messagesWaiter ) {
529+ if (shutdown ) {
530+ LOG .fine ("Already shutdown." );
531+ return ;
532+ }
533+ shutdown = true ;
534+ LOG .info ("Shutdown called on writer: " + streamName );
535+ if (currentAlarmFuture != null && activeAlarm ) {
536+ currentAlarmFuture .cancel (false );
537+ activeAlarm = false ;
538+ }
539+ // Wait for current inflight to drain.
540+ try {
541+ appendAndRefreshAppendLock .unlock ();
525542 messagesWaiter .waitComplete (0 );
543+ } catch (InterruptedException e ) {
544+ LOG .warning ("Failed to wait for messages to return " + e .toString ());
526545 }
527- } catch (InterruptedException e ) {
528- LOG .warning ("Failed to wait for messages to return " + e .toString ());
529- }
530- if (clientStream .isSendReady ()) {
531- clientStream .closeSend ();
546+ appendAndRefreshAppendLock .lock ();
547+ // Try to send out what's left in batch.
548+ if (!messagesBatch .isEmpty ()) {
549+ InflightBatch inflightBatch = messagesBatch .popBatch ();
550+ AppendRowsRequest request = inflightBatch .getMergedRequest ();
551+ if (streamException .get () != null ) {
552+ inflightBatch .onFailure (
553+ new AbortedException (
554+ shutdown
555+ ? "Stream closed, abort append."
556+ : "Stream has previous errors, abort append." ,
557+ null ,
558+ GrpcStatusCode .of (Status .Code .ABORTED ),
559+ true ));
560+ } else {
561+ try {
562+ appendAndRefreshAppendLock .unlock ();
563+ messagesWaiter .acquire (inflightBatch .getByteSize ());
564+ appendAndRefreshAppendLock .lock ();
565+ responseObserver .addInflightBatch (inflightBatch );
566+ clientStream .send (request );
567+ } catch (FlowController .FlowControlException ex ) {
568+ appendAndRefreshAppendLock .lock ();
569+ LOG .warning (
570+ "Unexpected flow control exception when sending batch leftover: " + ex .toString ());
571+ }
572+ }
573+ }
574+ // Close the stream.
575+ try {
576+ appendAndRefreshAppendLock .unlock ();
577+ messagesWaiter .waitComplete (0 );
578+ } catch (InterruptedException e ) {
579+ LOG .warning ("Failed to wait for messages to return " + e .toString ());
580+ }
581+ appendAndRefreshAppendLock .lock ();
582+ if (clientStream .isSendReady ()) {
583+ clientStream .closeSend ();
584+ }
585+ backgroundResources .shutdown ();
586+ } finally {
587+ appendAndRefreshAppendLock .unlock ();
532588 }
533- backgroundResources .shutdown ();
534589 }
535590
536591 /**
@@ -815,11 +870,12 @@ public void onStart(StreamController controller) {
815870 }
816871
817872 private void abortInflightRequests (Throwable t ) {
873+ LOG .fine ("Aborting all inflight requests" );
818874 synchronized (this .inflightBatches ) {
819875 boolean first_error = true ;
820876 while (!this .inflightBatches .isEmpty ()) {
821877 InflightBatch inflightBatch = this .inflightBatches .poll ();
822- if (first_error ) {
878+ if (first_error || t . getCause (). getClass () == AbortedException . class ) {
823879 inflightBatch .onFailure (t );
824880 first_error = false ;
825881 } else {
@@ -894,7 +950,8 @@ public void onComplete() {
894950
895951 @ Override
896952 public void onError (Throwable t ) {
897- LOG .fine ("OnError called" );
953+ LOG .info ("OnError called: " + t .toString ());
954+ streamWriter .streamException .set (t );
898955 abortInflightRequests (t );
899956 }
900957 };
@@ -917,6 +974,7 @@ private MessagesBatch(
917974 }
918975
919976 // Get all the messages out in a batch.
977+ @ GuardedBy ("appendAndRefreshAppendLock" )
920978 private InflightBatch popBatch () {
921979 InflightBatch batch =
922980 new InflightBatch (
@@ -958,6 +1016,7 @@ private long getMaxBatchBytes() {
9581016 // The message batch returned could contain the previous batch of messages plus the current
9591017 // message.
9601018 // if the message is too large.
1019+ @ GuardedBy ("appendAndRefreshAppendLock" )
9611020 private List <InflightBatch > add (AppendRequestAndFutureResponse outstandingAppend ) {
9621021 List <InflightBatch > batchesToSend = new ArrayList <>();
9631022 // Check if the next message makes the current batch exceed the max batch byte size.
@@ -978,7 +1037,6 @@ && getBatchedBytes() + outstandingAppend.messageSize >= getMaxBatchBytes()) {
9781037 || getMessagesCount () == batchingSettings .getElementCountThreshold ()) {
9791038 batchesToSend .add (popBatch ());
9801039 }
981-
9821040 return batchesToSend ;
9831041 }
9841042 }
0 commit comments