Skip to content

Commit 6858a9d

Browse files
authored
chore: refactor RangeProjectionConfigs to have RangeSpec provided as part of the config (#99)
Not all projections have the same way of determining a RangeSpec for its operation. As such, RangeSpec needs to be provided per projection rather than passed in to the BlobReadSession readAs method. For example, a seekable channel sets it's offset by seeking, whereas a range read as an ApiFuture<byte[]> will want a RangeSpec.
1 parent bb7e1b4 commit 6858a9d

12 files changed

+196
-74
lines changed

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@ public interface BlobReadSession extends IOAutoCloseable {
2626
BlobInfo getBlobInfo();
2727

2828
@BetaApi
29-
<Projection> Projection readRange(RangeSpec range, RangeProjectionConfig<Projection> config);
29+
<Projection> Projection readAs(RangeProjectionConfig<Projection> config);
3030

3131
@Override
3232
@BetaApi

google-cloud-storage/src/main/java/com/google/cloud/storage/BlobReadSessionAdapter.java

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -39,9 +39,8 @@ public BlobInfo getBlobInfo() {
3939
// the return type.
4040
@SuppressWarnings({"rawtypes", "unchecked"})
4141
@Override
42-
public <Projection> Projection readRange(
43-
RangeSpec range, RangeProjectionConfig<Projection> config) {
44-
Projection projection = session.readRange(range, config);
42+
public <Projection> Projection readAs(RangeProjectionConfig<Projection> config) {
43+
Projection projection = session.readAs(config);
4544
if (projection instanceof ApiFuture) {
4645
ApiFuture apiFuture = (ApiFuture) projection;
4746
return (Projection) StorageException.coalesceAsync(apiFuture);

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSession.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -22,5 +22,5 @@ public interface ObjectReadSession extends IOAutoCloseable {
2222

2323
Object getResource();
2424

25-
<Projection> Projection readRange(RangeSpec range, RangeProjectionConfig<Projection> config);
25+
<Projection> Projection readAs(RangeProjectionConfig<Projection> config);
2626
}

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionImpl.java

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -80,20 +80,19 @@ public Object getResource() {
8080
}
8181

8282
@Override
83-
public <Projection> Projection readRange(
84-
RangeSpec range, RangeProjectionConfig<Projection> config) {
83+
public <Projection> Projection readAs(RangeProjectionConfig<Projection> config) {
8584
lock.lock();
8685
try {
8786
checkState(open, "stream already closed");
8887
switch (config.getType()) {
8988
case STREAM_READ:
9089
long readId = state.newReadId();
9190
ObjectReadSessionStreamRead<Projection> read =
92-
config.cast().newRead(readId, range, retryContextProvider.create());
91+
config.cast().newRead(readId, retryContextProvider.create());
9392
registerReadInState(readId, read);
9493
return read.project();
9594
case SESSION_USER:
96-
return config.project(range, this, IOAutoCloseable.noOp());
95+
return config.project(this, IOAutoCloseable.noOp());
9796
default:
9897
throw new IllegalStateException(
9998
String.format(

google-cloud-storage/src/main/java/com/google/cloud/storage/ObjectReadSessionSeekableByteChannel.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -67,7 +67,7 @@ public int read(ByteBuffer dst) throws IOException {
6767
"RangeSpec does not begin at provided position. expected = %s, actual = %s",
6868
position,
6969
apply.begin());
70-
rbc = session.readRange(apply, RangeProjectionConfigs.asChannel());
70+
rbc = session.readAs(RangeProjectionConfigs.asChannel().withRangeSpec(apply));
7171
lastRangeSpec = apply;
7272
}
7373

google-cloud-storage/src/main/java/com/google/cloud/storage/OtelStorageDecorator.java

Lines changed: 7 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1849,9 +1849,9 @@ public ProjectionType getType() {
18491849
}
18501850

18511851
@Override
1852-
Projection project(RangeSpec range, ObjectReadSession session, IOAutoCloseable closeAlongWith) {
1852+
Projection project(ObjectReadSession session, IOAutoCloseable closeAlongWith) {
18531853
try {
1854-
return delegate.project(range, session, closeAlongWith.andThen(parentSpan::end));
1854+
return delegate.project(session, closeAlongWith.andThen(parentSpan::end));
18551855
} catch (Throwable t) {
18561856
parentSpan.recordException(t);
18571857
parentSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());
@@ -1869,12 +1869,10 @@ private OtelBaseConfigDecorator(BaseConfig<Projection, ?> delegate) {
18691869
}
18701870

18711871
@Override
1872-
ObjectReadSessionStreamRead<Projection> newRead(
1873-
long readId, RangeSpec range, RetryContext retryContext) {
1872+
ObjectReadSessionStreamRead<Projection> newRead(long readId, RetryContext retryContext) {
18741873
OtelRetryContextDecorator otelRetryContext =
18751874
new OtelRetryContextDecorator(retryContext, parentSpan);
1876-
ObjectReadSessionStreamRead<Projection> read =
1877-
delegate.newRead(readId, range, otelRetryContext);
1875+
ObjectReadSessionStreamRead<Projection> read = delegate.newRead(readId, otelRetryContext);
18781876
read.setOnCloseCallback(parentSpan::end);
18791877
return new OtelDecoratingObjectReadSessionStreamRead<>(read, parentSpan);
18801878
}
@@ -1947,18 +1945,17 @@ public BlobInfo getBlobInfo() {
19471945
}
19481946

19491947
@Override
1950-
public <Projection> Projection readRange(
1951-
RangeSpec range, RangeProjectionConfig<Projection> config) {
1948+
public <Projection> Projection readAs(RangeProjectionConfig<Projection> config) {
19521949
Span readRangeSpan =
19531950
tracer
1954-
.spanBuilder("readRange")
1951+
.spanBuilder("readAs")
19551952
.setAttribute("gsutil.uri", id.toGsUtilUriWithGeneration())
19561953
.setParent(blobReadSessionContext)
19571954
.startSpan();
19581955
try (Scope ignore2 = readRangeSpan.makeCurrent()) {
19591956
OtelRangeProjectionConfig<Projection> c =
19601957
new OtelRangeProjectionConfig<>(config, readRangeSpan);
1961-
return delegate.readRange(range, c);
1958+
return delegate.readAs(c);
19621959
} catch (Throwable t) {
19631960
readRangeSpan.recordException(t);
19641961
readRangeSpan.setStatus(StatusCode.ERROR, t.getClass().getSimpleName());

google-cloud-storage/src/main/java/com/google/cloud/storage/RangeProjectionConfig.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,7 @@ public abstract class RangeProjectionConfig<Projection> {
2929

3030
abstract ProjectionType getType();
3131

32-
Projection project(RangeSpec range, ObjectReadSession session, IOAutoCloseable closeAlongWith) {
32+
Projection project(ObjectReadSession session, IOAutoCloseable closeAlongWith) {
3333
throw new UnsupportedOperationException(
3434
String.format(Locale.US, "%s#project()", this.getClass().getName()));
3535
}

google-cloud-storage/src/main/java/com/google/cloud/storage/RangeProjectionConfigs.java

Lines changed: 133 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -31,7 +31,7 @@ public final class RangeProjectionConfigs {
3131

3232
abstract static class BaseConfig<Projection, Read extends ObjectReadSessionStreamRead<Projection>>
3333
extends RangeProjectionConfig<Projection> {
34-
abstract Read newRead(long readId, RangeSpec range, RetryContext retryContext);
34+
abstract Read newRead(long readId, RetryContext retryContext);
3535

3636
@Override
3737
ProjectionType getType() {
@@ -100,8 +100,7 @@ SeekableChannelConfig withCrc32cValidationEnabled(boolean enabled) {
100100
}
101101

102102
@Override
103-
SeekableByteChannel project(
104-
RangeSpec range, ObjectReadSession session, IOAutoCloseable closeAlongWith) {
103+
SeekableByteChannel project(ObjectReadSession session, IOAutoCloseable closeAlongWith) {
105104
return StorageByteChannels.seekable(
106105
new ObjectReadSessionSeekableByteChannel(session, this, closeAlongWith));
107106
}
@@ -138,15 +137,30 @@ public String toString() {
138137

139138
public static final class RangeAsChannel
140139
extends BaseConfig<ScatteringByteChannel, StreamingRead> {
141-
private static final RangeAsChannel INSTANCE = new RangeAsChannel(Hasher.enabled());
140+
private static final RangeAsChannel INSTANCE =
141+
new RangeAsChannel(RangeSpec.all(), Hasher.enabled());
142142

143+
private final RangeSpec range;
143144
private final Hasher hasher;
144145

145-
private RangeAsChannel(Hasher hasher) {
146+
private RangeAsChannel(RangeSpec range, Hasher hasher) {
146147
super();
148+
this.range = range;
147149
this.hasher = hasher;
148150
}
149151

152+
public RangeSpec getRange() {
153+
return range;
154+
}
155+
156+
public RangeAsChannel withRangeSpec(RangeSpec range) {
157+
requireNonNull(range, "range must be non null");
158+
if (this.range.equals(range)) {
159+
return this;
160+
}
161+
return new RangeAsChannel(range, hasher);
162+
}
163+
150164
boolean getCrc32cValidationEnabled() {
151165
return Hasher.enabled().equals(hasher);
152166
}
@@ -157,7 +171,7 @@ RangeAsChannel withCrc32cValidationEnabled(boolean enabled) {
157171
} else if (!enabled && Hasher.noop().equals(hasher)) {
158172
return this;
159173
}
160-
return new RangeAsChannel(enabled ? Hasher.enabled() : Hasher.noop());
174+
return new RangeAsChannel(range, enabled ? Hasher.enabled() : Hasher.noop());
161175
}
162176

163177
@Override
@@ -166,22 +180,62 @@ RangeAsChannel withCrc32cValidationEnabled(boolean enabled) {
166180
}
167181

168182
@Override
169-
StreamingRead newRead(long readId, RangeSpec range, RetryContext retryContext) {
183+
StreamingRead newRead(long readId, RetryContext retryContext) {
170184
return ObjectReadSessionStreamRead.streamingRead(readId, range, hasher, retryContext);
171185
}
186+
187+
@Override
188+
public boolean equals(Object o) {
189+
if (this == o) {
190+
return true;
191+
}
192+
if (!(o instanceof RangeAsChannel)) {
193+
return false;
194+
}
195+
RangeAsChannel that = (RangeAsChannel) o;
196+
return Objects.equals(range, that.range) && Objects.equals(hasher, that.hasher);
197+
}
198+
199+
@Override
200+
public int hashCode() {
201+
return Objects.hash(range, hasher);
202+
}
203+
204+
@Override
205+
public String toString() {
206+
return MoreObjects.toStringHelper(this)
207+
.add("range", range)
208+
.add("crc32cValidationEnabled", getCrc32cValidationEnabled())
209+
.toString();
210+
}
172211
}
173212

174213
public static final class RangeAsFutureBytes
175214
extends BaseConfig<ApiFuture<byte[]>, AccumulatingRead<byte[]>> {
176-
private static final RangeAsFutureBytes INSTANCE = new RangeAsFutureBytes(Hasher.enabled());
215+
private static final RangeAsFutureBytes INSTANCE =
216+
new RangeAsFutureBytes(RangeSpec.all(), Hasher.enabled());
177217

218+
private final RangeSpec range;
178219
private final Hasher hasher;
179220

180-
private RangeAsFutureBytes(Hasher hasher) {
221+
private RangeAsFutureBytes(RangeSpec range, Hasher hasher) {
181222
super();
223+
this.range = range;
182224
this.hasher = hasher;
183225
}
184226

227+
public RangeSpec getRange() {
228+
return range;
229+
}
230+
231+
public RangeAsFutureBytes withRangeSpec(RangeSpec range) {
232+
requireNonNull(range, "range must be non null");
233+
if (this.range.equals(range)) {
234+
return this;
235+
}
236+
return new RangeAsFutureBytes(range, hasher);
237+
}
238+
185239
boolean getCrc32cValidationEnabled() {
186240
return Hasher.enabled().equals(hasher);
187241
}
@@ -192,7 +246,7 @@ RangeAsFutureBytes withCrc32cValidationEnabled(boolean enabled) {
192246
} else if (!enabled && Hasher.noop().equals(hasher)) {
193247
return this;
194248
}
195-
return new RangeAsFutureBytes(enabled ? Hasher.enabled() : Hasher.noop());
249+
return new RangeAsFutureBytes(range, enabled ? Hasher.enabled() : Hasher.noop());
196250
}
197251

198252
@Override
@@ -201,24 +255,63 @@ RangeAsFutureBytes withCrc32cValidationEnabled(boolean enabled) {
201255
}
202256

203257
@Override
204-
AccumulatingRead<byte[]> newRead(long readId, RangeSpec range, RetryContext retryContext) {
258+
AccumulatingRead<byte[]> newRead(long readId, RetryContext retryContext) {
205259
return ObjectReadSessionStreamRead.createByteArrayAccumulatingRead(
206260
readId, range, hasher, retryContext);
207261
}
262+
263+
@Override
264+
public boolean equals(Object o) {
265+
if (this == o) {
266+
return true;
267+
}
268+
if (!(o instanceof RangeAsFutureBytes)) {
269+
return false;
270+
}
271+
RangeAsFutureBytes that = (RangeAsFutureBytes) o;
272+
return Objects.equals(range, that.range) && Objects.equals(hasher, that.hasher);
273+
}
274+
275+
@Override
276+
public int hashCode() {
277+
return Objects.hash(range, hasher);
278+
}
279+
280+
@Override
281+
public String toString() {
282+
return MoreObjects.toStringHelper(this)
283+
.add("range", range)
284+
.add("crc32cValidationEnabled", getCrc32cValidationEnabled())
285+
.toString();
286+
}
208287
}
209288

210289
static final class RangeAsFutureByteString
211290
extends BaseConfig<ApiFuture<DisposableByteString>, AccumulatingRead<DisposableByteString>> {
212291
private static final RangeAsFutureByteString INSTANCE =
213-
new RangeAsFutureByteString(Hasher.enabled());
292+
new RangeAsFutureByteString(RangeSpec.all(), Hasher.enabled());
214293

294+
private final RangeSpec range;
215295
private final Hasher hasher;
216296

217-
private RangeAsFutureByteString(Hasher hasher) {
297+
private RangeAsFutureByteString(RangeSpec range, Hasher hasher) {
218298
super();
299+
this.range = range;
219300
this.hasher = hasher;
220301
}
221302

303+
public RangeSpec getRange() {
304+
return range;
305+
}
306+
307+
public RangeAsFutureByteString withRangeSpec(RangeSpec range) {
308+
requireNonNull(range, "range must be non null");
309+
if (this.range.equals(range)) {
310+
return this;
311+
}
312+
return new RangeAsFutureByteString(range, hasher);
313+
}
314+
222315
boolean getCrc32cValidationEnabled() {
223316
return Hasher.enabled().equals(hasher);
224317
}
@@ -229,7 +322,7 @@ RangeAsFutureByteString withCrc32cValidationEnabled(boolean enabled) {
229322
} else if (!enabled && Hasher.noop().equals(hasher)) {
230323
return this;
231324
}
232-
return new RangeAsFutureByteString(enabled ? Hasher.enabled() : Hasher.noop());
325+
return new RangeAsFutureByteString(range, enabled ? Hasher.enabled() : Hasher.noop());
233326
}
234327

235328
@Override
@@ -238,10 +331,34 @@ RangeAsFutureByteString withCrc32cValidationEnabled(boolean enabled) {
238331
}
239332

240333
@Override
241-
AccumulatingRead<DisposableByteString> newRead(
242-
long readId, RangeSpec range, RetryContext retryContext) {
334+
AccumulatingRead<DisposableByteString> newRead(long readId, RetryContext retryContext) {
243335
return ObjectReadSessionStreamRead.createZeroCopyByteStringAccumulatingRead(
244336
readId, range, hasher, retryContext);
245337
}
338+
339+
@Override
340+
public boolean equals(Object o) {
341+
if (this == o) {
342+
return true;
343+
}
344+
if (!(o instanceof RangeAsFutureByteString)) {
345+
return false;
346+
}
347+
RangeAsFutureByteString that = (RangeAsFutureByteString) o;
348+
return Objects.equals(range, that.range) && Objects.equals(hasher, that.hasher);
349+
}
350+
351+
@Override
352+
public int hashCode() {
353+
return Objects.hash(range, hasher);
354+
}
355+
356+
@Override
357+
public String toString() {
358+
return MoreObjects.toStringHelper(this)
359+
.add("range", range)
360+
.add("crc32cValidationEnabled", getCrc32cValidationEnabled())
361+
.toString();
362+
}
246363
}
247364
}

google-cloud-storage/src/main/java/com/google/cloud/storage/StorageDataClient.java

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -73,7 +73,6 @@ ApiFuture<ObjectReadSession> readSession(BidiReadObjectRequest req, GrpcCallCont
7373
<Projection> ApiFuture<FastOpenObjectReadSession<Projection>> fastOpenReadSession(
7474
BidiReadObjectRequest openRequest,
7575
GrpcCallContext ctx,
76-
RangeSpec range,
7776
RangeProjectionConfig<Projection> config) {
7877
checkArgument(
7978
openRequest.getReadRangesList().isEmpty(),
@@ -91,7 +90,7 @@ <Projection> ApiFuture<FastOpenObjectReadSession<Projection>> fastOpenReadSessio
9190

9291
long readId = state.newReadId();
9392
ObjectReadSessionStreamRead<Projection> read =
94-
config.cast().newRead(readId, range, retryContextProvider.create());
93+
config.cast().newRead(readId, retryContextProvider.create());
9594
state.putOutstandingRead(readId, read);
9695

9796
ApiFuture<FastOpenObjectReadSession<Projection>> objectReadSessionFuture =

0 commit comments

Comments
 (0)