Skip to content

Commit f017e24

Browse files
authored
[Elastic Agent] Add docker composable dynamic provider. (elastic#20842)
* Add docker provider. * Add changelog. * Update docker start message to info.
1 parent 3390aa8 commit f017e24

6 files changed

Lines changed: 252 additions & 0 deletions

File tree

x-pack/elastic-agent/CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
- Add restart CLI cmd {pull}20359[20359]
1818
- Add new `synthetics/*` inputs to run Heartbeat {pull}20387[20387]
1919
- Users of the Docker image can now pass `FLEET_ENROLL_INSECURE=1` to include the `--insecure` flag with the `elastic-agent enroll` command {issue}20312[20312] {pull}20713[20713]
20+
- Add `docker` composable dynamic provider. {pull}20842[20842]
2021
- Add support for dynamic inputs with providers and `{{variable|"default"}}` substitution. {pull}20839[20839]
2122
- Add support for EQL based condition on inputs {pull}20994[20994]
2223
- Send `fleet.host.id` to Endpoint Security {pull}21042[21042]

x-pack/elastic-agent/pkg/agent/cmd/include.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package cmd
77
import (
88
// include the composable providers
99
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/agent"
10+
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/docker"
1011
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/env"
1112
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/host"
1213
_ "github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable/providers/local"
Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,24 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package docker
6+
7+
import (
8+
"time"
9+
10+
"github.com/elastic/beats/v7/libbeat/common/docker"
11+
)
12+
13+
// Config for docker provider
14+
type Config struct {
15+
Host string `config:"host"`
16+
TLS *docker.TLSConfig `config:"ssl"`
17+
CleanupTimeout time.Duration `config:"cleanup_timeout" validate:"positive"`
18+
}
19+
20+
// InitDefaults initializes the default values for the config.
21+
func (c *Config) InitDefaults() {
22+
c.Host = "unix:///var/run/docker.sock"
23+
c.CleanupTimeout = 60 * time.Second
24+
}
Lines changed: 157 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,157 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package docker
6+
7+
import (
8+
"fmt"
9+
"time"
10+
11+
"github.com/elastic/beats/v7/libbeat/common"
12+
"github.com/elastic/beats/v7/libbeat/common/bus"
13+
"github.com/elastic/beats/v7/libbeat/common/docker"
14+
"github.com/elastic/beats/v7/libbeat/common/safemapstr"
15+
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
16+
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/composable"
17+
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/config"
18+
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
19+
)
20+
21+
func init() {
22+
composable.Providers.AddDynamicProvider("docker", DynamicProviderBuilder)
23+
}
24+
25+
type dockerContainerData struct {
26+
container *docker.Container
27+
mapping map[string]interface{}
28+
processors []map[string]interface{}
29+
}
30+
type dynamicProvider struct {
31+
logger *logger.Logger
32+
config *Config
33+
}
34+
35+
// Run runs the environment context provider.
36+
func (c *dynamicProvider) Run(comm composable.DynamicProviderComm) error {
37+
watcher, err := docker.NewWatcher(c.logger, c.config.Host, c.config.TLS, false)
38+
if err != nil {
39+
// info only; return nil (do nothing)
40+
c.logger.Infof("Docker provider skipped, unable to connect: %s", err)
41+
return nil
42+
}
43+
startListener := watcher.ListenStart()
44+
stopListener := watcher.ListenStop()
45+
stoppers := map[string]*time.Timer{}
46+
stopTrigger := make(chan *dockerContainerData)
47+
48+
if err := watcher.Start(); err != nil {
49+
// info only; return nil (do nothing)
50+
c.logger.Infof("Docker provider skipped, unable to connect: %s", err)
51+
return nil
52+
}
53+
54+
go func() {
55+
for {
56+
select {
57+
case <-comm.Done():
58+
startListener.Stop()
59+
stopListener.Stop()
60+
61+
// Stop all timers before closing the channel
62+
for _, stopper := range stoppers {
63+
stopper.Stop()
64+
}
65+
close(stopTrigger)
66+
return
67+
case event := <-startListener.Events():
68+
data, err := generateData(event)
69+
if err != nil {
70+
c.logger.Errorf("%s", err)
71+
continue
72+
}
73+
if stopper, ok := stoppers[data.container.ID]; ok {
74+
c.logger.Debugf("container %s is restarting, aborting pending stop", data.container.ID)
75+
stopper.Stop()
76+
delete(stoppers, data.container.ID)
77+
return
78+
}
79+
comm.AddOrUpdate(data.container.ID, data.mapping, data.processors)
80+
case event := <-stopListener.Events():
81+
data, err := generateData(event)
82+
if err != nil {
83+
c.logger.Errorf("%s", err)
84+
continue
85+
}
86+
stopper := time.AfterFunc(c.config.CleanupTimeout, func() {
87+
stopTrigger <- data
88+
})
89+
stoppers[data.container.ID] = stopper
90+
case data := <-stopTrigger:
91+
if _, ok := stoppers[data.container.ID]; ok {
92+
delete(stoppers, data.container.ID)
93+
}
94+
comm.Remove(data.container.ID)
95+
}
96+
}
97+
}()
98+
99+
return nil
100+
}
101+
102+
// DynamicProviderBuilder builds the dynamic provider.
103+
func DynamicProviderBuilder(c *config.Config) (composable.DynamicProvider, error) {
104+
logger, err := logger.New("composable.providers.docker")
105+
if err != nil {
106+
return nil, err
107+
}
108+
var cfg Config
109+
if c == nil {
110+
c = config.New()
111+
}
112+
err = c.Unpack(&cfg)
113+
if err != nil {
114+
return nil, errors.New(err, "failed to unpack configuration")
115+
}
116+
return &dynamicProvider{logger, &cfg}, nil
117+
}
118+
119+
func generateData(event bus.Event) (*dockerContainerData, error) {
120+
container, ok := event["container"].(*docker.Container)
121+
if !ok {
122+
return nil, fmt.Errorf("unable to get container from watcher event")
123+
}
124+
125+
labelMap := common.MapStr{}
126+
processorLabelMap := common.MapStr{}
127+
for k, v := range container.Labels {
128+
safemapstr.Put(labelMap, k, v)
129+
processorLabelMap.Put(common.DeDot(k), v)
130+
}
131+
132+
data := &dockerContainerData{
133+
container: container,
134+
mapping: map[string]interface{}{
135+
"container": map[string]interface{}{
136+
"id": container.ID,
137+
"name": container.Name,
138+
"image": container.Image,
139+
"labels": labelMap,
140+
},
141+
},
142+
processors: []map[string]interface{}{
143+
{
144+
"add_fields": map[string]interface{}{
145+
"fields": map[string]interface{}{
146+
"id": container.ID,
147+
"name": container.Name,
148+
"image": container.Image,
149+
"labels": processorLabelMap,
150+
},
151+
"to": "container",
152+
},
153+
},
154+
},
155+
}
156+
return data, nil
157+
}
Lines changed: 64 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,64 @@
1+
// Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
2+
// or more contributor license agreements. Licensed under the Elastic License;
3+
// you may not use this file except in compliance with the Elastic License.
4+
5+
package docker
6+
7+
import (
8+
"testing"
9+
10+
"github.com/stretchr/testify/assert"
11+
"github.com/stretchr/testify/require"
12+
13+
"github.com/elastic/beats/v7/libbeat/common"
14+
"github.com/elastic/beats/v7/libbeat/common/bus"
15+
"github.com/elastic/beats/v7/libbeat/common/docker"
16+
)
17+
18+
func TestGenerateData(t *testing.T) {
19+
container := &docker.Container{
20+
ID: "abc",
21+
Name: "foobar",
22+
Labels: map[string]string{
23+
"do.not.include": "true",
24+
"co.elastic.logs/disable": "true",
25+
},
26+
}
27+
event := bus.Event{
28+
"container": container,
29+
}
30+
31+
data, err := generateData(event)
32+
require.NoError(t, err)
33+
mapping := map[string]interface{}{
34+
"container": map[string]interface{}{
35+
"id": container.ID,
36+
"name": container.Name,
37+
"image": container.Image,
38+
"labels": common.MapStr{
39+
"do": common.MapStr{"not": common.MapStr{"include": "true"}},
40+
"co": common.MapStr{"elastic": common.MapStr{"logs/disable": "true"}},
41+
},
42+
},
43+
}
44+
processors := []map[string]interface{}{
45+
{
46+
"add_fields": map[string]interface{}{
47+
"fields": map[string]interface{}{
48+
"id": container.ID,
49+
"name": container.Name,
50+
"image": container.Image,
51+
"labels": common.MapStr{
52+
"do_not_include": "true",
53+
"co_elastic_logs/disable": "true",
54+
},
55+
},
56+
"to": "container",
57+
},
58+
},
59+
}
60+
61+
assert.Equal(t, container, data.container)
62+
assert.Equal(t, mapping, data.mapping)
63+
assert.Equal(t, processors, data.processors)
64+
}

x-pack/elastic-agent/pkg/config/config.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -36,6 +36,11 @@ func LoadYAML(path string, opts ...ucfg.Option) (*Config, error) {
3636
return newConfigFrom(config), nil
3737
}
3838

39+
// New creates a new empty config.
40+
func New() *Config {
41+
return newConfigFrom(ucfg.New())
42+
}
43+
3944
// NewConfigFrom takes a interface and read the configuration like it was YAML.
4045
func NewConfigFrom(from interface{}, opts ...ucfg.Option) (*Config, error) {
4146
if len(opts) == 0 {

0 commit comments

Comments
 (0)