Skip to content

Commit 05c9065

Browse files
authored
Rename to management.Manager, add UpdateStatus to Manager interface. (elastic#19114)
* Rename management.ConfigManager to management.Manager, add UpdateStatus to Manager interface. * Update docstring for Failed status. * Add to developer changelog. * Add StatusReporter interface, wrap client.Status in lock. Add tests for statusToProtoStatus.
1 parent 2991320 commit 05c9065

10 files changed

Lines changed: 149 additions & 36 deletions

File tree

CHANGELOG-developer.next.asciidoc

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -46,6 +46,8 @@ The list below covers the major changes between 7.0.0-rc2 and master only.
4646
Your magefile.go will require a change to adapt the devtool API. See the pull request for
4747
more details. {pull}18148[18148]
4848
- The Elasticsearch client settings expect the API key to be raw (not base64-encoded). {issue}18939[18939] {pull}18945[18945]
49+
- `management.ConfigManager` has been renamed to `management.Manager`. {pull}19114[19114]
50+
- `UpdateStatus` has been added to the `management.Manager` interface. {pull}19114[19114]
4951

5052
==== Bugfixes
5153

filebeat/beater/filebeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -114,7 +114,7 @@ func New(b *beat.Beat, rawConfig *common.Config) (beat.Beater, error) {
114114
haveEnabledInputs = true
115115
}
116116

117-
if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.ConfigManager.Enabled() {
117+
if !config.ConfigInput.Enabled() && !config.ConfigModules.Enabled() && !haveEnabledInputs && config.Autodiscover == nil && !b.Manager.Enabled() {
118118
if !b.InSetupCmd {
119119
return nil, errors.New("no modules or inputs enabled and configuration reloading disabled. What files do you want me to watch?")
120120
}

heartbeat/beater/heartbeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -86,7 +86,7 @@ func (bt *Heartbeat) Run(b *beat.Beat) error {
8686
return err
8787
}
8888

89-
if b.ConfigManager.Enabled() {
89+
if b.Manager.Enabled() {
9090
bt.RunCentralMgmtMonitors(b)
9191
}
9292

libbeat/beat/beat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -66,7 +66,7 @@ type Beat struct {
6666

6767
Fields []byte // Data from fields.yml
6868

69-
ConfigManager management.ConfigManager // config manager
69+
Manager management.Manager // manager
7070

7171
Keystore keystore.Keystore
7272
}

libbeat/cmd/instance/beat.go

Lines changed: 6 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -330,12 +330,12 @@ func (b *Beat) createBeater(bt beat.Creator) (beat.Beater, error) {
330330

331331
// Report central management state
332332
mgmt := monitoring.GetNamespace("state").GetRegistry().NewRegistry("management")
333-
monitoring.NewBool(mgmt, "enabled").Set(b.ConfigManager.Enabled())
333+
monitoring.NewBool(mgmt, "enabled").Set(b.Manager.Enabled())
334334

335335
debugf("Initializing output plugins")
336336
outputEnabled := b.Config.Output.IsSet() && b.Config.Output.Config().Enabled()
337337
if !outputEnabled {
338-
if b.ConfigManager.Enabled() {
338+
if b.Manager.Enabled() {
339339
logp.Info("Output is configured through Central Management")
340340
} else {
341341
msg := "No outputs are defined. Please define one under the output section."
@@ -462,8 +462,8 @@ func (b *Beat) launch(settings Settings, bt beat.Creator) error {
462462
logp.Info("%s start running.", b.Info.Beat)
463463

464464
// Launch config manager
465-
b.ConfigManager.Start(beater.Stop)
466-
defer b.ConfigManager.Stop()
465+
b.Manager.Start(beater.Stop)
466+
defer b.Manager.Stop()
467467

468468
return beater.Run(&b.Beat)
469469
}
@@ -643,12 +643,12 @@ func (b *Beat) configure(settings Settings) error {
643643
logp.Info("Beat ID: %v", b.Info.ID)
644644

645645
// initialize config manager
646-
b.ConfigManager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
646+
b.Manager, err = management.Factory(b.Config.Management)(b.Config.Management, reload.Register, b.Beat.Info.ID)
647647
if err != nil {
648648
return err
649649
}
650650

651-
if err := b.ConfigManager.CheckRawConfig(b.RawConfig); err != nil {
651+
if err := b.Manager.CheckRawConfig(b.RawConfig); err != nil {
652652
return err
653653
}
654654

libbeat/management/management.go

Lines changed: 69 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,11 +18,36 @@
1818
package management
1919

2020
import (
21+
"sync"
22+
2123
"github.com/gofrs/uuid"
2224

2325
"github.com/elastic/beats/v7/libbeat/common"
2426
"github.com/elastic/beats/v7/libbeat/common/reload"
2527
"github.com/elastic/beats/v7/libbeat/feature"
28+
"github.com/elastic/beats/v7/libbeat/logp"
29+
)
30+
31+
// Status describes the current status of the beat.
32+
type Status int
33+
34+
const (
35+
// Unknown is initial status when none has been reported.
36+
Unknown Status = iota
37+
// Starting is status describing application is starting.
38+
Starting
39+
// Configuring is status describing application is configuring.
40+
Configuring
41+
// Running is status describing application is running.
42+
Running
43+
// Degraded is status describing application is degraded.
44+
Degraded
45+
// Failed is status describing application is failed. This status should
46+
// only be used in the case the beat should stop running as the failure
47+
// cannot be recovered.
48+
Failed
49+
// Stopping is status describing application is stopping.
50+
Stopping
2651
)
2752

2853
// Namespace is the feature namespace for queue definition.
@@ -33,27 +58,36 @@ var DebugK = "centralmgmt"
3358

3459
var centralMgmtKey = "x-pack-cm"
3560

36-
// ConfigManager interacts with the beat to update configurations
37-
// from an external source
38-
type ConfigManager interface {
39-
// Enabled returns true if config manager is enabled
61+
// StatusReporter provides a method to update current status of the beat.
62+
type StatusReporter interface {
63+
// UpdateStatus called when the status of the beat has changed.
64+
UpdateStatus(status Status, msg string)
65+
}
66+
67+
// Manager interacts with the beat to provide status updates and to receive
68+
// configurations.
69+
type Manager interface {
70+
StatusReporter
71+
72+
// Enabled returns true if manager is enabled.
4073
Enabled() bool
4174

42-
// Start the config manager
43-
Start(func())
75+
// Start the config manager giving it a stopFunc callback
76+
// so the beat can be told when to stop.
77+
Start(stopFunc func())
4478

45-
// Stop the config manager
79+
// Stop the config manager.
4680
Stop()
4781

48-
// CheckRawConfig check settings are correct before launching the beat
82+
// CheckRawConfig check settings are correct before launching the beat.
4983
CheckRawConfig(cfg *common.Config) error
5084
}
5185

5286
// PluginFunc for creating FactoryFunc if it matches a config
5387
type PluginFunc func(*common.Config) FactoryFunc
5488

5589
// FactoryFunc for creating a config manager
56-
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error)
90+
type FactoryFunc func(*common.Config, *reload.Registry, uuid.UUID) (Manager, error)
5791

5892
// Register a config manager
5993
func Register(name string, fn PluginFunc, stability feature.Stability) {
@@ -91,13 +125,32 @@ func defaultModeConfig() *modeConfig {
91125
}
92126

93127
// nilManager, fallback when no manager is present
94-
type nilManager struct{}
128+
type nilManager struct {
129+
logger *logp.Logger
130+
lock sync.Mutex
131+
status Status
132+
msg string
133+
}
95134

96-
func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (ConfigManager, error) {
97-
return nilManager{}, nil
135+
func nilFactory(*common.Config, *reload.Registry, uuid.UUID) (Manager, error) {
136+
log := logp.NewLogger("mgmt")
137+
return &nilManager{
138+
logger: log,
139+
status: Unknown,
140+
msg: "",
141+
}, nil
98142
}
99143

100-
func (nilManager) Enabled() bool { return false }
101-
func (nilManager) Start(_ func()) {}
102-
func (nilManager) Stop() {}
103-
func (nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
144+
func (*nilManager) Enabled() bool { return false }
145+
func (*nilManager) Start(_ func()) {}
146+
func (*nilManager) Stop() {}
147+
func (*nilManager) CheckRawConfig(cfg *common.Config) error { return nil }
148+
func (n *nilManager) UpdateStatus(status Status, msg string) {
149+
n.lock.Lock()
150+
defer n.lock.Unlock()
151+
if n.status != status || n.msg != msg {
152+
n.status = status
153+
n.msg = msg
154+
n.logger.Infof("Status change to %s: %s", status, msg)
155+
}
156+
}

metricbeat/beater/metricbeat.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -134,7 +134,7 @@ func newMetricbeat(b *beat.Beat, c *common.Config, options ...Option) (*Metricbe
134134
return nil, errors.Wrap(err, "error reading configuration file")
135135
}
136136

137-
dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.ConfigManager.Enabled()
137+
dynamicCfgEnabled := config.ConfigModules.Enabled() || config.Autodiscover != nil || b.Manager.Enabled()
138138
if !dynamicCfgEnabled && len(config.Modules) == 0 {
139139
return nil, mb.ErrEmptyConfig
140140
}

x-pack/libbeat/management/fleet/manager.go

Lines changed: 46 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@ import (
99
"fmt"
1010
"os"
1111
"sort"
12+
"sync"
1213

1314
"github.com/gofrs/uuid"
1415
"github.com/pkg/errors"
@@ -35,12 +36,15 @@ type Manager struct {
3536
registry *reload.Registry
3637
blacklist *xmanagement.ConfigBlacklist
3738
client *client.Client
39+
lock sync.Mutex
40+
status management.Status
41+
msg string
3842

3943
stopFunc func()
4044
}
4145

4246
// NewFleetManager returns a X-Pack Beats Fleet Management manager.
43-
func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
47+
func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
4448
c := defaultConfig()
4549
if config.Enabled() {
4650
if err := config.Unpack(&c); err != nil {
@@ -51,7 +55,7 @@ func NewFleetManager(config *common.Config, registry *reload.Registry, beatUUID
5155
}
5256

5357
// NewFleetManagerWithConfig returns a X-Pack Beats Fleet Management manager.
54-
func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
58+
func NewFleetManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
5559
log := logp.NewLogger(management.DebugK)
5660

5761
m := &Manager{
@@ -122,38 +126,51 @@ func (cm *Manager) CheckRawConfig(cfg *common.Config) error {
122126
return nil
123127
}
124128

129+
// UpdateStatus updates the manager with the current status for the beat.
130+
func (cm *Manager) UpdateStatus(status management.Status, msg string) {
131+
cm.lock.Lock()
132+
defer cm.lock.Unlock()
133+
134+
if cm.status != status || cm.msg != msg {
135+
cm.status = status
136+
cm.msg = msg
137+
cm.client.Status(statusToProtoStatus(status), msg)
138+
cm.logger.Infof("Status change to %s: %s", status, msg)
139+
}
140+
}
141+
125142
func (cm *Manager) OnConfig(s string) {
126-
cm.client.Status(proto.StateObserved_CONFIGURING, "Updating configuration")
143+
cm.UpdateStatus(management.Configuring, "Updating configuration")
127144

128145
var configMap common.MapStr
129146
uconfig, err := common.NewConfigFrom(s)
130147
if err != nil {
131148
err = errors.Wrap(err, "config blocks unsuccessfully generated")
132149
cm.logger.Error(err)
133-
cm.client.Status(proto.StateObserved_FAILED, err.Error())
150+
cm.UpdateStatus(management.Failed, err.Error())
134151
return
135152
}
136153

137154
err = uconfig.Unpack(&configMap)
138155
if err != nil {
139156
err = errors.Wrap(err, "config blocks unsuccessfully generated")
140157
cm.logger.Error(err)
141-
cm.client.Status(proto.StateObserved_FAILED, err.Error())
158+
cm.UpdateStatus(management.Failed, err.Error())
142159
return
143160
}
144161

145162
blocks, err := cm.toConfigBlocks(configMap)
146163
if err != nil {
147164
err = errors.Wrap(err, "could not apply the configuration")
148165
cm.logger.Error(err)
149-
cm.client.Status(proto.StateObserved_FAILED, err.Error())
166+
cm.UpdateStatus(management.Failed, err.Error())
150167
return
151168
}
152169

153170
if errs := cm.apply(blocks); !errs.IsEmpty() {
154171
err = errors.Wrap(err, "could not apply the configuration")
155172
cm.logger.Error(err)
156-
cm.client.Status(proto.StateObserved_FAILED, err.Error())
173+
cm.UpdateStatus(management.Failed, err.Error())
157174
return
158175
}
159176

@@ -285,3 +302,25 @@ func (cm *Manager) toConfigBlocks(cfg common.MapStr) (api.ConfigBlocks, error) {
285302

286303
return res, nil
287304
}
305+
306+
func statusToProtoStatus(status management.Status) proto.StateObserved_Status {
307+
switch status {
308+
case management.Unknown:
309+
// unknown is reported as healthy, as the status is unknown
310+
return proto.StateObserved_HEALTHY
311+
case management.Starting:
312+
return proto.StateObserved_STARTING
313+
case management.Configuring:
314+
return proto.StateObserved_CONFIGURING
315+
case management.Running:
316+
return proto.StateObserved_HEALTHY
317+
case management.Degraded:
318+
return proto.StateObserved_DEGRADED
319+
case management.Failed:
320+
return proto.StateObserved_FAILED
321+
case management.Stopping:
322+
return proto.StateObserved_STOPPING
323+
}
324+
// unknown status, still reported as healthy
325+
return proto.StateObserved_HEALTHY
326+
}

x-pack/libbeat/management/fleet/manager_test.go

Lines changed: 15 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -7,9 +7,13 @@ package fleet
77
import (
88
"testing"
99

10-
"github.com/elastic/beats/v7/libbeat/common"
10+
"github.com/stretchr/testify/assert"
11+
12+
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
1113

14+
"github.com/elastic/beats/v7/libbeat/common"
1215
"github.com/elastic/beats/v7/libbeat/common/reload"
16+
"github.com/elastic/beats/v7/libbeat/management"
1317
)
1418

1519
func TestConfigBlocks(t *testing.T) {
@@ -53,6 +57,16 @@ output:
5357
}
5458
}
5559

60+
func TestStatusToProtoStatus(t *testing.T) {
61+
assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(management.Unknown))
62+
assert.Equal(t, proto.StateObserved_STARTING, statusToProtoStatus(management.Starting))
63+
assert.Equal(t, proto.StateObserved_CONFIGURING, statusToProtoStatus(management.Configuring))
64+
assert.Equal(t, proto.StateObserved_HEALTHY, statusToProtoStatus(management.Running))
65+
assert.Equal(t, proto.StateObserved_DEGRADED, statusToProtoStatus(management.Degraded))
66+
assert.Equal(t, proto.StateObserved_FAILED, statusToProtoStatus(management.Failed))
67+
assert.Equal(t, proto.StateObserved_STOPPING, statusToProtoStatus(management.Stopping))
68+
}
69+
5670
type dummyReloadable struct{}
5771

5872
func (dummyReloadable) Reload(config *reload.ConfigWithMeta) error {

x-pack/libbeat/management/manager.go

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -43,7 +43,7 @@ type ConfigManager struct {
4343
}
4444

4545
// NewConfigManager returns a X-Pack Beats Central Management manager
46-
func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
46+
func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
4747
c := defaultConfig()
4848
if config.Enabled() {
4949
if err := config.Unpack(&c); err != nil {
@@ -54,7 +54,7 @@ func NewConfigManager(config *common.Config, registry *reload.Registry, beatUUID
5454
}
5555

5656
// NewConfigManagerWithConfig returns a X-Pack Beats Central Management manager
57-
func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.ConfigManager, error) {
57+
func NewConfigManagerWithConfig(c *Config, registry *reload.Registry, beatUUID uuid.UUID) (management.Manager, error) {
5858
var client *api.Client
5959
var cache *Cache
6060
var blacklist *ConfigBlacklist
@@ -152,6 +152,11 @@ func (cm *ConfigManager) CheckRawConfig(cfg *common.Config) error {
152152
return nil
153153
}
154154

155+
// UpdateStatus updates the manager with the current status for the beat.
156+
func (cm *ConfigManager) UpdateStatus(_ management.Status, _ string) {
157+
// do nothing; no longer under development and has been deprecated
158+
}
159+
155160
func (cm *ConfigManager) worker() {
156161
defer cm.wg.Done()
157162

0 commit comments

Comments
 (0)