Skip to content

Commit 3e1fc2b

Browse files
fix: ensure EDOT subprocess shuts down gracefully on agent termination (#9886)
* fix: ensure EDOT subprocess shuts down gracefully on agent termination
* fix: reword returned error
* doc: add comment to describe the functionality of testing.go
* fix: re-structure shutdown delay in test binary
* fix: utilise t.Setenv in unit-tests
* feat: derive otel manager wait-to-stop timeout from agent.process.stop_timeout
1 parent e9bc0b0 commit 3e1fc2b

7 files changed

Lines changed: 227 additions & 71 deletions

File tree

internal/pkg/agent/application/application.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -240,7 +240,7 @@ func New(
240240
return nil, nil, nil, errors.New(err, "failed to initialize composable controller")
241241
}
242242

243-
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig)
243+
otelManager, err := otelmanager.NewOTelManager(log.Named("otel_manager"), logLevel, baseLogger, otelmanager.EmbeddedExecutionMode, agentInfo, monitor.ComponentMonitoringConfig, cfg.Settings.ProcessConfig.StopTimeout)
244244
if err != nil {
245245
return nil, nil, nil, fmt.Errorf("failed to create otel manager: %w", err)
246246
}

internal/pkg/otel/manager/execution.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package manager
66

77
import (
88
"context"
9+
"time"
910

1011
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
1112
"go.opentelemetry.io/collector/confmap"
@@ -18,5 +19,5 @@ type collectorExecution interface {
1819
}
1920

2021
type collectorHandle interface {
21-
Stop(ctx context.Context)
22+
Stop(waitTime time.Duration)
2223
}

internal/pkg/otel/manager/execution_embedded.go

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,6 +6,7 @@ package manager
66

77
import (
88
"context"
9+
"time"
910

1011
"github.com/open-telemetry/opentelemetry-collector-contrib/pkg/status"
1112
"go.opentelemetry.io/collector/confmap"
@@ -66,15 +67,16 @@ type ctxHandle struct {
6667
}
6768

6869
// Stop stops the collector
69-
func (s *ctxHandle) Stop(ctx context.Context) {
70+
func (s *ctxHandle) Stop(waitTime time.Duration) {
7071
if s.cancel == nil {
7172
return
7273
}
7374

7475
s.cancel()
7576

7677
select {
77-
case <-ctx.Done():
78+
case <-time.After(waitTime):
79+
return
7880
case <-s.collectorDoneCh:
7981
}
8082
}

internal/pkg/otel/manager/execution_subprocess.go

Lines changed: 11 additions & 11 deletions
Original file line numberDiff line numberDiff line change
@@ -30,8 +30,6 @@ import (
3030
)
3131

3232
const (
33-
processKillAfter = 5 * time.Second
34-
3533
OtelSetSupervisedFlagName = "supervised"
3634
OtelSupervisedLoggingLevelFlagName = "supervised.logging.level"
3735
)
@@ -56,6 +54,7 @@ func newSubprocessExecution(logLevel logp.Level, collectorPath string) (*subproc
5654
},
5755
logLevel: logLevel,
5856
healthCheckExtensionID: healthCheckExtensionID,
57+
reportErrFn: reportErr,
5958
}, nil
6059
}
6160

@@ -64,6 +63,7 @@ type subprocessExecution struct {
6463
collectorArgs []string
6564
logLevel logp.Level
6665
healthCheckExtensionID string
66+
reportErrFn func(ctx context.Context, errCh chan error, err error) // required for testing
6767
}
6868

6969
// startCollector starts a supervised collector and monitors its health. Process exit errors are sent to the
@@ -106,7 +106,6 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
106106
procCtx, procCtxCancel := context.WithCancel(ctx)
107107
processInfo, err := process.Start(r.collectorPath,
108108
process.WithArgs(r.collectorArgs),
109-
process.WithContext(procCtx),
110109
process.WithEnv(os.Environ()),
111110
process.WithCmdOptions(func(c *exec.Cmd) error {
112111
c.Stdin = bytes.NewReader(confBytes)
@@ -130,6 +129,7 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
130129
ctl := &procHandle{
131130
processDoneCh: make(chan struct{}),
132131
processInfo: processInfo,
132+
log: logger,
133133
}
134134

135135
healthCheckDone := make(chan struct{})
@@ -196,14 +196,14 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
196196
if procErr == nil {
197197
if procState.Success() {
198198
// report nil error so that the caller can be notified that the process has exited without error
199-
reportErr(ctx, processErrCh, nil)
199+
r.reportErrFn(ctx, processErrCh, nil)
200200
} else {
201-
reportErr(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
201+
r.reportErrFn(ctx, processErrCh, fmt.Errorf("supervised collector (pid: %d) exited with error: %s", procState.Pid(), procState.String()))
202202
}
203203
return
204204
}
205205

206-
reportErr(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
206+
r.reportErrFn(ctx, processErrCh, fmt.Errorf("failed to wait supervised collector process: %w", procErr))
207207
}()
208208

209209
return ctl, nil
@@ -212,11 +212,12 @@ func (r *subprocessExecution) startCollector(ctx context.Context, logger *logger
212212
type procHandle struct {
213213
processDoneCh chan struct{}
214214
processInfo *process.Info
215+
log *logger.Logger
215216
}
216217

217218
// Stop stops the process. If the process is already stopped, it does nothing. If the process does not stop within
218219
// processKillAfter or due to an error, it will be killed.
219-
func (s *procHandle) Stop(ctx context.Context) {
220+
func (s *procHandle) Stop(waitTime time.Duration) {
220221
select {
221222
case <-s.processDoneCh:
222223
// process has already exited
@@ -225,19 +226,18 @@ func (s *procHandle) Stop(ctx context.Context) {
225226
}
226227

227228
if err := s.processInfo.Stop(); err != nil {
229+
s.log.Warnf("failed to send stop signal to the supervised collector: %v", err)
228230
// we failed to stop the process just kill it and return
229231
_ = s.processInfo.Kill()
230232
return
231233
}
232234

233235
select {
234-
case <-ctx.Done():
236+
case <-time.After(waitTime):
237+
s.log.Warnf("timeout waiting (%s) for the supervised collector to stop, killing it", waitTime.String())
235238
// the stop wait timed out; kill the process just in case
236239
_ = s.processInfo.Kill()
237240
case <-s.processDoneCh:
238241
// process has already exited
239-
case <-time.After(processKillAfter):
240-
// process is still running kill it
241-
_ = s.processInfo.Kill()
242242
}
243243
}

internal/pkg/otel/manager/manager.go

Lines changed: 31 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -101,6 +101,12 @@ type OTelManager struct {
101101
execution collectorExecution
102102

103103
proc collectorHandle
104+
105+
// collectorRunErr is used to signal that the collector has exited.
106+
collectorRunErr chan error
107+
108+
// stopTimeout is the timeout to wait for the collector to stop.
109+
stopTimeout time.Duration
104110
}
105111

106112
// NewOTelManager returns a OTelManager.
@@ -111,6 +117,7 @@ func NewOTelManager(
111117
mode ExecutionMode,
112118
agentInfo info.Agent,
113119
beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter,
120+
stopTimeout time.Duration,
114121
) (*OTelManager, error) {
115122
var exec collectorExecution
116123
var recoveryTimer collectorRecoveryTimer
@@ -131,7 +138,7 @@ func NewOTelManager(
131138
recoveryTimer = newRestarterNoop()
132139
exec = newExecutionEmbedded()
133140
default:
134-
return nil, errors.New("unknown otel collector exec")
141+
return nil, fmt.Errorf("unknown otel collector execution mode: %q", mode)
135142
}
136143

137144
logger.Debugf("Using collector execution mode: %s", mode)
@@ -144,10 +151,12 @@ func NewOTelManager(
144151
errCh: make(chan error, 1), // holds at most one error
145152
collectorStatusCh: make(chan *status.AggregateStatus, 1),
146153
componentStateCh: make(chan []runtime.ComponentComponentState, 1),
147-
updateCh: make(chan configUpdate),
154+
updateCh: make(chan configUpdate, 1),
148155
doneChan: make(chan struct{}),
149156
execution: exec,
150157
recoveryTimer: recoveryTimer,
158+
collectorRunErr: make(chan error),
159+
stopTimeout: stopTimeout,
151160
}, nil
152161
}
153162

@@ -156,24 +165,21 @@ func (m *OTelManager) Run(ctx context.Context) error {
156165
var err error
157166
m.proc = nil
158167

159-
// signal that the run loop is ended to unblock any incoming update calls
160-
defer close(m.doneChan)
161-
162-
// collectorRunErr is used to signal that the collector has exited.
163-
collectorRunErr := make(chan error)
164-
165168
// collectorStatusCh is used internally by the otel collector to send status updates to the manager
166169
// this channel is buffered because it's possible for the collector to send a status update while the manager is
167170
// waiting for the collector to exit
168171
collectorStatusCh := make(chan *status.AggregateStatus, 1)
169172
for {
170173
select {
171174
case <-ctx.Done():
175+
// signal that the run loop is ended to unblock any incoming update calls
176+
close(m.doneChan)
177+
172178
m.recoveryTimer.Stop()
173179
// our caller context is cancelled so stop the collector and return
174180
// has exited.
175181
if m.proc != nil {
176-
m.proc.Stop(ctx)
182+
m.proc.Stop(m.stopTimeout)
177183
}
178184
return ctx.Err()
179185
case <-m.recoveryTimer.C():
@@ -187,7 +193,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
187193

188194
newRetries := m.recoveryRetries.Add(1)
189195
m.logger.Infof("collector recovery restarting, total retries: %d", newRetries)
190-
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
196+
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
191197
if err != nil {
192198
reportErr(ctx, m.errCh, err)
193199
// reset the restart timer to the next backoff
@@ -197,12 +203,12 @@ func (m *OTelManager) Run(ctx context.Context) error {
197203
reportErr(ctx, m.errCh, nil)
198204
}
199205

200-
case err = <-collectorRunErr:
206+
case err = <-m.collectorRunErr:
201207
m.recoveryTimer.Stop()
202208
if err == nil {
203209
// err is nil means that the collector has exited cleanly without an error
204210
if m.proc != nil {
205-
m.proc.Stop(ctx)
211+
m.proc.Stop(m.stopTimeout)
206212
m.proc = nil
207213
updateErr := m.reportOtelStatusUpdate(ctx, nil)
208214
if updateErr != nil {
@@ -223,7 +229,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
223229

224230
// in this rare case the collector stopped running but a configuration was
225231
// provided and the collector stopped with a clean exit
226-
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, collectorRunErr, collectorStatusCh)
232+
m.proc, err = m.execution.startCollector(ctx, m.baseLogger, m.mergedCollectorCfg, m.collectorRunErr, collectorStatusCh)
227233
if err != nil {
228234
// failed to create the collector (this is different then
229235
// it's failing to run). we do not retry creation on failure
@@ -245,7 +251,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
245251
// in the case that the configuration is invalid there is no reason to
246252
// try again as it will keep failing so we do not trigger a restart
247253
if m.proc != nil {
248-
m.proc.Stop(ctx)
254+
m.proc.Stop(m.stopTimeout)
249255
m.proc = nil
250256
// don't wait here for <-collectorRunErr, already occurred
251257
// clear status, no longer running
@@ -281,7 +287,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
281287
m.components = cfgUpdate.components
282288
m.mx.Unlock()
283289

284-
err = m.applyMergedConfig(ctx, collectorStatusCh, collectorRunErr)
290+
err = m.applyMergedConfig(ctx, collectorStatusCh, m.collectorRunErr)
285291
// report the error unconditionally to indicate that the config was applied
286292
reportErr(ctx, m.errCh, err)
287293

@@ -340,7 +346,7 @@ func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringC
340346

341347
func (m *OTelManager) applyMergedConfig(ctx context.Context, collectorStatusCh chan *status.AggregateStatus, collectorRunErr chan error) error {
342348
if m.proc != nil {
343-
m.proc.Stop(ctx)
349+
m.proc.Stop(m.stopTimeout)
344350
m.proc = nil
345351
select {
346352
case <-collectorRunErr:
@@ -402,6 +408,15 @@ func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component
402408
collectorCfg: cfg,
403409
components: components,
404410
}
411+
412+
// we care only about the latest config update
413+
select {
414+
case <-m.updateCh:
415+
case <-m.doneChan:
416+
return
417+
default:
418+
}
419+
405420
select {
406421
case m.updateCh <- cfgUpdate:
407422
case <-m.doneChan:

0 commit comments

Comments
 (0)