Skip to content

Commit 52639da

Browse files
JesseLovelaceBenWhitehead
authored andcommitted
chore: protoype for appends api
1 parent 961f3cb commit 52639da

16 files changed

+366
-24
lines changed

google-cloud-storage/clirr-ignored-differences.xml

Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -127,6 +127,17 @@
127127
<method>com.google.api.core.ApiFuture getBlobDescriptor(com.google.cloud.storage.BlobId, com.google.cloud.storage.Storage$BlobSourceOption[])</method>
128128
</difference>
129129

130+
<difference>
131+
<differenceType>7012</differenceType>
132+
<className>com/google/cloud/storage/BufferedWritableByteChannelSession$BufferedWritableByteChannel</className>
133+
<method>com.google.cloud.storage.UnbufferedWritableByteChannelSession$UnbufferedWritableByteChannel getChannel()</method>
134+
</difference>
135+
136+
<difference>
137+
<differenceType>7012</differenceType>
138+
<className>com/google/cloud/storage/Storage</className>
139+
<method>com.google.cloud.storage.AppendableBlobUpload createAppendableBlobUpload(com.google.cloud.storage.BlobInfo, int, com.google.cloud.storage.Storage$BlobWriteOption[])</method>
140+
</difference>
130141
<difference>
131142
<differenceType>7005</differenceType>
132143
<className>com/google/cloud/storage/Hasher$*</className>
Lines changed: 56 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,56 @@
1+
/*
2+
* Copyright 2024 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* http://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
17+
package com.google.cloud.storage;
18+
19+
import com.google.api.core.ApiFuture;
20+
import java.io.IOException;
21+
import java.nio.ByteBuffer;
22+
import java.util.concurrent.ExecutionException;
23+
24+
public final class AppendableBlobUpload implements AutoCloseable {
25+
private final BufferedWritableByteChannelSession.BufferedWritableByteChannel channel;
26+
private final ApiFuture<BlobInfo> result;
27+
28+
private AppendableBlobUpload(BlobInfo blob, BlobWriteSession session) throws IOException {
29+
channel = (BufferedWritableByteChannelSession.BufferedWritableByteChannel) (session.open());
30+
result = session.getResult();
31+
}
32+
33+
static AppendableBlobUpload createNewAppendableBlob(BlobInfo blob, BlobWriteSession session)
34+
throws IOException {
35+
return new AppendableBlobUpload(blob, session);
36+
}
37+
38+
public BlobInfo finalizeUpload() throws IOException, ExecutionException, InterruptedException {
39+
((GapicBidiUnbufferedWritableByteChannel) channel.getChannel()).setFinalFlush();
40+
channel.flush();
41+
((GapicBidiUnbufferedWritableByteChannel) channel.getChannel()).finalizeWrite();
42+
close();
43+
return result.get();
44+
}
45+
46+
public void write(ByteBuffer buffer) throws IOException {
47+
channel.write(buffer);
48+
}
49+
50+
@Override
51+
public void close() throws IOException {
52+
if (channel.isOpen()) {
53+
channel.close();
54+
}
55+
}
56+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiBlobWriteSessionConfig.java

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -88,8 +88,8 @@ WriterFactory createFactory(Clock clock) throws IOException {
8888
}
8989

9090
@InternalApi
91-
private static final class Factory implements WriterFactory {
92-
private static final Conversions.Decoder<BidiWriteObjectResponse, BlobInfo>
91+
static final class Factory implements WriterFactory {
92+
static final Conversions.Decoder<BidiWriteObjectResponse, BlobInfo>
9393
WRITE_OBJECT_RESPONSE_BLOB_INFO_DECODER =
9494
Conversions.grpc().blobInfo().compose(BidiWriteObjectResponse::getResource);
9595

google-cloud-storage/src/main/java/com/google/cloud/storage/BidiResumableWrite.java

Lines changed: 60 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -94,3 +94,63 @@ static BidiResumableWrite identity(BidiResumableWrite w) {
9494
return w;
9595
}
9696
}
97+
98+
final class BidiAppendableWrite implements BidiWriteObjectRequestBuilderFactory {
99+
100+
private final BidiWriteObjectRequest req;
101+
102+
public BidiAppendableWrite(BidiWriteObjectRequest req) {
103+
req =
104+
req.toBuilder()
105+
.setWriteObjectSpec(req.getWriteObjectSpec().toBuilder().setAppendable(true).build())
106+
.build();
107+
this.req = req;
108+
}
109+
110+
public BidiWriteObjectRequest getReq() {
111+
return req;
112+
}
113+
114+
@Override
115+
public BidiWriteObjectRequest.Builder newBuilder() {
116+
return req.toBuilder();
117+
}
118+
119+
@Override
120+
public @Nullable String bucketName() {
121+
if (req.hasWriteObjectSpec() && req.getWriteObjectSpec().hasResource()) {
122+
return req.getWriteObjectSpec().getResource().getBucket();
123+
}
124+
return null;
125+
}
126+
127+
@Override
128+
public String toString() {
129+
return "BidiAppendableWrite{" + "req=" + fmtProto(req) + '}';
130+
}
131+
132+
@Override
133+
public boolean equals(Object o) {
134+
if (this == o) {
135+
return true;
136+
}
137+
if (!(o instanceof BidiAppendableWrite)) {
138+
return false;
139+
}
140+
BidiAppendableWrite BidiAppendableWrite = (BidiAppendableWrite) o;
141+
return Objects.equals(req, BidiAppendableWrite.getReq());
142+
}
143+
144+
@Override
145+
public int hashCode() {
146+
return Objects.hash(req);
147+
}
148+
149+
/**
150+
* Helper function which is more specific than {@link Function#identity()}. Constraining the input
151+
* and output to be exactly {@link BidiAppendableWrite}.
152+
*/
153+
static BidiAppendableWrite identity(BidiAppendableWrite w) {
154+
return w;
155+
}
156+
}

google-cloud-storage/src/main/java/com/google/cloud/storage/BufferedWritableByteChannelSession.java

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -25,5 +25,9 @@ interface BufferedWritableByteChannelSession<ResultT>
2525

2626
interface BufferedWritableByteChannel extends WritableByteChannel {
2727
void flush() throws IOException;
28+
29+
default UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel getChannel() {
30+
return null;
31+
}
2832
}
2933
}

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBlobWriteSessionConfig.java

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -304,6 +304,12 @@ public void close() throws IOException {
304304
throw StorageException.coalesce(e);
305305
}
306306
}
307+
308+
@Override
309+
public UnbufferedWritableByteChannelSession.UnbufferedWritableByteChannel
310+
getChannel() {
311+
return delegate.getChannel();
312+
}
307313
},
308314
MoreExecutors.directExecutor());
309315
}

google-cloud-storage/src/main/java/com/google/cloud/storage/DefaultBufferedWritableByteChannel.java

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -189,6 +189,11 @@ public void flush() throws IOException {
189189
}
190190
}
191191

192+
@Override
193+
public UnbufferedWritableByteChannel getChannel() {
194+
return channel;
195+
}
196+
192197
private boolean enqueuedBytes() {
193198
return handle.position() > 0;
194199
}

google-cloud-storage/src/main/java/com/google/cloud/storage/GapicBidiUnbufferedWritableByteChannel.java

Lines changed: 37 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -56,14 +56,16 @@ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritable
5656
private final SettableApiFuture<BidiWriteObjectResponse> resultFuture;
5757
private final ChunkSegmenter chunkSegmenter;
5858

59-
private final BidiWriteCtx<BidiResumableWrite> writeCtx;
59+
private final BidiWriteCtx<BidiWriteCtx.BidiWriteObjectRequestBuilderFactory> writeCtx;
6060
private final GrpcCallContext context;
6161
private final BidiObserver responseObserver;
6262

6363
private volatile ApiStreamObserver<BidiWriteObjectRequest> stream;
6464
private boolean open = true;
6565
private boolean first = true;
6666
private boolean finished = false;
67+
private boolean appendable = false;
68+
private boolean finalFlush = false;
6769
private volatile BidiWriteObjectRequest lastWrittenRequest;
6870
private volatile RewindableContent currentContent;
6971

@@ -73,7 +75,7 @@ final class GapicBidiUnbufferedWritableByteChannel implements UnbufferedWritable
7375
ResultRetryAlgorithm<?> alg,
7476
SettableApiFuture<BidiWriteObjectResponse> resultFuture,
7577
ChunkSegmenter chunkSegmenter,
76-
BidiWriteCtx<BidiResumableWrite> writeCtx,
78+
BidiWriteCtx<BidiWriteCtx.BidiWriteObjectRequestBuilderFactory> writeCtx,
7779
Supplier<GrpcCallContext> baseContextSupplier) {
7880
this.write = write;
7981
this.deps = deps;
@@ -110,7 +112,7 @@ public void close() throws IOException {
110112
return;
111113
}
112114
try {
113-
if (!finished) {
115+
if (!finished && !appendable) {
114116
BidiWriteObjectRequest message = finishMessage();
115117
lastWrittenRequest = message;
116118
flush(Collections.singletonList(message));
@@ -127,8 +129,19 @@ public void close() throws IOException {
127129
}
128130
}
129131

132+
public void finalizeWrite() throws IOException {
133+
BidiWriteObjectRequest message = finishMessage();
134+
lastWrittenRequest = message;
135+
flush(Collections.singletonList(message));
136+
close();
137+
}
138+
139+
public void setFinalFlush() {
140+
finalFlush = true;
141+
}
142+
130143
@VisibleForTesting
131-
BidiWriteCtx<BidiResumableWrite> getWriteCtx() {
144+
BidiWriteCtx<BidiWriteCtx.BidiWriteObjectRequestBuilderFactory> getWriteCtx() {
132145
return writeCtx;
133146
}
134147

@@ -140,7 +153,12 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
140153

141154
long begin = writeCtx.getConfirmedBytes().get();
142155
currentContent = RewindableContent.of(srcs, srcsOffset, srcsLength);
143-
ChunkSegment[] data = chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, finalize);
156+
boolean allowUnaligned = finalize;
157+
if (finalFlush) {
158+
allowUnaligned = true;
159+
}
160+
ChunkSegment[] data =
161+
chunkSegmenter.segmentBuffers(srcs, srcsOffset, srcsLength, allowUnaligned);
144162
if (data.length == 0) {
145163
currentContent = null;
146164
return 0;
@@ -166,11 +184,15 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
166184
if (!first) {
167185
builder.clearUploadId();
168186
builder.clearObjectChecksums();
187+
builder.clearWriteObjectSpec();
169188
} else {
170189
first = false;
190+
if (builder.hasWriteObjectSpec()) {
191+
appendable = builder.getWriteObjectSpec().getAppendable();
192+
}
171193
}
172194
builder.setWriteOffset(offset).setChecksummedData(checksummedData.build());
173-
if (!datum.isOnlyFullBlocks()) {
195+
if (!datum.isOnlyFullBlocks() && !appendable) {
174196
builder.setFinishWrite(true);
175197
if (cumulative != null) {
176198
builder.setObjectChecksums(
@@ -180,7 +202,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
180202
}
181203

182204
if (i == data.length - 1 && !finished) {
183-
if (finalize) {
205+
if (finalize && !appendable) {
184206
builder.setFinishWrite(true);
185207
finished = true;
186208
} else {
@@ -191,7 +213,7 @@ private long internalWrite(ByteBuffer[] srcs, int srcsOffset, int srcsLength, bo
191213
BidiWriteObjectRequest build = builder.build();
192214
messages.add(build);
193215
}
194-
if (finalize && !finished) {
216+
if (finalize && !finished && !appendable) {
195217
messages.add(finishMessage());
196218
finished = true;
197219
}
@@ -217,7 +239,7 @@ private BidiWriteObjectRequest finishMessage() {
217239

218240
BidiWriteObjectRequest.Builder b = writeCtx.newRequestBuilder();
219241
if (!first) {
220-
b.clearUploadId().clearObjectChecksums();
242+
b.clearUploadId().clearObjectChecksums().clearWriteObjectSpec();
221243
}
222244
b.setFinishWrite(true).setWriteOffset(offset);
223245
if (crc32cValue != null) {
@@ -280,9 +302,12 @@ private BidiObserver() {
280302
@Override
281303
public void onNext(BidiWriteObjectResponse value) {
282304
boolean finalizing = lastWrittenRequest.getFinishWrite();
283-
if (!finalizing && value.hasPersistedSize()) { // incremental
305+
boolean firstAppendable = lastWrittenRequest.getWriteObjectSpec().getAppendable();
306+
307+
if (!finalizing && (firstAppendable || value.hasPersistedSize())) { // incremental
284308
long totalSentBytes = writeCtx.getTotalSentBytes().get();
285-
long persistedSize = value.getPersistedSize();
309+
long persistedSize =
310+
firstAppendable ? value.getResource().getSize() : value.getPersistedSize();
286311

287312
if (totalSentBytes == persistedSize) {
288313
writeCtx.getConfirmedBytes().set(persistedSize);
@@ -382,7 +407,7 @@ public void onError(Throwable t) {
382407

383408
@Override
384409
public void onCompleted() {
385-
if (last != null && last.hasResource()) {
410+
if (last != null && (appendable || last.hasResource())) {
386411
resultFuture.set(last);
387412
}
388413
sem.release();

0 commit comments

Comments
 (0)