Skip to content

Commit 4fd63af

Browse files
authored
feat: add read attempt and operation timeout to ExportJob settings (#4533)
* feat: add read attempt and operation timeout to ExportJob settings * use nested value provider
1 parent eeba7f8 commit 4fd63af

File tree

3 files changed

+42
-1
lines changed

3 files changed

+42
-1
lines changed

bigtable-client-core-parent/bigtable-hbase/src/main/java/com/google/cloud/bigtable/hbase/wrappers/veneer/DataClientVeneerApi.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@
3333
import com.google.cloud.bigtable.data.v2.models.Row;
3434
import com.google.cloud.bigtable.data.v2.models.RowMutation;
3535
import com.google.cloud.bigtable.hbase.adapters.Adapters;
36+
import com.google.cloud.bigtable.hbase.util.Logger;
3637
import com.google.cloud.bigtable.hbase.wrappers.BulkMutationWrapper;
3738
import com.google.cloud.bigtable.hbase.wrappers.BulkReadWrapper;
3839
import com.google.cloud.bigtable.hbase.wrappers.DataClientWrapper;
@@ -59,11 +60,14 @@
5960
import org.apache.hadoop.hbase.client.AbstractClientScanner;
6061
import org.apache.hadoop.hbase.client.Result;
6162
import org.apache.hadoop.hbase.client.ResultScanner;
63+
import org.threeten.bp.Duration;
6264

6365
/** For internal use only - public for technical reasons. */
6466
@InternalApi("For internal usage only")
6567
public class DataClientVeneerApi implements DataClientWrapper {
6668

69+
private final Logger LOG = new Logger(DataClientVeneerApi.class);
70+
6771
private static final RowResultAdapter RESULT_ADAPTER = new RowResultAdapter();
6872

6973
private final BigtableDataClient delegate;
@@ -203,7 +207,9 @@ private GrpcCallContext createScanCallContext() {
203207
callSettings.getOperationTimeout().get().toMillis(), TimeUnit.MILLISECONDS)));
204208
}
205209
if (callSettings.getAttemptTimeout().isPresent()) {
206-
ctx = ctx.withTimeout(callSettings.getAttemptTimeout().get());
210+
Duration attemptTimeout = callSettings.getAttemptTimeout().get();
211+
LOG.info("effective attempt timeout for scan is %s", attemptTimeout);
212+
ctx = ctx.withTimeout(attemptTimeout);
207213
}
208214

209215
return ctx;

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/TemplateUtils.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -96,6 +96,29 @@ public static CloudBigtableScanConfiguration buildExportConfig(ExportOptions opt
9696
options.getBigtableStopRow(),
9797
options.getBigtableMaxVersions(),
9898
options.getBigtableFilter()));
99+
if (options.getBigtableReadRpcTimeoutMs() != null) {
100+
ValueProvider.NestedValueProvider.of(
101+
options.getBigtableReadRpcTimeoutMs(),
102+
(Integer timeout) -> {
103+
if (timeout != null) {
104+
configBuilder.withConfiguration(
105+
BigtableOptionsFactory.BIGTABLE_READ_RPC_TIMEOUT_MS_KEY, String.valueOf(timeout));
106+
}
107+
return null;
108+
});
109+
}
110+
if (options.getBigtableReadRpcAttemptTimeoutMs() != null) {
111+
ValueProvider.NestedValueProvider.of(
112+
options.getBigtableReadRpcAttemptTimeoutMs(),
113+
(Integer timeout) -> {
114+
if (timeout != null) {
115+
configBuilder.withConfiguration(
116+
BigtableOptionsFactory.BIGTABLE_READ_RPC_ATTEMPT_TIMEOUT_MS_KEY,
117+
String.valueOf(timeout));
118+
}
119+
return null;
120+
});
121+
}
99122
return configBuilder.build();
100123
}
101124
}

bigtable-dataflow-parent/bigtable-beam-import/src/main/java/com/google/cloud/bigtable/beam/sequencefiles/ExportJob.java

Lines changed: 12 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -179,6 +179,18 @@ public interface ExportOptions extends GcpOptions, GcsOptions {
179179

180180
@SuppressWarnings("unused")
181181
void setRetryIdleTimeout(boolean retryIdleTimeout);
182+
183+
@Description("Read RPC timeout in milliseconds.")
184+
ValueProvider<Integer> getBigtableReadRpcTimeoutMs();
185+
186+
@SuppressWarnings("unused")
187+
void setBigtableReadRpcTimeoutMs(ValueProvider<Integer> readRpcTimeoutMs);
188+
189+
@Description("Read RPC attempt timeout in milliseconds.")
190+
ValueProvider<Integer> getBigtableReadRpcAttemptTimeoutMs();
191+
192+
@SuppressWarnings("unused")
193+
void setBigtableReadRpcAttemptTimeoutMs(ValueProvider<Integer> readRpcAttemptTimeoutMs);
182194
}
183195

184196
public static void main(String[] args) {

0 commit comments

Comments
 (0)