1515 */
1616package com .google .cloud .bigtable .data .v2 .stub ;
1717
18- import com .google .api .core .ApiFuture ;
1918import com .google .api .core .BetaApi ;
2019import com .google .api .gax .core .FixedCredentialsProvider ;
2120import com .google .api .gax .core .InstantiatingExecutorProvider ;
2221import com .google .api .gax .grpc .ChannelPrimer ;
2322import com .google .api .gax .grpc .GrpcTransportChannel ;
2423import com .google .api .gax .rpc .FixedTransportChannelProvider ;
2524import com .google .auth .Credentials ;
26- import com .google .bigtable .v2 .ReadRowsRequest ;
27- import com .google .bigtable .v2 .RowFilter ;
28- import com .google .bigtable .v2 .RowSet ;
29- import com .google .bigtable .v2 .TableName ;
30- import com .google .cloud .bigtable .data .v2 .models .DefaultRowAdapter ;
31- import com .google .cloud .bigtable .data .v2 .models .Row ;
25+ import com .google .bigtable .v2 .PingAndWarmRequest ;
26+ import com .google .cloud .bigtable .data .v2 .internal .NameUtil ;
3227import com .google .common .base .Preconditions ;
33- import com .google .common .collect .ImmutableList ;
34- import com .google .protobuf .ByteString ;
35- import io .grpc .ConnectivityState ;
3628import io .grpc .ManagedChannel ;
3729import java .io .IOException ;
38- import java .util .HashMap ;
39- import java .util .List ;
40- import java .util .Map ;
4130import java .util .concurrent .ExecutionException ;
42- import java .util .concurrent .TimeUnit ;
4331import java .util .logging .Logger ;
44- import org .threeten .bp .Duration ;
4532
4633/**
4734 * A channel warmer that ensures that a Bigtable channel is ready to be used before being added to
5441class BigtableChannelPrimer implements ChannelPrimer {
5542 private static Logger LOG = Logger .getLogger (BigtableChannelPrimer .class .toString ());
5643
57- static ByteString PRIMING_ROW_KEY = ByteString .copyFromUtf8 ("nonexistent-priming-row" );
58- private static Duration PRIME_REQUEST_TIMEOUT = Duration .ofSeconds (30 );
59-
6044 private final EnhancedBigtableStubSettings settingsTemplate ;
61- private final List <String > tableIds ;
6245
6346 static BigtableChannelPrimer create (
64- Credentials credentials ,
65- String projectId ,
66- String instanceId ,
67- String appProfileId ,
68- List <String > tableIds ) {
47+ Credentials credentials , String projectId , String instanceId , String appProfileId ) {
6948 EnhancedBigtableStubSettings .Builder builder =
7049 EnhancedBigtableStubSettings .newBuilder ()
7150 .setProjectId (projectId )
@@ -75,28 +54,12 @@ static BigtableChannelPrimer create(
7554 .setExecutorProvider (
7655 InstantiatingExecutorProvider .newBuilder ().setExecutorThreadCount (1 ).build ());
7756
78- // Disable retries for priming request
79- builder
80- .readRowSettings ()
81- .setRetrySettings (
82- builder
83- .readRowSettings ()
84- .getRetrySettings ()
85- .toBuilder ()
86- .setMaxAttempts (1 )
87- .setJittered (false )
88- .setInitialRpcTimeout (PRIME_REQUEST_TIMEOUT )
89- .setMaxRpcTimeout (PRIME_REQUEST_TIMEOUT )
90- .setTotalTimeout (PRIME_REQUEST_TIMEOUT )
91- .build ());
92- return new BigtableChannelPrimer (builder .build (), tableIds );
57+ return new BigtableChannelPrimer (builder .build ());
9358 }
9459
95- private BigtableChannelPrimer (
96- EnhancedBigtableStubSettings settingsTemplate , List <String > tableIds ) {
60+ private BigtableChannelPrimer (EnhancedBigtableStubSettings settingsTemplate ) {
9761 Preconditions .checkNotNull (settingsTemplate , "settingsTemplate can't be null" );
9862 this .settingsTemplate = settingsTemplate ;
99- this .tableIds = ImmutableList .copyOf (tableIds );
10063 }
10164
10265 @ Override
@@ -110,25 +73,7 @@ public void primeChannel(ManagedChannel managedChannel) {
11073 }
11174
11275 private void primeChannelUnsafe (ManagedChannel managedChannel ) throws IOException {
113- if (tableIds .isEmpty ()) {
114- waitForChannelReady (managedChannel );
115- } else {
116- sendPrimeRequests (managedChannel );
117- }
118- }
119-
120- private void waitForChannelReady (ManagedChannel managedChannel ) {
121- for (int i = 0 ; i < 30 ; i ++) {
122- ConnectivityState connectivityState = managedChannel .getState (true );
123- if (connectivityState == ConnectivityState .READY ) {
124- break ;
125- }
126- try {
127- TimeUnit .SECONDS .sleep (1 );
128- } catch (InterruptedException e ) {
129- break ;
130- }
131- }
76+ sendPrimeRequests (managedChannel );
13277 }
13378
13479 private void sendPrimeRequests (ManagedChannel managedChannel ) throws IOException {
@@ -141,41 +86,24 @@ private void sendPrimeRequests(ManagedChannel managedChannel) throws IOException
14186 .build ();
14287
14388 try (EnhancedBigtableStub stub = EnhancedBigtableStub .create (primingSettings )) {
144- Map <String , ApiFuture <?>> primeFutures = new HashMap <>();
145-
146- // Prime all of the table ids in parallel
147- for (String tableId : tableIds ) {
148- ApiFuture <Row > f =
149- stub .createReadRowsRawCallable (new DefaultRowAdapter ())
150- .first ()
151- .futureCall (
152- ReadRowsRequest .newBuilder ()
153- .setTableName (
154- TableName .format (
155- primingSettings .getProjectId (),
156- primingSettings .getInstanceId (),
157- tableId ))
158- .setAppProfileId (primingSettings .getAppProfileId ())
159- .setRows (RowSet .newBuilder ().addRowKeys (PRIMING_ROW_KEY ).build ())
160- .setFilter (RowFilter .newBuilder ().setBlockAllFilter (true ).build ())
161- .setRowsLimit (1 )
162- .build ());
89+ PingAndWarmRequest request =
90+ PingAndWarmRequest .newBuilder ()
91+ .setName (
92+ NameUtil .formatInstanceName (
93+ primingSettings .getProjectId (), primingSettings .getInstanceId ()))
94+ .setAppProfileId (primingSettings .getAppProfileId ())
95+ .build ();
16396
164- primeFutures .put (tableId , f );
165- }
166-
167- // Wait for all of the prime requests to complete.
168- for (Map .Entry <String , ApiFuture <?>> entry : primeFutures .entrySet ()) {
169- try {
170- entry .getValue ().get ();
171- } catch (Throwable e ) {
172- if (e instanceof ExecutionException ) {
173- e = e .getCause ();
174- }
175- LOG .warning (
176- String .format (
177- "Failed to prime channel for table: %s: %s" , entry .getKey (), e .getMessage ()));
97+ try {
98+ stub .pingAndWarmCallable ().call (request );
99+ } catch (Throwable e ) {
100+ // TODO: Not sure if we should swallow the error here. We are pre-emptively swapping
101+ // channels if the new
102+ // channel is bad.
103+ if (e instanceof ExecutionException ) {
104+ e = e .getCause ();
178105 }
106+ LOG .warning (String .format ("Failed to prime channel: %s" , e ));
179107 }
180108 }
181109 }
0 commit comments