@@ -101,6 +101,12 @@ type OTelManager struct {
101101 execution collectorExecution
102102
103103 proc collectorHandle
104+
105+ // collectorRunErr is used to signal that the collector has exited.
106+ collectorRunErr chan error
107+
108+ // stopTimeout is the timeout to wait for the collector to stop.
109+ stopTimeout time.Duration
104110}
105111
106112// NewOTelManager returns an OTelManager.
@@ -111,6 +117,7 @@ func NewOTelManager(
111117 mode ExecutionMode ,
112118 agentInfo info.Agent ,
113119 beatMonitoringConfigGetter translate.BeatMonitoringConfigGetter ,
120+ stopTimeout time.Duration ,
114121) (* OTelManager , error ) {
115122 var exec collectorExecution
116123 var recoveryTimer collectorRecoveryTimer
@@ -131,7 +138,7 @@ func NewOTelManager(
131138 recoveryTimer = newRestarterNoop ()
132139 exec = newExecutionEmbedded ()
133140 default :
134- return nil , errors . New ("unknown otel collector exec" )
141+ return nil , fmt . Errorf ("unknown otel collector execution mode: %q" , mode )
135142 }
136143
137144 logger .Debugf ("Using collector execution mode: %s" , mode )
@@ -144,10 +151,12 @@ func NewOTelManager(
144151 errCh : make (chan error , 1 ), // holds at most one error
145152 collectorStatusCh : make (chan * status.AggregateStatus , 1 ),
146153 componentStateCh : make (chan []runtime.ComponentComponentState , 1 ),
147- updateCh : make (chan configUpdate ),
154+ updateCh : make (chan configUpdate , 1 ),
148155 doneChan : make (chan struct {}),
149156 execution : exec ,
150157 recoveryTimer : recoveryTimer ,
158+ collectorRunErr : make (chan error ),
159+ stopTimeout : stopTimeout ,
151160 }, nil
152161}
153162
@@ -156,24 +165,21 @@ func (m *OTelManager) Run(ctx context.Context) error {
156165 var err error
157166 m .proc = nil
158167
159- // signal that the run loop is ended to unblock any incoming update calls
160- defer close (m .doneChan )
161-
162- // collectorRunErr is used to signal that the collector has exited.
163- collectorRunErr := make (chan error )
164-
165168 // collectorStatusCh is used internally by the otel collector to send status updates to the manager
166169 // this channel is buffered because it's possible for the collector to send a status update while the manager is
167170 // waiting for the collector to exit
168171 collectorStatusCh := make (chan * status.AggregateStatus , 1 )
169172 for {
170173 select {
171174 case <- ctx .Done ():
175+ // signal that the run loop is ended to unblock any incoming update calls
176+ close (m .doneChan )
177+
172178 m .recoveryTimer .Stop ()
173179 // our caller context is cancelled, so stop the collector (if one is
174180 // running) and return.
175181 if m .proc != nil {
176- m .proc .Stop (ctx )
182+ m .proc .Stop (m . stopTimeout )
177183 }
178184 return ctx .Err ()
179185 case <- m .recoveryTimer .C ():
@@ -187,7 +193,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
187193
188194 newRetries := m .recoveryRetries .Add (1 )
189195 m .logger .Infof ("collector recovery restarting, total retries: %d" , newRetries )
190- m .proc , err = m .execution .startCollector (ctx , m .baseLogger , m .mergedCollectorCfg , collectorRunErr , collectorStatusCh )
196+ m .proc , err = m .execution .startCollector (ctx , m .baseLogger , m .mergedCollectorCfg , m . collectorRunErr , collectorStatusCh )
191197 if err != nil {
192198 reportErr (ctx , m .errCh , err )
193199 // reset the restart timer to the next backoff
@@ -197,12 +203,12 @@ func (m *OTelManager) Run(ctx context.Context) error {
197203 reportErr (ctx , m .errCh , nil )
198204 }
199205
200- case err = <- collectorRunErr :
206+ case err = <- m . collectorRunErr :
201207 m .recoveryTimer .Stop ()
202208 if err == nil {
203209 // err is nil means that the collector has exited cleanly without an error
204210 if m .proc != nil {
205- m .proc .Stop (ctx )
211+ m .proc .Stop (m . stopTimeout )
206212 m .proc = nil
207213 updateErr := m .reportOtelStatusUpdate (ctx , nil )
208214 if updateErr != nil {
@@ -223,7 +229,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
223229
224230 // in this rare case the collector stopped running but a configuration was
225231 // provided and the collector stopped with a clean exit
226- m .proc , err = m .execution .startCollector (ctx , m .baseLogger , m .mergedCollectorCfg , collectorRunErr , collectorStatusCh )
232+ m .proc , err = m .execution .startCollector (ctx , m .baseLogger , m .mergedCollectorCfg , m . collectorRunErr , collectorStatusCh )
227233 if err != nil {
228234 // failed to create the collector (this is different than
229235 // it's failing to run). we do not retry creation on failure
@@ -245,7 +251,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
245251 // in the case that the configuration is invalid there is no reason to
246252 // try again as it will keep failing so we do not trigger a restart
247253 if m .proc != nil {
248- m .proc .Stop (ctx )
254+ m .proc .Stop (m . stopTimeout )
249255 m .proc = nil
250256 // don't wait here for <-collectorRunErr, already occurred
251257 // clear status, no longer running
@@ -281,7 +287,7 @@ func (m *OTelManager) Run(ctx context.Context) error {
281287 m .components = cfgUpdate .components
282288 m .mx .Unlock ()
283289
284- err = m .applyMergedConfig (ctx , collectorStatusCh , collectorRunErr )
290+ err = m .applyMergedConfig (ctx , collectorStatusCh , m . collectorRunErr )
285291 // report the error unconditionally to indicate that the config was applied
286292 reportErr (ctx , m .errCh , err )
287293
@@ -340,7 +346,7 @@ func buildMergedConfig(cfgUpdate configUpdate, agentInfo info.Agent, monitoringC
340346
341347func (m * OTelManager ) applyMergedConfig (ctx context.Context , collectorStatusCh chan * status.AggregateStatus , collectorRunErr chan error ) error {
342348 if m .proc != nil {
343- m .proc .Stop (ctx )
349+ m .proc .Stop (m . stopTimeout )
344350 m .proc = nil
345351 select {
346352 case <- collectorRunErr :
@@ -402,6 +408,15 @@ func (m *OTelManager) Update(cfg *confmap.Conf, components []component.Component
402408 collectorCfg : cfg ,
403409 components : components ,
404410 }
411+
412+ // we care only about the latest config update
413+ select {
414+ case <- m .updateCh :
415+ case <- m .doneChan :
416+ return
417+ default :
418+ }
419+
405420 select {
406421 case m .updateCh <- cfgUpdate :
407422 case <- m .doneChan :
0 commit comments