@@ -30,6 +30,7 @@ import (
3030var (
3131 // ErrAppNotRunning is returned when configuration is performed on not running application.
3232 ErrAppNotRunning = errors .New ("application is not running" , errors .TypeApplication )
33+ procExitTimeout = 10 * time .Second
3334)
3435
3536// Application encapsulates a concrete application run by elastic-agent, e.g. a Beat.
@@ -47,6 +48,7 @@ type Application struct {
4748 tag app.Taggable
4849 state state.State
4950 reporter state.Reporter
51+ watchClosers map [int ]context.CancelFunc
5052
5153 uid int
5254 gid int
@@ -105,6 +107,7 @@ func NewApplication(
105107 uid : uid ,
106108 gid : gid ,
107109 statusReporter : statusController .RegisterApp (id , appName ),
110+ watchClosers : make (map [int ]context.CancelFunc ),
108111 }, nil
109112}
110113
@@ -159,6 +162,8 @@ func (a *Application) Stop() {
159162
160163 a .srvState = nil
161164 if a .state .ProcessInfo != nil {
165+ // stop and clean watcher
166+ a .stopWatcher (a .state .ProcessInfo )
162167 if err := a .state .ProcessInfo .Process .Signal (stopSig ); err == nil {
163168 // no error on signal, so wait for it to stop
164169 _ , _ = a .state .ProcessInfo .Process .Wait ()
@@ -192,33 +197,52 @@ func (a *Application) watch(ctx context.Context, p app.Taggable, proc *process.I
192197 case ps := <- a .waitProc (proc .Process ):
193198 procState = ps
194199 case <- a .bgContext .Done ():
195- a .Stop ()
200+ return
201+ case <- ctx .Done ():
202+ // closer called
196203 return
197204 }
198205
199206 a .appLock .Lock ()
207+ defer a .appLock .Unlock ()
200208 if a .state .ProcessInfo != proc {
201209 // already another process started, another watcher is watching instead
202- a .appLock .Unlock ()
210+ gracefulKill (proc )
211+ return
212+ }
213+
214+ // stop the watcher
215+ a .stopWatcher (a .state .ProcessInfo )
216+
217+ // was already stopped by Stop, do not restart
218+ if a .state .Status == state .Stopped {
203219 return
204220 }
221+
205222 a .state .ProcessInfo = nil
206223 srvState := a .srvState
207224
208225 if srvState == nil || srvState .Expected () == proto .StateExpected_STOPPING {
209- a .appLock .Unlock ()
210226 return
211227 }
212228
213229 msg := fmt .Sprintf ("exited with code: %d" , procState .ExitCode ())
214- a .setState (state .Crashed , msg , nil )
230+ a .setState (state .Restarting , msg , nil )
215231
216232 // it was a crash
217- a .start (ctx , p , cfg )
218- a .appLock .Unlock ()
233+ a .start (ctx , p , cfg , true )
219234 }()
220235}
221236
237+ func (a * Application ) stopWatcher (procInfo * process.Info ) {
238+ if procInfo != nil {
239+ if closer , ok := a .watchClosers [procInfo .PID ]; ok {
240+ closer ()
241+ delete (a .watchClosers , procInfo .PID )
242+ }
243+ }
244+ }
245+
222246func (a * Application ) waitProc (proc * os.Process ) <- chan * os.ProcessState {
223247 resChan := make (chan * os.ProcessState )
224248
@@ -250,3 +274,31 @@ func (a *Application) setState(s state.Status, msg string, payload map[string]in
250274func (a * Application ) cleanUp () {
251275 a .monitor .Cleanup (a .desc .Spec (), a .pipelineID )
252276}
277+
278+ func gracefulKill (proc * process.Info ) {
279+ if proc == nil || proc .Process == nil {
280+ return
281+ }
282+
283+ // send stop signal to request stop
284+ proc .Stop ()
285+
286+ var wg sync.WaitGroup
287+ doneChan := make (chan struct {})
288+ wg .Add (1 )
289+ go func () {
290+ wg .Done ()
291+ _ , _ = proc .Process .Wait ()
292+ close (doneChan )
293+ }()
294+
295+ // wait for awaiter
296+ wg .Wait ()
297+
298+ // kill in case it's still running after timeout
299+ select {
300+ case <- doneChan :
301+ case <- time .After (procExitTimeout ):
302+ _ = proc .Process .Kill ()
303+ }
304+ }
0 commit comments