Skip to content

Commit 140a1ad

Browse files
authored
feat(bigtable): expose a metric to track the number of outstanding rpcs (unary, streaming) in channel pool (#2696)
* feat(bigtable): introduce a channel trace for exporting bigtable channel specific metrics. Creates per_connection_error_count and outstanding_rpcs_per_connection tracers.
1 parent 16447fb commit 140a1ad

14 files changed

+1023
-571
lines changed

google-cloud-bigtable/clirr-ignored-differences.xml

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,10 @@
5555
<differenceType>8001</differenceType>
5656
<className>com/google/cloud/bigtable/data/v2/stub/metrics/HeaderTracer</className>
5757
</difference>
58+
<difference>
59+
<differenceType>8001</differenceType>
60+
<className>com/google/cloud/bigtable/data/v2/stub/metrics/ErrorCountPerConnectionMetricTracker</className>
61+
</difference>
5862
<!-- InternalApi that was removed -->
5963
<difference>
6064
<differenceType>8001</differenceType>

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/BigtableClientContext.java

Lines changed: 10 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -27,9 +27,9 @@
2727
import com.google.cloud.bigtable.data.v2.BigtableDataSettings;
2828
import com.google.cloud.bigtable.data.v2.internal.JwtCredentialsWithAudience;
2929
import com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants;
30+
import com.google.cloud.bigtable.data.v2.stub.metrics.ChannelPoolMetricsTracer;
3031
import com.google.cloud.bigtable.data.v2.stub.metrics.CustomOpenTelemetryMetricsProvider;
3132
import com.google.cloud.bigtable.data.v2.stub.metrics.DefaultMetricsProvider;
32-
import com.google.cloud.bigtable.data.v2.stub.metrics.ErrorCountPerConnectionMetricTracker;
3333
import com.google.cloud.bigtable.data.v2.stub.metrics.MetricsProvider;
3434
import com.google.cloud.bigtable.data.v2.stub.metrics.NoopMetricsProvider;
3535
import com.google.cloud.bigtable.gaxx.grpc.BigtableTransportChannelProvider;
@@ -97,19 +97,17 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
9797
: null;
9898

9999
@Nullable OpenTelemetrySdk internalOtel = null;
100-
@Nullable ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker = null;
101-
100+
@Nullable ChannelPoolMetricsTracer channelPoolMetricsTracer = null;
102101
// Internal metrics are scoped to the connections, so we need a mutable transportProvider,
103102
// otherwise there is
104103
// no reason to build the internal OtelProvider
105104
if (transportProvider != null) {
106105
internalOtel =
107106
settings.getInternalMetricsProvider().createOtelProvider(settings, credentials);
108107
if (internalOtel != null) {
109-
// Set up per connection error count tracker if all dependencies are met:
110-
// a configurable transport provider + otel
111-
errorCountPerConnectionMetricTracker =
112-
setupPerConnectionErrorTracer(builder, transportProvider, internalOtel);
108+
channelPoolMetricsTracer =
109+
new ChannelPoolMetricsTracer(
110+
internalOtel, EnhancedBigtableStub.createBuiltinAttributes(builder.build()));
113111

114112
// Configure grpc metrics
115113
configureGrpcOtel(transportProvider, internalOtel);
@@ -137,16 +135,16 @@ public static BigtableClientContext create(EnhancedBigtableStubSettings settings
137135

138136
BigtableTransportChannelProvider btTransportProvider =
139137
BigtableTransportChannelProvider.create(
140-
(InstantiatingGrpcChannelProvider) transportProvider.build(), channelPrimer);
138+
(InstantiatingGrpcChannelProvider) transportProvider.build(),
139+
channelPrimer,
140+
channelPoolMetricsTracer);
141141

142142
builder.setTransportChannelProvider(btTransportProvider);
143143
}
144144

145145
ClientContext clientContext = ClientContext.create(builder.build());
146-
147-
if (errorCountPerConnectionMetricTracker != null) {
148-
errorCountPerConnectionMetricTracker.startConnectionErrorCountTracker(
149-
clientContext.getExecutor());
146+
if (channelPoolMetricsTracer != null) {
147+
channelPoolMetricsTracer.start(clientContext.getExecutor());
150148
}
151149

152150
return new BigtableClientContext(
@@ -264,27 +262,6 @@ private static void patchCredentials(EnhancedBigtableStubSettings.Builder settin
264262
settings.setCredentialsProvider(FixedCredentialsProvider.create(patchedCreds));
265263
}
266264

267-
private static ErrorCountPerConnectionMetricTracker setupPerConnectionErrorTracer(
268-
EnhancedBigtableStubSettings.Builder builder,
269-
InstantiatingGrpcChannelProvider.Builder transportProvider,
270-
OpenTelemetry openTelemetry) {
271-
ErrorCountPerConnectionMetricTracker errorCountPerConnectionMetricTracker =
272-
new ErrorCountPerConnectionMetricTracker(
273-
openTelemetry, EnhancedBigtableStub.createBuiltinAttributes(builder.build()));
274-
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =
275-
transportProvider.getChannelConfigurator();
276-
transportProvider.setChannelConfigurator(
277-
managedChannelBuilder -> {
278-
managedChannelBuilder.intercept(errorCountPerConnectionMetricTracker.getInterceptor());
279-
280-
if (oldChannelConfigurator != null) {
281-
managedChannelBuilder = oldChannelConfigurator.apply(managedChannelBuilder);
282-
}
283-
return managedChannelBuilder;
284-
});
285-
return errorCountPerConnectionMetricTracker;
286-
}
287-
288265
private static void setupCookieHolder(
289266
InstantiatingGrpcChannelProvider.Builder transportProvider) {
290267
ApiFunction<ManagedChannelBuilder, ManagedChannelBuilder> oldChannelConfigurator =

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/BuiltinMetricsConstants.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,7 @@ public class BuiltinMetricsConstants {
7070
static final String REMAINING_DEADLINE_NAME = "remaining_deadline";
7171
static final String CLIENT_BLOCKING_LATENCIES_NAME = "throttling_latencies";
7272
static final String PER_CONNECTION_ERROR_COUNT_NAME = "per_connection_error_count";
73+
static final String OUTSTANDING_RPCS_PER_CHANNEL_NAME = "connection_pool/outstanding_rpcs";
7374

7475
// Start allow list of metrics that will be exported as internal
7576
public static final Map<String, Set<String>> GRPC_METRICS =
@@ -140,6 +141,15 @@ public class BuiltinMetricsConstants {
140141
500_000.0,
141142
1_000_000.0));
142143

144+
// Buckets for outstanding RPCs per channel: 0 to 200 in steps of 5
145+
private static final Aggregation AGGREGATION_OUTSTANDING_RPCS_HISTOGRAM =
146+
Aggregation.explicitBucketHistogram(
147+
ImmutableList.of(
148+
0.0, 5.0, 10.0, 15.0, 20.0, 25.0, 30.0, 35.0, 40.0, 45.0, 50.0, 55.0, 60.0, 65.0,
149+
70.0, 75.0, 80.0, 85.0, 90.0, 95.0, 100.0, 105.0, 110.0, 115.0, 120.0, 125.0, 130.0,
150+
135.0, 140.0, 145.0, 150.0, 155.0, 160.0, 165.0, 170.0, 175.0, 180.0, 185.0, 190.0,
151+
195.0, 200.0));
152+
143153
static final Set<AttributeKey> COMMON_ATTRIBUTES =
144154
ImmutableSet.of(
145155
BIGTABLE_PROJECT_ID_KEY,
@@ -181,6 +191,7 @@ static void defineView(
181191
viewMap.put(selector, view);
182192
}
183193

194+
// uses cloud.BigtableClient schema
184195
public static Map<InstrumentSelector, View> getInternalViews() {
185196
ImmutableMap.Builder<InstrumentSelector, View> views = ImmutableMap.builder();
186197
defineView(
@@ -192,6 +203,15 @@ public static Map<InstrumentSelector, View> getInternalViews() {
192203
ImmutableSet.<AttributeKey>builder()
193204
.add(BIGTABLE_PROJECT_ID_KEY, INSTANCE_ID_KEY, APP_PROFILE_KEY, CLIENT_NAME_KEY)
194205
.build());
206+
defineView(
207+
views,
208+
OUTSTANDING_RPCS_PER_CHANNEL_NAME,
209+
AGGREGATION_OUTSTANDING_RPCS_HISTOGRAM,
210+
InstrumentType.HISTOGRAM,
211+
"1",
212+
ImmutableSet.<AttributeKey>builder()
213+
.add(BIGTABLE_PROJECT_ID_KEY, INSTANCE_ID_KEY, APP_PROFILE_KEY, CLIENT_NAME_KEY)
214+
.build());
195215
return views.build();
196216
}
197217

Lines changed: 132 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,132 @@
1+
/*
2+
* Copyright 2025 Google LLC
3+
*
4+
* Licensed under the Apache License, Version 2.0 (the "License");
5+
* you may not use this file except in compliance with the License.
6+
* You may obtain a copy of the License at
7+
*
8+
* https://www.apache.org/licenses/LICENSE-2.0
9+
*
10+
* Unless required by applicable law or agreed to in writing, software
11+
* distributed under the License is distributed on an "AS IS" BASIS,
12+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13+
* See the License for the specific language governing permissions and
14+
* limitations under the License.
15+
*/
16+
package com.google.cloud.bigtable.data.v2.stub.metrics;
17+
18+
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.METER_NAME;
19+
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.OUTSTANDING_RPCS_PER_CHANNEL_NAME;
20+
import static com.google.cloud.bigtable.data.v2.stub.metrics.BuiltinMetricsConstants.PER_CONNECTION_ERROR_COUNT_NAME;
21+
22+
import com.google.api.core.InternalApi;
23+
import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelObserver;
24+
import com.google.cloud.bigtable.gaxx.grpc.BigtableChannelPoolObserver;
25+
import io.opentelemetry.api.OpenTelemetry;
26+
import io.opentelemetry.api.common.Attributes;
27+
import io.opentelemetry.api.metrics.LongHistogram;
28+
import io.opentelemetry.api.metrics.Meter;
29+
import java.util.List;
30+
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.ScheduledFuture;
32+
import java.util.concurrent.TimeUnit;
33+
import java.util.concurrent.atomic.AtomicReference;
34+
import java.util.logging.Logger;
35+
import javax.annotation.Nullable;
36+
37+
@InternalApi("For internal use only")
38+
public class ChannelPoolMetricsTracer implements Runnable {
39+
private static final Logger logger = Logger.getLogger(ChannelPoolMetricsTracer.class.getName());
40+
41+
private static final int SAMPLING_PERIOD_SECONDS = 60;
42+
private final LongHistogram outstandingRpcsHistogram;
43+
private final LongHistogram perConnectionErrorCountHistogram;
44+
45+
private final AtomicReference<BigtableChannelPoolObserver> bigtableChannelInsightsProviderRef =
46+
new AtomicReference<>();
47+
private final AtomicReference<String> lbPolicyRef = new AtomicReference<>("ROUND_ROBIN");
48+
private final Attributes commonAttrs;
49+
50+
// Attributes for unary and streaming RPCs, built on demand in run()
51+
@Nullable private Attributes unaryAttributes;
52+
@Nullable private Attributes streamingAttributes;
53+
54+
public ChannelPoolMetricsTracer(OpenTelemetry openTelemetry, Attributes commonAttrs) {
55+
Meter meter = openTelemetry.getMeter(METER_NAME);
56+
this.commonAttrs = commonAttrs;
57+
this.outstandingRpcsHistogram =
58+
meter
59+
.histogramBuilder(OUTSTANDING_RPCS_PER_CHANNEL_NAME)
60+
.ofLongs()
61+
.setDescription(
62+
"A distribution of the number of outstanding RPCs per connection in the client pool, sampled periodically.")
63+
.setUnit("1")
64+
.build();
65+
66+
this.perConnectionErrorCountHistogram =
67+
meter
68+
.histogramBuilder(PER_CONNECTION_ERROR_COUNT_NAME)
69+
.ofLongs()
70+
.setDescription("Distribution of counts of channels per 'error count per minute'.")
71+
.setUnit("1")
72+
.build();
73+
}
74+
75+
/**
76+
* Registers the provider for the channel pool entries. This should be called by the component
77+
* that creates the BigtableChannelPool.
78+
*/
79+
public void registerChannelInsightsProvider(BigtableChannelPoolObserver channelInsightsProvider) {
80+
this.bigtableChannelInsightsProviderRef.set(channelInsightsProvider);
81+
}
82+
83+
/** Register the current lb policy * */
84+
public void registerLoadBalancingStrategy(String lbPolicy) {
85+
this.lbPolicyRef.set(lbPolicy);
86+
}
87+
88+
/** Starts the periodic collection. */
89+
public ScheduledFuture<?> start(ScheduledExecutorService scheduler) {
90+
return scheduler.scheduleAtFixedRate(
91+
this, SAMPLING_PERIOD_SECONDS, SAMPLING_PERIOD_SECONDS, TimeUnit.SECONDS);
92+
}
93+
94+
@Override
95+
public void run() {
96+
BigtableChannelPoolObserver channelInsightsProvider = bigtableChannelInsightsProviderRef.get();
97+
if (channelInsightsProvider == null) {
98+
logger.warning("No Bigtable ChannelPoolObserver available");
99+
return; // Not registered yet
100+
}
101+
String lbPolicy = lbPolicyRef.get();
102+
103+
// Build attributes if they haven't been built yet.
104+
if (unaryAttributes == null || streamingAttributes == null) {
105+
Attributes baseAttrs = commonAttrs.toBuilder().put("lb_policy", lbPolicy).build();
106+
this.unaryAttributes = baseAttrs.toBuilder().put("streaming", false).build();
107+
this.streamingAttributes = baseAttrs.toBuilder().put("streaming", true).build();
108+
}
109+
List<? extends BigtableChannelObserver> channelInsights =
110+
channelInsightsProvider.getChannelInfos();
111+
if (channelInsights == null || channelInsights.isEmpty()) {
112+
return;
113+
}
114+
for (BigtableChannelObserver info : channelInsights) {
115+
String transportTypeValue = info.isAltsChannel() ? "DIRECTPATH" : "CLOUDPATH";
116+
this.unaryAttributes =
117+
this.unaryAttributes.toBuilder().put("transport_type", transportTypeValue).build();
118+
this.streamingAttributes =
119+
this.streamingAttributes.toBuilder().put("transport_type", transportTypeValue).build();
120+
121+
long currentOutstandingUnaryRpcs = info.getOutstandingUnaryRpcs();
122+
long currentOutstandingStreamingRpcs = info.getOutstandingStreamingRpcs();
123+
// Record outstanding unary RPCs with streaming=false
124+
outstandingRpcsHistogram.record(currentOutstandingUnaryRpcs, unaryAttributes);
125+
// Record outstanding streaming RPCs with streaming=true
126+
outstandingRpcsHistogram.record(currentOutstandingStreamingRpcs, streamingAttributes);
127+
128+
long errors = info.getAndResetErrorCount();
129+
perConnectionErrorCountHistogram.record(errors, commonAttrs);
130+
}
131+
}
132+
}

google-cloud-bigtable/src/main/java/com/google/cloud/bigtable/data/v2/stub/metrics/ConnectionErrorCountInterceptor.java

Lines changed: 0 additions & 89 deletions
This file was deleted.

0 commit comments

Comments
 (0)