Skip to content

Commit 8453281

Browse files
JesseLovelaceBenWhitehead
authored andcommitted
chore: refactor appendable blob uploads
1 parent 4170803 commit 8453281

11 files changed

+613
-99
lines changed

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -127,12 +127,6 @@
127127
<method>com.google.api.core.ApiFuture blobReadSession(com.google.cloud.storage.BlobId, com.google.cloud.storage.Storage$BlobSourceOption[])</method>
128128
</difference>
129129

130-
<difference>
131-
<differenceType>7012</differenceType>
132-
<className>com/google/cloud/storage/BufferedWritableByteChannelSession$BufferedWritableByteChannel</className>
133-
<method>com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel getChannel()</method>
134-
</difference>
135-
136130
<difference>
137131
<differenceType>7012</differenceType>
138132
<className>com/google/cloud/storage/Storage</className>

google-cloud-storage/src/main/java/com/google/cloud/storage/AppendableBlobUpload.java

Lines changed: 79 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -17,16 +17,18 @@
1717
package com.google.cloud.storage;
1818

1919
import com.google.api.core.ApiFuture;
20+
import com.google.cloud.storage.BufferedWritableByteChannelSession.BufferedWritableByteChannel;
2021
import java.io.IOException;
2122
import java.nio.ByteBuffer;
2223
import java.util.concurrent.ExecutionException;
24+
import java.util.concurrent.locks.ReentrantLock;
2325

2426
public final class AppendableBlobUpload implements AutoCloseable {
25-
private final BufferedWritableByteChannelSession.BufferedWritableByteChannel channel;
27+
private final AppendableObjectBufferedWritableByteChannel channel;
2628
private final ApiFuture<BlobInfo> result;
2729

2830
private AppendableBlobUpload(BlobInfo blob, BlobWriteSession session) throws IOException {
29-
channel = (BufferedWritableByteChannelSession.BufferedWritableByteChannel) (session.open());
31+
channel = (AppendableObjectBufferedWritableByteChannel) (session.open());
3032
result = session.getResult();
3133
}
3234

@@ -36,9 +38,7 @@ static AppendableBlobUpload createNewAppendableBlob(BlobInfo blob, BlobWriteSess
3638
}
3739

3840
public BlobInfo finalizeUpload() throws IOException, ExecutionException, InterruptedException {
39-
((GapicBidiUnbufferedWritableByteChannel) channel.getChannel()).setFinalFlush();
40-
channel.flush();
41-
((GapicBidiUnbufferedWritableByteChannel) channel.getChannel()).finalizeWrite();
41+
channel.finalizeWrite();
4242
close();
4343
return result.get();
4444
}
@@ -53,4 +53,78 @@ public void close() throws IOException {
5353
channel.close();
5454
}
5555
}
56+
57+
/**
58+
* This class extends BufferedWritableByteChannel to handle a special case for Appendable writes,
59+
* namely closing the stream without finalizing the write. It adds the {@code finalizeWrite}
60+
* method, which must be manually called to finalize the write. This couldn't be accomplished with
61+
* the base BufferedWritableByteChannel class because it only has a close() method, which it
62+
* assumes should finalize the write before the close. It also re-implements
63+
* SynchronizedBufferedWritableByteChannel to avoid needing to make a decorator class for it and
64+
* wrap it over this one.
65+
*/
66+
static final class AppendableObjectBufferedWritableByteChannel
67+
implements BufferedWritableByteChannel {
68+
private final BufferedWritableByteChannel buffered;
69+
private final GapicBidiUnbufferedAppendableWritableByteChannel unbuffered;
70+
private final ReentrantLock lock;
71+
72+
AppendableObjectBufferedWritableByteChannel(
73+
BufferedWritableByteChannel buffered,
74+
GapicBidiUnbufferedAppendableWritableByteChannel unbuffered) {
75+
this.buffered = buffered;
76+
this.unbuffered = unbuffered;
77+
lock = new ReentrantLock();
78+
}
79+
80+
@Override
81+
public void flush() throws IOException {
82+
lock.lock();
83+
try {
84+
buffered.flush();
85+
} finally {
86+
lock.unlock();
87+
}
88+
}
89+
90+
@Override
91+
public int write(ByteBuffer src) throws IOException {
92+
lock.lock();
93+
try {
94+
return buffered.write(src);
95+
} finally {
96+
lock.unlock();
97+
}
98+
}
99+
100+
@Override
101+
public boolean isOpen() {
102+
lock.lock();
103+
try {
104+
return buffered.isOpen();
105+
} finally {
106+
lock.unlock();
107+
}
108+
}
109+
110+
@Override
111+
public void close() throws IOException {
112+
lock.lock();
113+
try {
114+
buffered.close();
115+
} finally {
116+
lock.unlock();
117+
}
118+
}
119+
120+
public void finalizeWrite() throws IOException {
121+
lock.lock();
122+
try {
123+
buffered.flush();
124+
unbuffered.finalizeWrite();
125+
} finally {
126+
lock.unlock();
127+
}
128+
}
129+
}
56130
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BufferedWritableByteChannelSession.java

Lines changed: 0 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -25,9 +25,5 @@ interface BufferedWritableByteChannelSession<ResultT>
2525

2626
interface BufferedWritableByteChannel extends WritableByteChannel {
2727
void flush() throws IOException;
28-
29-
default UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel getChannel() {
30-
return null;
31-
}
3228
}
3329
}

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java

Lines changed: 0 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -304,12 +304,6 @@ public void close() throws IOException {
304304
throw StorageException.coalesce(e);
305305
}
306306
}
307-
308-
@Override
309-
public UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel
310-
getChannel() {
311-
return delegate.getChannel();
312-
}
313307
},
314308
MoreExecutors.directExecutor());
315309
}

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -189,11 +189,6 @@ public void flush() throws IOException {
189189
}
190190
}
191191

192-
@Override
193-
public UnbufferedWritableByteChannel getChannel() {
194-
return channel;
195-
}
196-
197192
private boolean enqueuedBytes() {
198193
return handle.position() > 0;
199194
}

0 commit comments

Comments
 (0)