Skip to content

Commit 4ca5651

Browse files
committed
Fix startup with failing configuration (#26126)
Fix startup with failing configuration (#26126)
1 parent 38d759c commit 4ca5651

5 files changed

Lines changed: 117 additions & 22 deletions

File tree

x-pack/elastic-agent/CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -69,6 +69,7 @@
6969
- Handle case where policy doesn't contain Fleet connection information {pull}25707[25707]
7070
- Fix fleet-server.yml spec to not overwrite existing keys {pull}25741[25741]
7171
- Agent sends wrong log level to Endpoint {issue}25583[25583]
72+
- Fix startup with failing configuration {pull}26057[26057]
7273
- Change timestamp in elatic-agent-json.log to use UTC {issue}25391[25391]
7374

7475
==== New features

x-pack/elastic-agent/pkg/agent/operation/operation_config.go

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,6 +10,7 @@ import (
1010
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/configuration"
1111
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/agent/errors"
1212
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/logger"
13+
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/plugin/process"
1314
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
1415
)
1516

@@ -53,7 +54,13 @@ func (o *operationConfig) Check(_ context.Context, _ Application) (bool, error)
5354
func (o *operationConfig) Run(ctx context.Context, application Application) (err error) {
5455
defer func() {
5556
if err != nil {
56-
application.SetState(state.Failed, err.Error(), nil)
57+
// application failed to apply config but is running.
58+
s := state.Degraded
59+
if errors.Is(err, process.ErrAppNotRunning) {
60+
s = state.Failed
61+
}
62+
63+
application.SetState(s, err.Error(), nil)
5764
}
5865
}()
5966
return application.Configure(ctx, o.cfg)

x-pack/elastic-agent/pkg/core/plugin/process/app.go

Lines changed: 58 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
var (
3131
// ErrAppNotRunning is returned when configuration is performed on not running application.
3232
ErrAppNotRunning = errors.New("application is not running", errors.TypeApplication)
33+
procExitTimeout = 10 * time.Second
3334
)
3435

3536
// Application encapsulates a concrete application ran by elastic-agent e.g Beat.
@@ -47,6 +48,7 @@ type Application struct {
4748
tag app.Taggable
4849
state state.State
4950
reporter state.Reporter
51+
watchClosers map[int]context.CancelFunc
5052

5153
uid int
5254
gid int
@@ -105,6 +107,7 @@ func NewApplication(
105107
uid: uid,
106108
gid: gid,
107109
statusReporter: statusController.RegisterApp(id, appName),
110+
watchClosers: make(map[int]context.CancelFunc),
108111
}, nil
109112
}
110113

@@ -159,6 +162,8 @@ func (a *Application) Stop() {
159162

160163
a.srvState = nil
161164
if a.state.ProcessInfo != nil {
165+
// stop and clean watcher
166+
a.stopWatcher(a.state.ProcessInfo)
162167
if err := a.state.ProcessInfo.Process.Signal(stopSig); err == nil {
163168
// no error on signal, so wait for it to stop
164169
_, _ = a.state.ProcessInfo.Process.Wait()
@@ -192,33 +197,52 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
192197
case ps := <-a.waitProc(proc.Process):
193198
procState = ps
194199
case <-a.bgContext.Done():
195-
a.Stop()
200+
return
201+
case <-ctx.Done():
202+
// closer called
196203
return
197204
}
198205

199206
a.appLock.Lock()
207+
defer a.appLock.Unlock()
200208
if a.state.ProcessInfo != proc {
201209
// already another process started, another watcher is watching instead
202-
a.appLock.Unlock()
210+
gracefulKill(proc)
211+
return
212+
}
213+
214+
// stop the watcher
215+
a.stopWatcher(a.state.ProcessInfo)
216+
217+
// was already stopped by Stop, do not restart
218+
if a.state.Status == state.Stopped {
203219
return
204220
}
221+
205222
a.state.ProcessInfo = nil
206223
srvState := a.srvState
207224

208225
if srvState == nil || srvState.Expected() == proto.StateExpected_STOPPING {
209-
a.appLock.Unlock()
210226
return
211227
}
212228

213229
msg := fmt.Sprintf("exited with code: %d", procState.ExitCode())
214-
a.setState(state.Crashed, msg, nil)
230+
a.setState(state.Restarting, msg, nil)
215231

216232
// it was a crash
217-
a.start(ctx, p, cfg)
218-
a.appLock.Unlock()
233+
a.start(ctx, p, cfg, true)
219234
}()
220235
}
221236

237+
func (a *Application) stopWatcher(procInfo *process.Info) {
238+
if procInfo != nil {
239+
if closer, ok := a.watchClosers[procInfo.PID]; ok {
240+
closer()
241+
delete(a.watchClosers, procInfo.PID)
242+
}
243+
}
244+
}
245+
222246
func (a *Application) waitProc(proc *os.Process) <-chan *os.ProcessState {
223247
resChan := make(chan *os.ProcessState)
224248

@@ -250,3 +274,31 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
250274
func (a *Application) cleanUp() {
251275
a.monitor.Cleanup(a.desc.Spec(), a.pipelineID)
252276
}
277+
278+
func gracefulKill(proc *process.Info) {
279+
if proc == nil || proc.Process == nil {
280+
return
281+
}
282+
283+
// send stop signal to request stop
284+
proc.Stop()
285+
286+
var wg sync.WaitGroup
287+
doneChan := make(chan struct{})
288+
wg.Add(1)
289+
go func() {
290+
wg.Done()
291+
_, _ = proc.Process.Wait()
292+
close(doneChan)
293+
}()
294+
295+
// wait for awaiter
296+
wg.Wait()
297+
298+
// kill in case it's still running after timeout
299+
select {
300+
case <-doneChan:
301+
case <-time.After(procExitTimeout):
302+
_ = proc.Process.Kill()
303+
}
304+
}

x-pack/elastic-agent/pkg/core/plugin/process/start.go

Lines changed: 28 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -26,20 +26,37 @@ func (a *Application) Start(ctx context.Context, t app.Taggable, cfg map[string]
2626
a.appLock.Lock()
2727
defer a.appLock.Unlock()
2828

29-
return a.start(ctx, t, cfg)
29+
return a.start(ctx, t, cfg, false)
3030
}
3131

3232
// Start starts the application without grabbing the lock.
33-
func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}) (err error) {
33+
func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]interface{}, isRestart bool) (err error) {
3434
defer func() {
3535
if err != nil {
3636
// inject App metadata
3737
err = errors.New(err, errors.M(errors.MetaKeyAppName, a.name), errors.M(errors.MetaKeyAppName, a.id))
3838
}
3939
}()
4040

41-
// already started if not stopped or crashed
42-
if a.Started() {
41+
// starting only if it's not running
42+
// or if it is, then only in case it's restart and this call initiates from restart call
43+
if a.Started() && a.state.Status != state.Restarting {
44+
if a.state.ProcessInfo == nil {
45+
// already started if not stopped or crashed
46+
return nil
47+
}
48+
49+
// in case app reported status it might still be running and failure timer
50+
// in progress. Stop timer and stop failing process
51+
a.stopFailedTimer()
52+
a.stopWatcher(a.state.ProcessInfo)
53+
54+
// kill the process
55+
_ = a.state.ProcessInfo.Process.Kill()
56+
a.state.ProcessInfo = nil
57+
}
58+
59+
if a.state.Status == state.Restarting && !isRestart {
4360
return nil
4461
}
4562

@@ -69,7 +86,8 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
6986
if a.state.Status != state.Stopped {
7087
// restarting as it was previously in a different state
7188
a.setState(state.Restarting, "Restarting", nil)
72-
} else {
89+
} else if a.state.Status != state.Restarting {
90+
// keep restarting state otherwise it's starting
7391
a.setState(state.Starting, "Starting", nil)
7492
}
7593

@@ -116,12 +134,15 @@ func (a *Application) start(ctx context.Context, t app.Taggable, cfg map[string]
116134
if err != nil {
117135
return err
118136
}
119-
120137
// write connect info to stdin
121138
go a.writeToStdin(a.srvState, a.state.ProcessInfo.Stdin)
122139

140+
// create closer for watcher, used to terminate watcher without
141+
// side effect of restarting process during shutdown
142+
cancelCtx, cancel := context.WithCancel(ctx)
143+
a.watchClosers[a.state.ProcessInfo.PID] = cancel
123144
// setup watcher
124-
a.watch(ctx, t, a.state.ProcessInfo, cfg)
145+
a.watch(cancelCtx, t, a.state.ProcessInfo, cfg)
125146

126147
return nil
127148
}

x-pack/elastic-agent/pkg/core/plugin/process/status.go

Lines changed: 22 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,7 @@ import (
1313

1414
"github.com/elastic/elastic-agent-client/v7/pkg/proto"
1515

16+
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/process"
1617
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/server"
1718
"github.com/elastic/beats/v7/x-pack/elastic-agent/pkg/core/state"
1819
)
@@ -42,7 +43,8 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
4243
_ = yaml.Unmarshal([]byte(s.Config()), &cfg)
4344

4445
// start the failed timer
45-
a.startFailedTimer(cfg)
46+
// pass process info to avoid killing new process spun up in a meantime
47+
a.startFailedTimer(cfg, a.state.ProcessInfo)
4648
} else {
4749
a.stopFailedTimer()
4850
}
@@ -51,7 +53,7 @@ func (a *Application) OnStatusChange(s *server.ApplicationState, status proto.St
5153
// startFailedTimer starts a timer that will restart the application if it doesn't exit failed after a period of time.
5254
//
5355
// This does not grab the appLock, that must be managed by the caller.
54-
func (a *Application) startFailedTimer(cfg map[string]interface{}) {
56+
func (a *Application) startFailedTimer(cfg map[string]interface{}, proc *process.Info) {
5557
if a.restartCanceller != nil {
5658
// already have running failed timer; just update config
5759
a.restartConfig = cfg
@@ -74,7 +76,7 @@ func (a *Application) startFailedTimer(cfg map[string]interface{}) {
7476
case <-ctx.Done():
7577
return
7678
case <-t.C:
77-
a.restart()
79+
a.restart(proc)
7880
}
7981
}()
8082
}
@@ -91,19 +93,31 @@ func (a *Application) stopFailedTimer() {
9193
}
9294

9395
// restart restarts the application
94-
func (a *Application) restart() {
96+
func (a *Application) restart(proc *process.Info) {
9597
a.appLock.Lock()
9698
defer a.appLock.Unlock()
9799

100+
// stop the watcher
101+
a.stopWatcher(proc)
102+
98103
// kill the process
99-
if a.state.ProcessInfo != nil {
100-
_ = a.state.ProcessInfo.Process.Kill()
101-
a.state.ProcessInfo = nil
104+
if proc != nil && proc.Process != nil {
105+
_ = proc.Process.Kill()
106+
}
107+
108+
if proc != a.state.ProcessInfo {
109+
// we're restarting different process than actually running
110+
// no need to start another one
111+
return
102112
}
113+
114+
a.state.ProcessInfo = nil
115+
103116
ctx := a.startContext
104117
tag := a.tag
105118

106-
err := a.start(ctx, tag, a.restartConfig)
119+
a.setState(state.Restarting, "", nil)
120+
err := a.start(ctx, tag, a.restartConfig, true)
107121
if err != nil {
108122
a.setState(state.Crashed, fmt.Sprintf("failed to restart: %s", err), nil)
109123
}

0 commit comments

Comments
 (0)