Skip to content

Commit bb98f2a

Browse files
fix: send upgrade action to units before adding it to bkgActions (#9634)
* fix: refactor coordinator Upgrade to use opts and introduce pre-upgrade callback to properly handle errs and upgrade details * ci: add unit-tests * doc: add changelog fragment * fix: rename notifyUnitsOfProxiedAction to notifyUnitsOfProxiedActionFn * doc: add comment for notifyUnitsOfProxiedActionFn unit-test assertion
1 parent 6918b7b commit bb98f2a

10 files changed

Lines changed: 508 additions & 29 deletions

File tree

.mockery.yaml

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,9 @@ packages:
2323
logLevelSetter:
2424
config:
2525
mockname: "LogLevelSetter"
26+
upgradeCoordinator:
27+
config:
28+
mockname: "UpgradeCoordinator"
2629
github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker:
2730
interfaces:
2831
Acker:
Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,5 @@
1+
kind: bug-fix
2+
summary: ensure failed upgrade actions are removed from queue and details are set
3+
component: elastic-agent
4+
pr: https://github.com/elastic/elastic-agent/pull/9634
5+
issue: https://github.com/elastic/elastic-agent/issues/9629

internal/pkg/agent/application/actions/handlers/handler_action_upgrade.go

Lines changed: 20 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,8 @@ import (
1010
"fmt"
1111
"sync"
1212

13+
"github.com/elastic/elastic-agent-libs/logp"
14+
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1315
"github.com/elastic/elastic-agent/internal/pkg/fleetapi"
1416
"github.com/elastic/elastic-agent/internal/pkg/fleetapi/acker"
1517
"github.com/elastic/elastic-agent/pkg/core/logger"
@@ -26,15 +28,17 @@ type Upgrade struct {
2628
bkgCancel context.CancelFunc
2729
bkgMutex sync.Mutex
2830

29-
tamperProtectionFn func() bool // allows to inject the flag for tests, defaults to features.TamperProtection
31+
tamperProtectionFn func() bool // allows to inject the flag for tests, defaults to features.TamperProtection
32+
notifyUnitsOfProxiedActionFn func(ctx context.Context, log *logp.Logger, action dispatchableAction, ucs []unitWithComponent, performAction performActionFunc) error // allows to inject the function for tests, defaults to notifyUnitsOfProxiedAction
3033
}
3134

3235
// NewUpgrade creates a new Upgrade handler.
3336
func NewUpgrade(log *logger.Logger, coord upgradeCoordinator) *Upgrade {
3437
return &Upgrade{
35-
log: log,
36-
coord: coord,
37-
tamperProtectionFn: features.TamperProtection,
38+
log: log,
39+
coord: coord,
40+
tamperProtectionFn: features.TamperProtection,
41+
notifyUnitsOfProxiedActionFn: notifyUnitsOfProxiedAction,
3842
}
3943
}
4044

@@ -55,18 +59,23 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
5559
return nil
5660
}
5761

62+
var uOpts []coordinator.UpgradeOpt
5863
if h.tamperProtectionFn() {
5964
// Find inputs that want to receive UPGRADE action
6065
// Endpoint needs to receive a signed UPGRADE action in order to be able to uncontain itself
6166
state := h.coord.State()
6267
ucs := findMatchingUnitsByActionType(state, a.Type())
6368
if len(ucs) > 0 {
64-
h.log.Debugf("handlerUpgrade: proxy/dispatch action '%+v'", a)
65-
err := notifyUnitsOfProxiedAction(ctx, h.log, action, ucs, h.coord.PerformAction)
66-
h.log.Debugf("handlerUpgrade: after action dispatched '%+v', err: %v", a, err)
67-
if err != nil {
68-
return err
69-
}
69+
h.log.Debugf("Found %d components running for %v action type", len(ucs), a.Type())
70+
uOpts = append(uOpts, coordinator.WithPreUpgradeCallback(func(ctx context.Context, log *logger.Logger, action *fleetapi.ActionUpgrade) error {
71+
log.Debugf("handlerUpgrade: proxy/dispatch action '%+v'", a)
72+
err := h.notifyUnitsOfProxiedActionFn(ctx, log, action, ucs, h.coord.PerformAction)
73+
log.Debugf("handlerUpgrade: after action dispatched '%+v', err: %v", a, err)
74+
if err != nil {
75+
return fmt.Errorf("failed to notify units of proxied action: %w", err)
76+
}
77+
return nil
78+
}))
7079
} else {
7180
// Log and continue
7281
h.log.Debugf("No components running for %v action type", a.Type())
@@ -75,7 +84,7 @@ func (h *Upgrade) Handle(ctx context.Context, a fleetapi.Action, ack acker.Acker
7584

7685
go func() {
7786
h.log.Infof("starting upgrade to version %s in background", action.Data.Version)
78-
if err := h.coord.Upgrade(asyncCtx, action.Data.Version, action.Data.SourceURI, action, false, false); err != nil {
87+
if err := h.coord.Upgrade(asyncCtx, action.Data.Version, action.Data.SourceURI, action, uOpts...); err != nil {
7988
h.log.Errorf("upgrade to version %s failed: %v", action.Data.Version, err)
8089
// If context is cancelled in getAsyncContext, the actions are acked there
8190
if !errors.Is(asyncCtx.Err(), context.Canceled) {

internal/pkg/agent/application/actions/handlers/handler_action_upgrade_test.go

Lines changed: 148 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,9 +11,17 @@ import (
1111
"testing"
1212
"time"
1313

14+
"github.com/stretchr/testify/assert"
1415
"github.com/stretchr/testify/mock"
1516
"github.com/stretchr/testify/require"
1617

18+
"github.com/elastic/elastic-agent-client/v7/pkg/client"
19+
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
20+
"github.com/elastic/elastic-agent-libs/logp"
21+
"github.com/elastic/elastic-agent/pkg/component/runtime"
22+
mockhandlers "github.com/elastic/elastic-agent/testing/mocks/internal_/pkg/agent/application/actions/handlers"
23+
mockAcker "github.com/elastic/elastic-agent/testing/mocks/internal_/pkg/fleetapi/acker"
24+
1725
"github.com/elastic/elastic-agent/internal/pkg/agent/application/coordinator"
1826
"github.com/elastic/elastic-agent/internal/pkg/agent/application/info"
1927
"github.com/elastic/elastic-agent/internal/pkg/agent/application/reexec"
@@ -368,6 +376,146 @@ func TestUpgradeHandlerNewVersion(t *testing.T) {
368376
checkMsg(upgradeCalledChan, "8.5.0", "second call to Upgrade must be with version 8.5.0")
369377
}
370378

379+
func TestEndpointPreUpgradeCallback(t *testing.T) {
380+
ctx, cancel := context.WithCancel(t.Context())
381+
defer cancel()
382+
383+
for _, tc := range []struct {
384+
name string
385+
upgradeAction *fleetapi.ActionUpgrade
386+
shouldProxyToEndpoint bool
387+
coordUpgradeErr error
388+
}{
389+
{
390+
name: "error from coordinator upgrade with notify endpoint",
391+
upgradeAction: &fleetapi.ActionUpgrade{
392+
ActionType: fleetapi.ActionTypeUpgrade,
393+
Data: fleetapi.ActionUpgradeData{
394+
Version: "255.0.0",
395+
SourceURI: "http://localhost",
396+
},
397+
},
398+
shouldProxyToEndpoint: true,
399+
coordUpgradeErr: errors.New("test error"),
400+
},
401+
{
402+
name: "no error from coordinator upgrade with notify endpoint",
403+
upgradeAction: &fleetapi.ActionUpgrade{
404+
ActionType: fleetapi.ActionTypeUpgrade,
405+
Data: fleetapi.ActionUpgradeData{
406+
Version: "255.0.0",
407+
SourceURI: "http://localhost",
408+
},
409+
},
410+
shouldProxyToEndpoint: true,
411+
},
412+
{
413+
name: "error from coordinator upgrade without notify endpoint",
414+
upgradeAction: &fleetapi.ActionUpgrade{
415+
ActionType: fleetapi.ActionTypeUpgrade,
416+
Data: fleetapi.ActionUpgradeData{
417+
Version: "255.0.0",
418+
SourceURI: "http://localhost",
419+
},
420+
},
421+
coordUpgradeErr: errors.New("test error"),
422+
},
423+
{
424+
name: "no error from coordinator upgrade without notify endpoint",
425+
upgradeAction: &fleetapi.ActionUpgrade{
426+
ActionType: fleetapi.ActionTypeUpgrade,
427+
Data: fleetapi.ActionUpgradeData{
428+
Version: "255.0.0",
429+
SourceURI: "http://localhost",
430+
},
431+
},
432+
},
433+
} {
434+
t.Run(tc.name, func(t *testing.T) {
435+
mockCoordinator := mockhandlers.NewUpgradeCoordinator(t)
436+
437+
var coordState coordinator.State
438+
if tc.shouldProxyToEndpoint {
439+
coordState.Components = []runtime.ComponentComponentState{
440+
{
441+
Component: component.Component{
442+
InputSpec: &component.InputRuntimeSpec{
443+
Spec: component.InputSpec{
444+
ProxiedActions: []string{fleetapi.ActionTypeUpgrade},
445+
},
446+
},
447+
InputType: "endpoint",
448+
Units: []component.Unit{
449+
{
450+
Type: client.UnitTypeInput,
451+
Config: &proto.UnitExpectedConfig{
452+
Type: "endpoint",
453+
},
454+
},
455+
},
456+
},
457+
},
458+
}
459+
460+
mockCoordinator.EXPECT().State().Return(coordState)
461+
}
462+
463+
upgradeCalledChan := make(chan struct{})
464+
mockCoordinator.EXPECT().Upgrade(mock.Anything, tc.upgradeAction.Data.Version, tc.upgradeAction.Data.SourceURI, mock.Anything, mock.Anything).
465+
RunAndReturn(func(ctx context.Context, s string, s2 string, actionUpgrade *fleetapi.ActionUpgrade, opt ...coordinator.UpgradeOpt) error {
466+
upgradeCalledChan <- struct{}{}
467+
return tc.coordUpgradeErr
468+
})
469+
470+
log, _ := logger.New("", false)
471+
u := NewUpgrade(log, mockCoordinator)
472+
u.tamperProtectionFn = func() bool { return tc.shouldProxyToEndpoint }
473+
474+
notifyUnitsCalled := atomic.Bool{}
475+
u.notifyUnitsOfProxiedActionFn = func(ctx context.Context, log *logp.Logger, action dispatchableAction, ucs []unitWithComponent, performAction performActionFunc) error {
476+
notifyUnitsCalled.Store(true)
477+
return nil
478+
}
479+
480+
ack := mockAcker.NewAcker(t)
481+
482+
if tc.coordUpgradeErr != nil {
483+
// on a coordinator upgrade error we should ack and commit all the bkg actions
484+
ack.EXPECT().Ack(mock.Anything, mock.Anything).Return(nil)
485+
ack.EXPECT().Commit(mock.Anything).Return(nil)
486+
}
487+
488+
err := u.Handle(ctx, tc.upgradeAction, ack)
489+
require.NoError(t, err, "Handle should not return an error")
490+
491+
select {
492+
case <-upgradeCalledChan:
493+
break
494+
case <-time.After(10 * time.Second):
495+
t.Fatal("mockCoordinator.Upgrade was not called in time")
496+
}
497+
498+
// notifyUnitsOfProxiedActionFn should only ever be passed as a PreUpgradeCallback to the coordinator upgrader.
499+
// This assertion guards against it being called directly in this context.
500+
assert.False(t, notifyUnitsCalled.Load(), "notifyUnitsOfProxiedActionFn should not be called")
501+
502+
assert.Eventually(t, func() bool {
503+
u.bkgMutex.Lock()
504+
defer u.bkgMutex.Unlock()
505+
if tc.coordUpgradeErr == nil {
506+
// yes this is counter-intuitive but when the coordinator upgrade returns a nil error
507+
// actions are not cleaned from bkgActions. This is most likely because after a successful upgrade
508+
// the expectation is for an agent to restart and thus the bkgActions will be lost.
509+
// NOTE if bkgActions gets to be persisted in the future this logic needs to change.
510+
return len(u.bkgActions) == 1
511+
} else {
512+
return len(u.bkgActions) == 0
513+
}
514+
}, 10*time.Second, 100*time.Millisecond)
515+
})
516+
}
517+
}
518+
371519
type fakeAcker struct {
372520
mock.Mock
373521
}

internal/pkg/agent/application/actions/handlers/handler_helpers.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -28,7 +28,7 @@ type actionCoordinator interface {
2828

2929
type upgradeCoordinator interface {
3030
actionCoordinator
31-
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error
31+
Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, opts ...coordinator.UpgradeOpt) error
3232
}
3333

3434
type performActionFunc func(context.Context, component.Component, component.Unit, string, map[string]interface{}) (map[string]interface{}, error)

internal/pkg/agent/application/coordinator/coordinator.go

Lines changed: 60 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -695,19 +695,45 @@ func (c *Coordinator) Migrate(ctx context.Context, action *fleetapi.ActionMigrat
695695
return nil
696696
}
697697

698-
// Upgrade runs the upgrade process.
699-
// Called from external goroutines.
700-
func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, skipVerifyOverride bool, skipDefaultPgp bool, pgpBytes ...string) error {
701-
// early check outside of upgrader before overriding the state
702-
if !c.upgradeMgr.Upgradeable() {
703-
return ErrNotUpgradable
698+
type upgradeOpts struct {
699+
skipVerifyOverride bool
700+
skipDefaultPgp bool
701+
pgpBytes []string
702+
preUpgradeCallback func(ctx context.Context, log *logger.Logger, action *fleetapi.ActionUpgrade) error
703+
}
704+
705+
type UpgradeOpt func(*upgradeOpts)
706+
707+
func WithSkipVerifyOverride(skipVerifyOverride bool) UpgradeOpt {
708+
return func(opts *upgradeOpts) {
709+
opts.skipVerifyOverride = skipVerifyOverride
704710
}
711+
}
705712

706-
// early check capabilities to ensure this upgrade actions is allowed
707-
if c.caps != nil {
708-
if !c.caps.AllowUpgrade(version, sourceURI) {
709-
return ErrNotUpgradable
710-
}
713+
func WithSkipDefaultPgp(skipDefaultPgp bool) UpgradeOpt {
714+
return func(opts *upgradeOpts) {
715+
opts.skipDefaultPgp = skipDefaultPgp
716+
}
717+
}
718+
719+
func WithPgpBytes(pgpBytes []string) UpgradeOpt {
720+
return func(opts *upgradeOpts) {
721+
opts.pgpBytes = pgpBytes
722+
}
723+
}
724+
725+
func WithPreUpgradeCallback(preUpgradeCallback func(ctx context.Context, log *logger.Logger, action *fleetapi.ActionUpgrade) error) UpgradeOpt {
726+
return func(opts *upgradeOpts) {
727+
opts.preUpgradeCallback = preUpgradeCallback
728+
}
729+
}
730+
731+
// Upgrade runs the upgrade process.
732+
// Called from external goroutines.
733+
func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI string, action *fleetapi.ActionUpgrade, opts ...UpgradeOpt) error {
734+
var uOpts upgradeOpts
735+
for _, opt := range opts {
736+
opt(&uOpts)
711737
}
712738

713739
// A previous upgrade may be cancelled and needs some time to
@@ -737,7 +763,29 @@ func (c *Coordinator) Upgrade(ctx context.Context, version string, sourceURI str
737763
det := details.NewDetails(version, details.StateRequested, actionID)
738764
det.RegisterObserver(c.SetUpgradeDetails)
739765

740-
cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, skipVerifyOverride, skipDefaultPgp, pgpBytes...)
766+
// early check outside of upgrader before overriding the state
767+
if !c.upgradeMgr.Upgradeable() {
768+
det.Fail(ErrNotUpgradable)
769+
return ErrNotUpgradable
770+
}
771+
772+
// early check capabilities to ensure this upgrade actions is allowed
773+
if c.caps != nil {
774+
if !c.caps.AllowUpgrade(version, sourceURI) {
775+
det.Fail(ErrNotUpgradable)
776+
return ErrNotUpgradable
777+
}
778+
}
779+
780+
// run any pre upgrade callback
781+
if uOpts.preUpgradeCallback != nil {
782+
if err := uOpts.preUpgradeCallback(ctx, c.logger, action); err != nil {
783+
det.Fail(err)
784+
return err
785+
}
786+
}
787+
788+
cb, err := c.upgradeMgr.Upgrade(ctx, version, sourceURI, action, det, uOpts.skipVerifyOverride, uOpts.skipDefaultPgp, uOpts.pgpBytes...)
741789
if err != nil {
742790
c.ClearOverrideState()
743791
if errors.Is(err, upgrade.ErrUpgradeSameVersion) {

0 commit comments

Comments
 (0)