1717package com .google .cloud .storage ;
1818
1919import com .google .api .core .ApiFuture ;
20+ import com .google .cloud .storage .BufferedWritableByteChannelSession .BufferedWritableByteChannel ;
2021import java .io .IOException ;
2122import java .nio .ByteBuffer ;
2223import java .util .concurrent .ExecutionException ;
24+ import java .util .concurrent .locks .ReentrantLock ;
2325
2426public final class AppendableBlobUpload implements AutoCloseable {
25- private final BufferedWritableByteChannelSession . BufferedWritableByteChannel channel ;
27+ private final AppendableObjectBufferedWritableByteChannel channel ;
2628 private final ApiFuture <BlobInfo > result ;
2729
2830 private AppendableBlobUpload (BlobInfo blob , BlobWriteSession session ) throws IOException {
29- channel = (BufferedWritableByteChannelSession . BufferedWritableByteChannel ) (session .open ());
31+ channel = (AppendableObjectBufferedWritableByteChannel ) (session .open ());
3032 result = session .getResult ();
3133 }
3234
@@ -36,9 +38,7 @@ static AppendableBlobUpload createNewAppendableBlob(BlobInfo blob, BlobWriteSess
3638 }
3739
3840 public BlobInfo finalizeUpload () throws IOException , ExecutionException , InterruptedException {
39- ((GapicBidiUnbufferedWritableByteChannel ) channel .getChannel ()).setFinalFlush ();
40- channel .flush ();
41- ((GapicBidiUnbufferedWritableByteChannel ) channel .getChannel ()).finalizeWrite ();
41+ channel .finalizeWrite ();
4242 close ();
4343 return result .get ();
4444 }
@@ -53,4 +53,78 @@ public void close() throws IOException {
5353 channel .close ();
5454 }
5555 }
56+
57+ /**
58+ * This class extends BufferedWritableByteChannel to handle a special case for Appendable writes,
59+ * namely closing the stream without finalizing the write. It adds the {@code finalizeWrite}
60+ * method, which must be manually called to finalize the write. This couldn't be accomplished with
61+ * the base BufferedWritableByteChannel class because it only has a close() method, which it
62+ * assumes should finalize the write before the close. It also re-implements
63+ * SynchronizedBufferedWritableByteChannel to avoid needing to make a decorator class for it and
64+ * wrap it over this one.
65+ */
66+ static final class AppendableObjectBufferedWritableByteChannel
67+ implements BufferedWritableByteChannel {
68+ private final BufferedWritableByteChannel buffered ;
69+ private final GapicBidiUnbufferedAppendableWritableByteChannel unbuffered ;
70+ private final ReentrantLock lock ;
71+
72+ AppendableObjectBufferedWritableByteChannel (
73+ BufferedWritableByteChannel buffered ,
74+ GapicBidiUnbufferedAppendableWritableByteChannel unbuffered ) {
75+ this .buffered = buffered ;
76+ this .unbuffered = unbuffered ;
77+ lock = new ReentrantLock ();
78+ }
79+
80+ @ Override
81+ public void flush () throws IOException {
82+ lock .lock ();
83+ try {
84+ buffered .flush ();
85+ } finally {
86+ lock .unlock ();
87+ }
88+ }
89+
90+ @ Override
91+ public int write (ByteBuffer src ) throws IOException {
92+ lock .lock ();
93+ try {
94+ return buffered .write (src );
95+ } finally {
96+ lock .unlock ();
97+ }
98+ }
99+
100+ @ Override
101+ public boolean isOpen () {
102+ lock .lock ();
103+ try {
104+ return buffered .isOpen ();
105+ } finally {
106+ lock .unlock ();
107+ }
108+ }
109+
110+ @ Override
111+ public void close () throws IOException {
112+ lock .lock ();
113+ try {
114+ buffered .close ();
115+ } finally {
116+ lock .unlock ();
117+ }
118+ }
119+
120+ public void finalizeWrite () throws IOException {
121+ lock .lock ();
122+ try {
123+ buffered .flush ();
124+ unbuffered .finalizeWrite ();
125+ } finally {
126+ lock .unlock ();
127+ }
128+ }
129+ }
56130}
0 commit comments