Skip to content
This repository was archived by the owner on May 8, 2026. It is now read-only.

Commit 51657d8

Browse files
committed
convert to internal changes in change stream classes
1 parent ce6a279 commit 51657d8

7 files changed

Lines changed: 44 additions & 96 deletions

File tree

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutation.java

Lines changed: 6 additions & 35 deletions
Original file line numberDiff line numberDiff line change
@@ -15,11 +15,7 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.models;
1717

18-
import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant;
19-
import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;
20-
2118
import com.google.api.core.InternalApi;
22-
import com.google.api.core.ObsoleteApi;
2319
import com.google.auto.value.AutoValue;
2420
import com.google.cloud.bigtable.data.v2.models.Range.TimestampRange;
2521
import com.google.cloud.bigtable.data.v2.stub.changestream.ChangeStreamRecordMerger;
@@ -82,7 +78,7 @@ static Builder createUserMutation(
8278
.setRowKey(rowKey)
8379
.setType(MutationType.USER)
8480
.setSourceClusterId(sourceClusterId)
85-
.setCommitTime(commitTimestamp)
81+
.setCommitTimestamp(commitTimestamp)
8682
.setTieBreaker(tieBreaker);
8783
}
8884

@@ -97,7 +93,7 @@ static Builder createGcMutation(
9793
.setRowKey(rowKey)
9894
.setType(MutationType.GARBAGE_COLLECTION)
9995
.setSourceClusterId("")
100-
.setCommitTime(commitTimestamp)
96+
.setCommitTimestamp(commitTimestamp)
10197
.setTieBreaker(tieBreaker);
10298
}
10399

@@ -113,14 +109,7 @@ static Builder createGcMutation(
113109
@Nonnull
114110
public abstract String getSourceClusterId();
115111

116-
/** This method is obsolete. Use {@link #getCommitTime()} instead. */
117-
@ObsoleteApi("Use getCommitTime() instead")
118-
public abstract org.threeten.bp.Instant getCommitTimestamp();
119-
120-
/** Get the commit timestamp of the current mutation. */
121-
public java.time.Instant getCommitTime() {
122-
return toJavaTimeInstant(getCommitTimestamp());
123-
}
112+
public abstract java.time.Instant getCommitTimestamp();
124113

125114
/**
126115
* Get the tie breaker of the current mutation. This is used to resolve conflicts when multiple
@@ -132,14 +121,8 @@ public java.time.Instant getCommitTime() {
132121
@Nonnull
133122
public abstract String getToken();
134123

135-
/** This method is obsolete. Use {@link #getEstimatedLowWatermarkTime()} instead. */
136-
@ObsoleteApi("Use getEstimatedLowWatermarkTime() instead")
137-
public abstract org.threeten.bp.Instant getEstimatedLowWatermark();
138-
139124
/** Get the low watermark of the current mutation. */
140-
public java.time.Instant getEstimatedLowWatermarkTime() {
141-
return toJavaTimeInstant(getEstimatedLowWatermark());
142-
}
125+
public abstract java.time.Instant getEstimatedLowWatermark();
143126

144127
/** Get the list of mods of the current mutation. */
145128
@Nonnull
@@ -160,27 +143,15 @@ abstract static class Builder {
160143

161144
abstract Builder setSourceClusterId(@Nonnull String sourceClusterId);
162145

163-
Builder setCommitTime(java.time.Instant commitTimestamp) {
164-
return setCommitTimestamp(toThreetenInstant(commitTimestamp));
165-
}
166-
167-
/** This method is obsolete. Use {@link #setCommitTime(java.time.Instant)} instead. */
168-
@ObsoleteApi("Use setCommitTime(java.time.Instant) instead")
169-
abstract Builder setCommitTimestamp(org.threeten.bp.Instant commitTimestamp);
146+
abstract Builder setCommitTimestamp(java.time.Instant commitTimestamp);
170147

171148
abstract Builder setTieBreaker(int tieBreaker);
172149

173150
abstract ImmutableList.Builder<Entry> entriesBuilder();
174151

175152
abstract Builder setToken(@Nonnull String token);
176153

177-
Builder setLowWatermarkTime(java.time.Instant estimatedLowWatermark) {
178-
return setEstimatedLowWatermark(toThreetenInstant(estimatedLowWatermark));
179-
}
180-
181-
/** This method is obsolete. Use {@link #setLowWatermarkTime(java.time.Instant)} instead. */
182-
@ObsoleteApi("Use setEstimatedLowWatermarkInstant(java.time.Instant) instead")
183-
abstract Builder setEstimatedLowWatermark(org.threeten.bp.Instant estimatedLowWatermark);
154+
abstract Builder setEstimatedLowWatermark(java.time.Instant estimatedLowWatermark);
184155

185156
Builder setCell(
186157
@Nonnull String familyName,

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/DefaultChangeStreamRecordAdapter.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -177,7 +177,7 @@ public void finishCell() {
177177
public ChangeStreamRecord finishChangeStreamMutation(
178178
String token, Instant estimatedLowWatermark) {
179179
this.changeStreamMutationBuilder.setToken(token);
180-
this.changeStreamMutationBuilder.setLowWatermarkTime(estimatedLowWatermark);
180+
this.changeStreamMutationBuilder.setEstimatedLowWatermark(estimatedLowWatermark);
181181
return this.changeStreamMutationBuilder.build();
182182
}
183183

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/models/Heartbeat.java

Lines changed: 5 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -15,14 +15,11 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.models;
1717

18-
import static com.google.api.gax.util.TimeConversionUtils.toJavaTimeInstant;
19-
import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;
20-
2118
import com.google.api.core.InternalApi;
22-
import com.google.api.core.ObsoleteApi;
2319
import com.google.auto.value.AutoValue;
2420
import com.google.bigtable.v2.ReadChangeStreamResponse;
2521
import java.io.Serializable;
22+
import java.time.Instant;
2623
import javax.annotation.Nonnull;
2724

2825
/** A simple wrapper for {@link ReadChangeStreamResponse.Heartbeat}. */
@@ -32,30 +29,22 @@ public abstract class Heartbeat implements ChangeStreamRecord, Serializable {
3229
private static final long serialVersionUID = 7316215828353608504L;
3330

3431
private static Heartbeat create(
35-
ChangeStreamContinuationToken changeStreamContinuationToken,
36-
java.time.Instant estimatedLowWatermark) {
37-
return new AutoValue_Heartbeat(
38-
changeStreamContinuationToken, toThreetenInstant(estimatedLowWatermark));
32+
ChangeStreamContinuationToken changeStreamContinuationToken, Instant estimatedLowWatermark) {
33+
return new AutoValue_Heartbeat(changeStreamContinuationToken, estimatedLowWatermark);
3934
}
4035

4136
/** Wraps the protobuf {@link ReadChangeStreamResponse.Heartbeat}. */
4237
static Heartbeat fromProto(@Nonnull ReadChangeStreamResponse.Heartbeat heartbeat) {
4338
return create(
4439
ChangeStreamContinuationToken.fromProto(heartbeat.getContinuationToken()),
45-
java.time.Instant.ofEpochSecond(
40+
Instant.ofEpochSecond(
4641
heartbeat.getEstimatedLowWatermark().getSeconds(),
4742
heartbeat.getEstimatedLowWatermark().getNanos()));
4843
}
4944

5045
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
5146
public abstract ChangeStreamContinuationToken getChangeStreamContinuationToken();
5247

53-
/** This method is obsolete. Use {@link #getEstimatedLowWatermarkInstant()} instead. */
54-
@ObsoleteApi("Use getEstimatedLowWatermarkInstant() instead")
55-
public abstract org.threeten.bp.Instant getEstimatedLowWatermark();
56-
5748
@InternalApi("Intended for use by the BigtableIO in apache/beam only.")
58-
public java.time.Instant getEstimatedLowWatermarkInstant() {
59-
return toJavaTimeInstant(getEstimatedLowWatermark());
60-
}
49+
public abstract Instant getEstimatedLowWatermark();
6150
}

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamMutationTest.java

Lines changed: 13 additions & 22 deletions
Original file line numberDiff line numberDiff line change
@@ -15,7 +15,6 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.models;
1717

18-
import static com.google.api.gax.util.TimeConversionUtils.toThreetenInstant;
1918
import static com.google.common.truth.Truth.assertThat;
2019

2120
import com.google.bigtable.v2.MutateRowRequest;
@@ -46,10 +45,6 @@ public class ChangeStreamMutationTest {
4645
RequestContext.create(PROJECT_ID, INSTANCE_ID, APP_PROFILE_ID);
4746
private static final Instant FAKE_COMMIT_TIMESTAMP = Instant.ofEpochSecond(0, 1000L);
4847
private static final Instant FAKE_LOW_WATERMARK = Instant.ofEpochSecond(0, 2000L);
49-
private static final org.threeten.bp.Instant FAKE_COMMIT_TIMESTAMP_THREETEN =
50-
toThreetenInstant(FAKE_COMMIT_TIMESTAMP);
51-
private static final org.threeten.bp.Instant FAKE_LOW_WATERMARK_THREETEN =
52-
toThreetenInstant(FAKE_LOW_WATERMARK);
5348

5449
@Test
5550
public void userInitiatedMutationTest() throws IOException, ClassNotFoundException {
@@ -78,20 +73,18 @@ public void userInitiatedMutationTest() throws IOException, ClassNotFoundExcepti
7873
Value.rawTimestamp(1000),
7974
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
8075
.setToken("fake-token")
81-
.setLowWatermarkTime(FAKE_LOW_WATERMARK)
76+
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
8277
.build();
8378

8479
// Test the getters.
8580
assertThat(changeStreamMutation.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
8681
assertThat(changeStreamMutation.getType()).isEqualTo(ChangeStreamMutation.MutationType.USER);
8782
assertThat(changeStreamMutation.getSourceClusterId()).isEqualTo("fake-source-cluster-id");
88-
assertThat(changeStreamMutation.getCommitTime()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
89-
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP_THREETEN);
83+
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
9084
assertThat(changeStreamMutation.getTieBreaker()).isEqualTo(0);
9185
assertThat(changeStreamMutation.getToken()).isEqualTo("fake-token");
92-
assertThat(changeStreamMutation.getEstimatedLowWatermarkTime()).isEqualTo(FAKE_LOW_WATERMARK);
93-
assertThat(changeStreamMutation.getEstimatedLowWatermark())
94-
.isEqualTo(FAKE_LOW_WATERMARK_THREETEN);
86+
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);
87+
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);
9588

9689
// Test serialization.
9790
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -120,21 +113,19 @@ public void gcMutationTest() throws IOException, ClassNotFoundException {
120113
ByteString.copyFromUtf8("fake-qualifier"),
121114
Range.TimestampRange.create(1000L, 2000L))
122115
.setToken("fake-token")
123-
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
116+
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
124117
.build();
125118

126119
// Test the getters.
127120
assertThat(changeStreamMutation.getRowKey()).isEqualTo(ByteString.copyFromUtf8("key"));
128121
assertThat(changeStreamMutation.getType())
129122
.isEqualTo(ChangeStreamMutation.MutationType.GARBAGE_COLLECTION);
130123
Assert.assertTrue(changeStreamMutation.getSourceClusterId().isEmpty());
131-
assertThat(changeStreamMutation.getCommitTime()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
132-
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP_THREETEN);
124+
assertThat(changeStreamMutation.getCommitTimestamp()).isEqualTo(FAKE_COMMIT_TIMESTAMP);
133125
assertThat(changeStreamMutation.getTieBreaker()).isEqualTo(0);
134126
assertThat(changeStreamMutation.getToken()).isEqualTo("fake-token");
135-
assertThat(changeStreamMutation.getEstimatedLowWatermarkTime()).isEqualTo(FAKE_LOW_WATERMARK);
136-
assertThat(changeStreamMutation.getEstimatedLowWatermark())
137-
.isEqualTo(FAKE_LOW_WATERMARK_THREETEN);
127+
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);
128+
assertThat(changeStreamMutation.getEstimatedLowWatermark()).isEqualTo(FAKE_LOW_WATERMARK);
138129

139130
// Test serialization.
140131
ByteArrayOutputStream bos = new ByteArrayOutputStream();
@@ -172,7 +163,7 @@ public void toRowMutationTest() {
172163
Value.rawTimestamp(1000),
173164
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
174165
.setToken("fake-token")
175-
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
166+
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
176167
.build();
177168

178169
// Convert it to a rowMutation and construct a MutateRowRequest.
@@ -215,7 +206,7 @@ public void toRowMutationWithoutTokenShouldFailTest() {
215206
ChangeStreamMutation.createUserMutation(
216207
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
217208
.deleteFamily("fake-family")
218-
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
209+
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK);
219210
Assert.assertThrows(IllegalStateException.class, builder::build);
220211
}
221212

@@ -255,7 +246,7 @@ public void toRowMutationEntryTest() {
255246
Value.rawTimestamp(1000),
256247
Value.rawValue(ByteString.copyFrom(Longs.toByteArray(1234L))))
257248
.setToken("fake-token")
258-
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
249+
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
259250
.build();
260251

261252
// Convert it to a rowMutationEntry and construct a MutateRowRequest.
@@ -295,7 +286,7 @@ public void toRowMutationEntryWithoutTokenShouldFailTest() {
295286
ChangeStreamMutation.createUserMutation(
296287
ByteString.copyFromUtf8("key"), "fake-source-cluster-id", FAKE_COMMIT_TIMESTAMP, 0)
297288
.deleteFamily("fake-family")
298-
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN);
289+
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK);
299290
Assert.assertThrows(IllegalStateException.class, builder::build);
300291
}
301292

@@ -320,7 +311,7 @@ public void testWithLongValue() {
320311
1000L,
321312
ByteString.copyFrom(Longs.toByteArray(1L)))
322313
.setToken("fake-token")
323-
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK_THREETEN)
314+
.setEstimatedLowWatermark(FAKE_LOW_WATERMARK)
324315
.build();
325316

326317
RowMutation rowMutation = changeStreamMutation.toRowMutation(TABLE_ID);

google-cloud-bigtable/src/test/java/com/google/cloud/bigtable/data/v2/models/ChangeStreamRecordTest.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -129,7 +129,7 @@ public void heartbeatTest() {
129129
.build();
130130
Heartbeat actualHeartbeat = Heartbeat.fromProto(heartbeatProto);
131131

132-
assertThat(actualHeartbeat.getEstimatedLowWatermarkInstant())
132+
assertThat(actualHeartbeat.getEstimatedLowWatermark())
133133
.isEqualTo(Instant.ofEpochSecond(lowWatermark.getSeconds(), lowWatermark.getNanos()));
134134
assertThat(actualHeartbeat.getChangeStreamContinuationToken().getPartition())
135135
.isEqualTo(ByteStringRange.create(rowRange.getStartKeyClosed(), rowRange.getEndKeyOpen()));

0 commit comments

Comments
 (0)