Skip to content

Commit 5ace360

Browse files
committed
sampling: require a default policy (#4729)
* sampling: require a default policy
* beater/config: validate policies here too
* beater: don't hang if runServer returns error

If runServer returns an error without Stop being called, signal the "done" channel.

# Conflicts:
# changelogs/head.asciidoc
1 parent 9f0a6c7 commit 5ace360

7 files changed

Lines changed: 113 additions & 5 deletions

File tree

beater/beater.go

Lines changed: 12 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -196,14 +196,17 @@ func (bt *beater) start(ctx context.Context, cancelContext context.CancelFunc, b
196196
return nil, err
197197
}
198198
bt.stopServer = func() {
199-
defer close(done)
200-
defer closeTracer()
201199
if bt.config.ShutdownTimeout > 0 {
202200
time.AfterFunc(bt.config.ShutdownTimeout, cancelContext)
203201
}
204202
s.Stop()
205203
}
206204
s.Start()
205+
go func() {
206+
defer close(done)
207+
defer closeTracer()
208+
s.Wait()
209+
}()
207210
}
208211
return done, nil
209212
}
@@ -306,11 +309,18 @@ func (s *serverRunner) String() string {
306309
return "APMServer"
307310
}
308311

312+
// Stop stops the server.
309313
func (s *serverRunner) Stop() {
310314
s.stopOnce.Do(s.cancelRunServerContext)
315+
s.Wait()
316+
}
317+
318+
// Wait waits for the server to stop.
319+
func (s *serverRunner) Wait() {
311320
s.wg.Wait()
312321
}
313322

323+
// Start starts the server.
314324
func (s *serverRunner) Start() {
315325
s.wg.Add(1)
316326
go func() {

beater/config/config_test.go

Lines changed: 3 additions & 1 deletion
Original file line number | Diff line number | Diff line change
@@ -283,6 +283,7 @@ func TestUnpackConfig(t *testing.T) {
283283
"sampling.keep_unsampled": false,
284284
"sampling.tail": map[string]interface{}{
285285
"enabled": true,
286+
"policies": []map[string]interface{}{{"sample_rate": 0.5}},
286287
"interval": "2m",
287288
"ingest_rate_decay": 1.0,
288289
},
@@ -371,6 +372,7 @@ func TestUnpackConfig(t *testing.T) {
371372
KeepUnsampled: false,
372373
Tail: &TailSamplingConfig{
373374
Enabled: true,
375+
Policies: []TailSamplingPolicy{{SampleRate: 0.5}},
374376
ESConfig: elasticsearch.DefaultConfig(),
375377
Interval: 2 * time.Minute,
376378
IngestRateDecayFactor: 1.0,
@@ -564,7 +566,7 @@ func TestAgentConfig(t *testing.T) {
564566
}
565567

566568
func TestNewConfig_ESConfig(t *testing.T) {
567-
ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.enabled":true}`)
569+
ucfg, err := common.NewConfigFrom(`{"rum.enabled":true,"api_key.enabled":true,"sampling.tail.policies":[{"sample_rate": 0.5}]}`)
568570
require.NoError(t, err)
569571

570572
// no es config given

beater/config/sampling.go

Lines changed: 28 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,12 @@ type SamplingConfig struct {
4141
type TailSamplingConfig struct {
4242
Enabled bool `config:"enabled"`
4343

44-
Policies []TailSamplingPolicy `config:"policies"`
44+
// Policies holds tail-sampling policies.
45+
//
46+
// Policies must include at least one policy that matches all traces, to ensure
47+
// that dropping non-matching traces is intentional.
48+
Policies []TailSamplingPolicy `config:"policies"`
49+
4550
ESConfig *elasticsearch.Config `config:"elasticsearch"`
4651
Interval time.Duration `config:"interval" validate:"min=1s"`
4752
IngestRateDecayFactor float64 `config:"ingest_rate_decay" validate:"min=0, max=1"`
@@ -76,8 +81,30 @@ func (c *TailSamplingConfig) Unpack(in *common.Config) error {
7681
if err := in.Unpack(&cfg); err != nil {
7782
return errors.Wrap(err, "error unpacking tail sampling config")
7883
}
84+
cfg.Enabled = in.Enabled()
7985
*c = TailSamplingConfig(cfg)
8086
c.esConfigured = in.HasField("elasticsearch")
87+
return errors.Wrap(c.Validate(), "invalid tail sampling config")
88+
}
89+
90+
func (c *TailSamplingConfig) Validate() error {
91+
if !c.Enabled {
92+
return nil
93+
}
94+
if len(c.Policies) == 0 {
95+
return errors.New("no policies specified")
96+
}
97+
var anyDefaultPolicy bool
98+
for _, policy := range c.Policies {
99+
if policy == (TailSamplingPolicy{SampleRate: policy.SampleRate}) {
100+
// We have at least one default policy.
101+
anyDefaultPolicy = true
102+
break
103+
}
104+
}
105+
if !anyDefaultPolicy {
106+
return errors.New("no default (empty criteria) policy specified")
107+
}
81108
return nil
82109
}
83110

beater/config/sampling_test.go

Lines changed: 52 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,52 @@
1+
// Licensed to Elasticsearch B.V. under one or more contributor
2+
// license agreements. See the NOTICE file distributed with
3+
// this work for additional information regarding copyright
4+
// ownership. Elasticsearch B.V. licenses this file to you under
5+
// the Apache License, Version 2.0 (the "License"); you may
6+
// not use this file except in compliance with the License.
7+
// You may obtain a copy of the License at
8+
//
9+
// http://www.apache.org/licenses/LICENSE-2.0
10+
//
11+
// Unless required by applicable law or agreed to in writing,
12+
// software distributed under the License is distributed on an
13+
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
14+
// KIND, either express or implied. See the License for the
15+
// specific language governing permissions and limitations
16+
// under the License.
17+
18+
package config
19+
20+
import (
21+
"testing"
22+
23+
"github.com/stretchr/testify/assert"
24+
25+
"github.com/elastic/beats/v7/libbeat/common"
26+
)
27+
28+
func TestSamplingPoliciesValidation(t *testing.T) {
29+
t.Run("MinimallyValid", func(t *testing.T) {
30+
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
31+
"sampling.tail.policies": []map[string]interface{}{{
32+
"sample_rate": 0.5,
33+
}},
34+
}), nil)
35+
assert.NoError(t, err)
36+
})
37+
t.Run("NoPolicies", func(t *testing.T) {
38+
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
39+
"sampling.tail.enabled": true,
40+
}), nil)
41+
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no policies specified accessing 'sampling.tail'")
42+
})
43+
t.Run("NoDefaultPolicies", func(t *testing.T) {
44+
_, err := NewConfig(common.MustNewConfigFrom(map[string]interface{}{
45+
"sampling.tail.policies": []map[string]interface{}{{
46+
"service.name": "foo",
47+
"sample_rate": 0.5,
48+
}},
49+
}), nil)
50+
assert.EqualError(t, err, "Error processing configuration: invalid tail sampling config: no default (empty criteria) policy specified accessing 'sampling.tail'")
51+
})
52+
}

x-pack/apm-server/sampling/config.go

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,9 @@ type LocalSamplingConfig struct {
4646
// Policies holds local tail-sampling policies. Policies are matched in the
4747
// order provided. Policies should therefore be ordered from most to least
4848
// specific.
49+
//
50+
// Policies must include at least one policy that matches all traces, to ensure
51+
// that dropping non-matching traces is intentional.
4952
Policies []Policy
5053

5154
// IngestRateDecayFactor holds the ingest rate decay factor, used for calculating
@@ -173,10 +176,17 @@ func (config LocalSamplingConfig) validate() error {
173176
if len(config.Policies) == 0 {
174177
return errors.New("Policies unspecified")
175178
}
179+
var anyDefaultPolicy bool
176180
for i, policy := range config.Policies {
177181
if err := policy.validate(); err != nil {
178182
return errors.Wrapf(err, "Policy %d invalid", i)
179183
}
184+
if policy.PolicyCriteria == (PolicyCriteria{}) {
185+
anyDefaultPolicy = true
186+
}
187+
}
188+
if !anyDefaultPolicy {
189+
return errors.New("Policies does not contain a default (empty criteria) policy")
180190
}
181191
if config.IngestRateDecayFactor <= 0 || config.IngestRateDecayFactor > 1 {
182192
return errors.New("IngestRateDecayFactor unspecified or out of range (0,1]")

x-pack/apm-server/sampling/config_test.go

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,7 +38,11 @@ func TestNewProcessorConfigInvalid(t *testing.T) {
3838
config.MaxDynamicServices = 1
3939

4040
assertInvalidConfigError("invalid local sampling config: Policies unspecified")
41-
config.Policies = []sampling.Policy{{}}
41+
config.Policies = []sampling.Policy{{
42+
PolicyCriteria: sampling.PolicyCriteria{ServiceName: "foo"},
43+
}}
44+
assertInvalidConfigError("invalid local sampling config: Policies does not contain a default (empty criteria) policy")
45+
config.Policies[0].PolicyCriteria = sampling.PolicyCriteria{}
4246
for _, invalid := range []float64{-1, 1.0, 2.0} {
4347
config.Policies[0].SampleRate = invalid
4448
assertInvalidConfigError("invalid local sampling config: Policy 0 invalid: SampleRate unspecified or out of range [0,1)")

x-pack/apm-server/sampling/processor_test.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -292,6 +292,9 @@ func TestProcessLocalTailSamplingPolicyOrder(t *testing.T) {
292292
}, {
293293
PolicyCriteria: sampling.PolicyCriteria{ServiceName: "service_name"},
294294
SampleRate: 0.1,
295+
}, {
296+
PolicyCriteria: sampling.PolicyCriteria{},
297+
SampleRate: 0,
295298
}}
296299
config.FlushInterval = 10 * time.Millisecond
297300
published := make(chan string)

0 commit comments

Comments
 (0)