Skip to content

Commit 6629821

Browse files
authored
feat: use PingAndWarm request for channel priming (#1179)
Switching channel priming from sending fake ReadRowsRequest to PingAndWarm request, which on the server side will list all the tables for an instance. In the settings we won't need to specify the table Ids to prime.
1 parent 2aa490c commit 6629821

File tree

8 files changed

+187
-196
lines changed

8 files changed

+187
-196
lines changed

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/BigtableDataSettings.java

Lines changed: 9 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -215,10 +215,10 @@ public boolean isRefreshingChannel() {
215215
}
216216

217217
/**
218-
* Gets the table ids that will be used to send warmup requests when {@link
219-
* #isRefreshingChannel()} is enabled.
218+
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
219+
* requests will be sent to all table ids of the instance.
220220
*/
221-
@BetaApi("Channel priming is not currently stable and may change in the future")
221+
@Deprecated
222222
public List<String> getPrimingTableIds() {
223223
return stubSettings.getPrimedTableIds();
224224
}
@@ -377,23 +377,20 @@ public boolean isRefreshingChannel() {
377377
}
378378

379379
/**
380-
* Configure the tables that can be used to prime a channel during a refresh.
381-
*
382-
* <p>These tables work in conjunction with {@link #setRefreshingChannel(boolean)}. When a
383-
* channel is refreshed, it will send a request to each table to warm up the serverside caches
384-
* before admitting the new channel into the channel pool.
380+
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
381+
* requests will be sent to all table ids of the instance.
385382
*/
386-
@BetaApi("Channel priming is not currently stable and may change in the future")
383+
@Deprecated
387384
public Builder setPrimingTableIds(String... tableIds) {
388385
stubSettings.setPrimedTableIds(tableIds);
389386
return this;
390387
}
391388

392389
/**
393-
* Gets the table ids that will be used to send warmup requests when {@link
394-
* #setRefreshingChannel(boolean)} is enabled.
390+
* @deprecated This field is ignored. If {@link #isRefreshingChannel()} is enabled, warm up
391+
* requests will be sent to all table ids of the instance.
395392
*/
396-
@BetaApi("Channel priming is not currently stable and may change in the future")
393+
@Deprecated
397394
public List<String> getPrimingTableIds() {
398395
return stubSettings.getPrimedTableIds();
399396
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableChannelPrimer.java

Lines changed: 22 additions & 94 deletions
Original file line numberDiff line numberDiff line change
@@ -15,33 +15,20 @@
1515
*/
1616
package com.google.cloud.bigtable.data.v2.stub;
1717

18-
import com.google.api.core.ApiFuture;
1918
import com.google.api.core.BetaApi;
2019
import com.google.api.gax.core.FixedCredentialsProvider;
2120
import com.google.api.gax.core.InstantiatingExecutorProvider;
2221
import com.google.api.gax.grpc.ChannelPrimer;
2322
import com.google.api.gax.grpc.GrpcTransportChannel;
2423
import com.google.api.gax.rpc.FixedTransportChannelProvider;
2524
import com.google.auth.Credentials;
26-
import com.google.bigtable.v2.ReadRowsRequest;
27-
import com.google.bigtable.v2.RowFilter;
28-
import com.google.bigtable.v2.RowSet;
29-
import com.google.bigtable.v2.TableName;
30-
import com.google.cloud.bigtable.data.v2.models.DefaultRowAdapter;
31-
import com.google.cloud.bigtable.data.v2.models.Row;
25+
import com.google.bigtable.v2.PingAndWarmRequest;
26+
import com.google.cloud.bigtable.data.v2.internal.NameUtil;
3227
import com.google.common.base.Preconditions;
33-
import com.google.common.collect.ImmutableList;
34-
import com.google.protobuf.ByteString;
35-
import io.grpc.ConnectivityState;
3628
import io.grpc.ManagedChannel;
3729
import java.io.IOException;
38-
import java.util.HashMap;
39-
import java.util.List;
40-
import java.util.Map;
4130
import java.util.concurrent.ExecutionException;
42-
import java.util.concurrent.TimeUnit;
4331
import java.util.logging.Logger;
44-
import org.threeten.bp.Duration;
4532

4633
/**
4734
* A channel warmer that ensures that a Bigtable channel is ready to be used before being added to
@@ -54,18 +41,10 @@
5441
class BigtableChannelPrimer implements ChannelPrimer {
5542
private static Logger LOG = Logger.getLogger(BigtableChannelPrimer.class.toString());
5643

57-
static ByteString PRIMING_ROW_KEY = ByteString.copyFromUtf8("nonexistent-priming-row");
58-
private static Duration PRIME_REQUEST_TIMEOUT = Duration.ofSeconds(30);
59-
6044
private final EnhancedBigtableStubSettings settingsTemplate;
61-
private final List<String> tableIds;
6245

6346
static BigtableChannelPrimer create(
64-
Credentials credentials,
65-
String projectId,
66-
String instanceId,
67-
String appProfileId,
68-
List<String> tableIds) {
47+
Credentials credentials, String projectId, String instanceId, String appProfileId) {
6948
EnhancedBigtableStubSettings.Builder builder =
7049
EnhancedBigtableStubSettings.newBuilder()
7150
.setProjectId(projectId)
@@ -75,28 +54,12 @@ static BigtableChannelPrimer create(
7554
.setExecutorProvider(
7655
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(1).build());
7756

78-
// Disable retries for priming request
79-
builder
80-
.readRowSettings()
81-
.setRetrySettings(
82-
builder
83-
.readRowSettings()
84-
.getRetrySettings()
85-
.toBuilder()
86-
.setMaxAttempts(1)
87-
.setJittered(false)
88-
.setInitialRpcTimeout(PRIME_REQUEST_TIMEOUT)
89-
.setMaxRpcTimeout(PRIME_REQUEST_TIMEOUT)
90-
.setTotalTimeout(PRIME_REQUEST_TIMEOUT)
91-
.build());
92-
return new BigtableChannelPrimer(builder.build(), tableIds);
57+
return new BigtableChannelPrimer(builder.build());
9358
}
9459

95-
private BigtableChannelPrimer(
96-
EnhancedBigtableStubSettings settingsTemplate, List<String> tableIds) {
60+
private BigtableChannelPrimer(EnhancedBigtableStubSettings settingsTemplate) {
9761
Preconditions.checkNotNull(settingsTemplate, "settingsTemplate can't be null");
9862
this.settingsTemplate = settingsTemplate;
99-
this.tableIds = ImmutableList.copyOf(tableIds);
10063
}
10164

10265
@Override
@@ -110,25 +73,7 @@ public void primeChannel(ManagedChannel managedChannel) {
11073
}
11174

11275
private void primeChannelUnsafe(ManagedChannel managedChannel) throws IOException {
113-
if (tableIds.isEmpty()) {
114-
waitForChannelReady(managedChannel);
115-
} else {
116-
sendPrimeRequests(managedChannel);
117-
}
118-
}
119-
120-
private void waitForChannelReady(ManagedChannel managedChannel) {
121-
for (int i = 0; i < 30; i++) {
122-
ConnectivityState connectivityState = managedChannel.getState(true);
123-
if (connectivityState == ConnectivityState.READY) {
124-
break;
125-
}
126-
try {
127-
TimeUnit.SECONDS.sleep(1);
128-
} catch (InterruptedException e) {
129-
break;
130-
}
131-
}
76+
sendPrimeRequests(managedChannel);
13277
}
13378

13479
private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException {
@@ -141,41 +86,24 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException
14186
.build();
14287

14388
try (EnhancedBigtableStub stub = EnhancedBigtableStub.create(primingSettings)) {
144-
Map<String, ApiFuture<?>> primeFutures = new HashMap<>();
145-
146-
// Prime all of the table ids in parallel
147-
for (String tableId : tableIds) {
148-
ApiFuture<Row> f =
149-
stub.createReadRowsRawCallable(new DefaultRowAdapter())
150-
.first()
151-
.futureCall(
152-
ReadRowsRequest.newBuilder()
153-
.setTableName(
154-
TableName.format(
155-
primingSettings.getProjectId(),
156-
primingSettings.getInstanceId(),
157-
tableId))
158-
.setAppProfileId(primingSettings.getAppProfileId())
159-
.setRows(RowSet.newBuilder().addRowKeys(PRIMING_ROW_KEY).build())
160-
.setFilter(RowFilter.newBuilder().setBlockAllFilter(true).build())
161-
.setRowsLimit(1)
162-
.build());
89+
PingAndWarmRequest request =
90+
PingAndWarmRequest.newBuilder()
91+
.setName(
92+
NameUtil.formatInstanceName(
93+
primingSettings.getProjectId(), primingSettings.getInstanceId()))
94+
.setAppProfileId(primingSettings.getAppProfileId())
95+
.build();
16396

164-
primeFutures.put(tableId, f);
165-
}
166-
167-
// Wait for all of the prime requests to complete.
168-
for (Map.Entry<String, ApiFuture<?>> entry : primeFutures.entrySet()) {
169-
try {
170-
entry.getValue().get();
171-
} catch (Throwable e) {
172-
if (e instanceof ExecutionException) {
173-
e = e.getCause();
174-
}
175-
LOG.warning(
176-
String.format(
177-
"Failed to prime channel for table: %s: %s", entry.getKey(), e.getMessage()));
97+
try {
98+
stub.pingAndWarmCallable().call(request);
99+
} catch (Throwable e) {
100+
// TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
101+
// channels if the new
102+
// channel is bad.
103+
if (e instanceof ExecutionException) {
104+
e = e.getCause();
178105
}
106+
LOG.warning(String.format("Failed to prime channel: %s", e));
179107
}
180108
}
181109
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/EnhancedBigtableStub.java

Lines changed: 29 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -51,6 +51,8 @@
5151
import com.google.bigtable.v2.MutateRowResponse;
5252
import com.google.bigtable.v2.MutateRowsRequest;
5353
import com.google.bigtable.v2.MutateRowsResponse;
54+
import com.google.bigtable.v2.PingAndWarmRequest;
55+
import com.google.bigtable.v2.PingAndWarmResponse;
5456
import com.google.bigtable.v2.ReadModifyWriteRowRequest;
5557
import com.google.bigtable.v2.ReadModifyWriteRowResponse;
5658
import com.google.bigtable.v2.ReadRowsRequest;
@@ -104,6 +106,7 @@
104106
import java.io.IOException;
105107
import java.net.URI;
106108
import java.net.URISyntaxException;
109+
import java.util.Collections;
107110
import java.util.List;
108111
import java.util.Map;
109112
import java.util.concurrent.TimeUnit;
@@ -141,6 +144,7 @@ public class EnhancedBigtableStub implements AutoCloseable {
141144
private final UnaryCallable<BulkMutation, Void> bulkMutateRowsCallable;
142145
private final UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable;
143146
private final UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable;
147+
private final UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable;
144148

145149
public static EnhancedBigtableStub create(EnhancedBigtableStubSettings settings)
146150
throws IOException {
@@ -181,8 +185,7 @@ public static EnhancedBigtableStubSettings finalizeSettings(
181185
credentials,
182186
settings.getProjectId(),
183187
settings.getInstanceId(),
184-
settings.getAppProfileId(),
185-
settings.getPrimedTableIds()))
188+
settings.getAppProfileId()))
186189
.build());
187190
}
188191

@@ -284,6 +287,7 @@ public EnhancedBigtableStub(EnhancedBigtableStubSettings settings, ClientContext
284287
bulkMutateRowsCallable = createBulkMutateRowsCallable();
285288
checkAndMutateRowCallable = createCheckAndMutateRowCallable();
286289
readModifyWriteRowCallable = createReadModifyWriteRowCallable();
290+
pingAndWarmCallable = createPingAndWarmCallable();
287291
}
288292

289293
// <editor-fold desc="Callable creators">
@@ -810,6 +814,25 @@ private <RequestT, ResponseT> UnaryCallable<RequestT, ResponseT> createUserFacin
810814

811815
return traced.withDefaultCallContext(clientContext.getDefaultCallContext());
812816
}
817+
818+
private UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> createPingAndWarmCallable() {
819+
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarm =
820+
GrpcRawCallableFactory.createUnaryCallable(
821+
GrpcCallSettings.<PingAndWarmRequest, PingAndWarmResponse>newBuilder()
822+
.setMethodDescriptor(BigtableGrpc.getPingAndWarmMethod())
823+
.setParamsExtractor(
824+
new RequestParamsExtractor<PingAndWarmRequest>() {
825+
@Override
826+
public Map<String, String> extract(PingAndWarmRequest request) {
827+
return ImmutableMap.of(
828+
"name", request.getName(),
829+
"app_profile_id", request.getAppProfileId());
830+
}
831+
})
832+
.build(),
833+
Collections.emptySet());
834+
return pingAndWarm.withDefaultCallContext(clientContext.getDefaultCallContext());
835+
}
813836
// </editor-fold>
814837

815838
// <editor-fold desc="Callable accessors">
@@ -854,6 +877,10 @@ public UnaryCallable<ConditionalRowMutation, Boolean> checkAndMutateRowCallable(
854877
public UnaryCallable<ReadModifyWriteRow, Row> readModifyWriteRowCallable() {
855878
return readModifyWriteRowCallable;
856879
}
880+
881+
UnaryCallable<PingAndWarmRequest, PingAndWarmResponse> pingAndWarmCallable() {
882+
return pingAndWarmCallable;
883+
}
857884
// </editor-fold>
858885

859886
private SpanName getSpanName(String methodName) {

0 commit comments

Comments
 (0)