Skip to content

Commit ffaf876

Browse files
committed
sql: use adaptive sampling rate for telemetry logging
Resolves #70553. Previously, telemetry logging used a configurable QPS threshold and sampling rate, under which we would log all statements if we were under the QPS threshold, and then start sampling at the given rate once at the threshold. Using this technique meant that we would often see a sharp decrease in telemetry logging once the sampling rate increases, as sampling rates would typically need to be at low values to accommodate a high QPS. This commit replaces the above technique with an adaptive sampling rate which merely logs events to telemetry at a maximum frequency. Rather than relying on QPS, we will simply track when we have last logged to the telemetry channel, and decide whether or not to log a given event accordingly. Release note (sql change): The cluster settings `sql.telemetry.query_sampling.qps_threshold` and `sql.telemetry.query_sampling.sample_rate` have been removed. A new setting, `sql.telemetry.query_sampling.max_event_frequency`, has been introduced, with a default value of 10 events per second.
1 parent 150d5b7 commit ffaf876

10 files changed

Lines changed: 112 additions & 720 deletions

pkg/settings/float.go

Lines changed: 0 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -162,11 +162,3 @@ func PositiveFloat(v float64) error {
162162
}
163163
return nil
164164
}
165-
166-
// FloatBetweenZeroAndOneInclusive can be passed to RegisterFloatSetting.
167-
func FloatBetweenZeroAndOneInclusive(v float64) error {
168-
if math.Signbit(v) || v > 1 {
169-
return errors.Errorf("must set to value between 0 and 1 inclusive: %f", v)
170-
}
171-
return nil
172-
}

pkg/settings/registry.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -105,6 +105,8 @@ var retiredSettings = map[string]struct{}{
105105
"sql.distsql.prefer_local_execution.enabled": {},
106106
"kv.follower_read.target_multiple": {},
107107
"kv.closed_timestamp.close_fraction": {},
108+
"sql.telemetry.query_sampling.qps_threshold": {},
109+
"sql.telemetry.query_sampling.sample_rate": {},
108110

109111
// removed as of 22.1.
110112
"sql.defaults.drop_enum_value.enabled": {},

pkg/sql/BUILD.bazel

Lines changed: 0 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -145,7 +145,6 @@ go_library(
145145
"prepared_stmt.go",
146146
"privileged_accessor.go",
147147
"project_set.go",
148-
"query_sampling.go",
149148
"reassign_owned_by.go",
150149
"recursive_cte.go",
151150
"refresh_materialized_view.go",
@@ -485,7 +484,6 @@ go_test(
485484
"plan_opt_test.go",
486485
"planner_test.go",
487486
"privileged_accessor_test.go",
488-
"query_sampling_test.go",
489487
"rand_test.go",
490488
"region_util_test.go",
491489
"rename_test.go",

pkg/sql/conn_executor.go

Lines changed: 1 addition & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -353,9 +353,7 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
353353
}),
354354
}
355355

356-
telemetryLoggingMetrics := NewTelemetryLoggingMetrics(
357-
telemetrySmoothingAlpha.Get(&cfg.Settings.SV),
358-
cfg.getTelemetryRollingInterval())
356+
telemetryLoggingMetrics := &TelemetryLoggingMetrics{}
359357

360358
telemetryLoggingMetrics.Knobs = cfg.TelemetryLoggingTestingKnobs
361359
s.TelemetryLoggingMetrics = telemetryLoggingMetrics
@@ -378,14 +376,6 @@ func NewServer(cfg *ExecutorConfig, pool *mon.BytesMonitor) *Server {
378376
return s
379377
}
380378

381-
func (cfg *ExecutorConfig) getTelemetryRollingInterval() int64 {
382-
if cfg.TelemetryLoggingTestingKnobs != nil && cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength != nil {
383-
return cfg.TelemetryLoggingTestingKnobs.getRollingIntervalLength()
384-
}
385-
386-
return telemetryRollingInterval.Get(&cfg.Settings.SV)
387-
}
388-
389379
func makeMetrics(cfg *ExecutorConfig, internal bool) Metrics {
390380
return Metrics{
391381
EngineMetrics: EngineMetrics{

pkg/sql/conn_executor_exec.go

Lines changed: 0 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -994,7 +994,6 @@ func (ex *connExecutor) dispatchToExecutionEngine(
994994
ex.statsCollector.PhaseTimes().GetSessionPhaseTime(sessionphase.SessionQueryReceived),
995995
&ex.extraTxnState.hasAdminRoleCache,
996996
ex.server.TelemetryLoggingMetrics,
997-
ex.rng,
998997
)
999998
}()
1000999

@@ -1851,10 +1850,6 @@ func (ex *connExecutor) handleAutoCommit(
18511850
// statement counter for stmt's type.
18521851
func (ex *connExecutor) incrementStartedStmtCounter(ast tree.Statement) {
18531852
ex.metrics.StartedStatementCounters.incrementCount(ex, ast)
1854-
if ex.executorType != executorTypeInternal {
1855-
// Update the non-internal QPS estimation.
1856-
ex.server.TelemetryLoggingMetrics.updateRollingQueryCounts()
1857-
}
18581853
}
18591854

18601855
// incrementExecutedStmtCounter increments the appropriate executed

pkg/sql/exec_log.go

Lines changed: 11 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,6 @@ import (
1414
"bytes"
1515
"context"
1616
"fmt"
17-
"math/rand"
1817
"time"
1918

2019
"github.com/cockroachdb/cockroach/pkg/settings"
@@ -142,9 +141,8 @@ func (p *planner) maybeLogStatement(
142141
queryReceived time.Time,
143142
hasAdminRoleCache *HasAdminRoleCache,
144143
telemetryLoggingMetrics *TelemetryLoggingMetrics,
145-
rng *rand.Rand,
146144
) {
147-
p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics, rng)
145+
p.maybeLogStatementInternal(ctx, execType, numRetries, txnCounter, rows, err, queryReceived, hasAdminRoleCache, telemetryLoggingMetrics)
148146
}
149147

150148
func (p *planner) maybeLogStatementInternal(
@@ -155,7 +153,6 @@ func (p *planner) maybeLogStatementInternal(
155153
startTime time.Time,
156154
hasAdminRoleCache *HasAdminRoleCache,
157155
telemetryMetrics *TelemetryLoggingMetrics,
158-
rng *rand.Rand,
159156
) {
160157
// Note: if you find the code below crashing because p.execCfg == nil,
161158
// do not add a test "if p.execCfg == nil { do nothing }" !
@@ -169,8 +166,7 @@ func (p *planner) maybeLogStatementInternal(
169166
slowQueryLogEnabled := slowLogThreshold != 0
170167
slowInternalQueryLogEnabled := slowInternalQueryLogEnabled.Get(&p.execCfg.Settings.SV)
171168
auditEventsDetected := len(p.curPlan.auditEvents) != 0
172-
sampleRate := telemetrySampleRate.Get(&p.execCfg.Settings.SV)
173-
qpsThreshold := telemetryQPSThreshold.Get(&p.execCfg.Settings.SV)
169+
maxEventFrequency := telemetryMaxEventFrequency.Get(&p.execCfg.Settings.SV)
174170

175171
// We only consider non-internal SQL statements for telemetry logging.
176172
telemetryLoggingEnabled := telemetryLoggingEnabled.Get(&p.execCfg.Settings.SV) && execType != executorTypeInternal
@@ -366,28 +362,20 @@ func (p *planner) maybeLogStatementInternal(
366362
}
367363

368364
if telemetryLoggingEnabled {
369-
smoothQPS := telemetryMetrics.expSmoothQPS()
370-
useSamplingMethod := p.stmt.AST.StatementType() == tree.TypeDML && smoothQPS > qpsThreshold
371-
alwaysReportQueries := !useSamplingMethod
372-
// If we DO NOT need to sample the event, log immediately to the telemetry
373-
// channel. Otherwise, log the event to the telemetry channel if it has been
374-
// sampled.
375-
if alwaysReportQueries {
365+
// We only log to the telemetry channel if enough time has elapsed from
366+
// the last event emission.
367+
requiredTimeElapsed := 1.0 / float64(maxEventFrequency)
368+
if p.stmt.AST.StatementType() != tree.TypeDML {
369+
requiredTimeElapsed = 0
370+
}
371+
if telemetryMetrics.maybeUpdateLastEmittedTime(telemetryMetrics.timeNow(), requiredTimeElapsed) {
376372
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
377373
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
378374
CommonSQLExecDetails: execDetails,
379375
SkippedQueries: skippedQueries,
380376
}})
381-
} else if useSamplingMethod {
382-
if rng.Float64() < sampleRate {
383-
skippedQueries := telemetryMetrics.resetSkippedQueryCount()
384-
p.logEventsOnlyExternally(ctx, eventLogEntry{event: &eventpb.SampledQuery{
385-
CommonSQLExecDetails: execDetails,
386-
SkippedQueries: skippedQueries,
387-
}})
388-
} else {
389-
telemetryMetrics.incSkippedQueryCount()
390-
}
377+
} else {
378+
telemetryMetrics.incSkippedQueryCount()
391379
}
392380
}
393381
}

pkg/sql/query_sampling.go

Lines changed: 0 additions & 138 deletions
This file was deleted.

0 commit comments

Comments
 (0)