Skip to content

Commit 3377927

Browse files
fristonio authored and joestringer committed
daemon: extract restored endpoints regen handling to a separate method
Signed-off-by: Deepesh Pathak <deepesh.pathak@isovalent.com>
1 parent c4870be commit 3377927

1 file changed

Lines changed: 66 additions & 66 deletions

File tree

daemon/cmd/state.go

Lines changed: 66 additions & 66 deletions
Original file line numberDiff line numberDiff line change
@@ -278,10 +278,6 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState, endpoi
278278
// match this Cilium userspace instance. If not, it must be removed
279279
ctmap.DeleteIfUpgradeNeeded()
280280

281-
// we need to signalize when the endpoints are regenerated, i.e., when
282-
// they have finished to rebuild after being restored.
283-
epRegenerated := make(chan bool, len(state.restored))
284-
285281
// Insert all endpoints into the endpoint list first before starting
286282
// the regeneration. This is required to ensure that if an individual
287283
// regeneration causes an identity change of an endpoint, the new
@@ -304,57 +300,32 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState, endpoi
304300
}
305301
}
306302

307-
if option.Config.EnableIPSec {
308-
// To support v1.18 VinE upgrades, we need to restore the host
309-
// endpoint before any other endpoint, to ensure a drop-less upgrade.
310-
// This is because in v1.18 'bpf_lxc' programs stop issuing IPsec hooks
311-
// which trigger encryption.
312-
//
313-
// Instead, 'bpf_host' is responsible for performing IPsec hooks.
314-
// Therefore, we want 'bpf_host' to regenerate BEFORE 'bpf_lxc' so the
315-
// IPsec hooks are always present while 'bpf_lxc' programs regen,
316-
// ensuring no IPsec leaks occur.
317-
//
318-
// This can be removed in v1.19.
319-
for _, ep := range state.restored {
320-
if ep.IsHost() {
321-
d.logger.Info("Successfully restored endpoint. Scheduling regeneration", logfields.EndpointID, ep.ID)
322-
if err := ep.RegenerateAfterRestore(endpointsRegenerator, d.endpointMetadata.FetchK8sMetadataForEndpoint); err != nil {
323-
d.logger.Debug(
324-
"error regenerating during restore",
325-
logfields.Error, err,
326-
logfields.EndpointID, ep.ID,
327-
)
328-
epRegenerated <- false
329-
} else {
330-
epRegenerated <- true
331-
}
332-
break
333-
}
334-
}
335-
}
336-
303+
endpointsToRegenerate := make([]*endpoint.Endpoint, 0, len(state.restored))
337304
for _, ep := range state.restored {
338305
if ep.IsHost() && option.Config.EnableIPSec {
339-
// The host endpoint was handled above.
340-
continue
341-
}
342-
d.logger.Info(
343-
"Successfully restored endpoint. Scheduling regeneration",
344-
logfields.EndpointID, ep.ID,
345-
)
346-
go func(ep *endpoint.Endpoint, epRegenerated chan<- bool) {
306+
// To support v1.18 VinE upgrades, we need to restore the host
307+
// endpoint before any other endpoint, to ensure a drop-less upgrade.
308+
// This is because in v1.18 'bpf_lxc' programs stop issuing IPsec hooks
309+
// which trigger encryption.
310+
//
311+
// Instead, 'bpf_host' is responsible for performing IPsec hooks.
312+
// Therefore, we want 'bpf_host' to regenerate BEFORE 'bpf_lxc' so the
313+
// IPsec hooks are always present while 'bpf_lxc' programs regen,
314+
// ensuring no IPsec leaks occur.
315+
//
316+
// This can be removed in v1.19.
317+
d.logger.Info("Successfully restored Host endpoint. Scheduling regeneration", logfields.EndpointID, ep.ID)
347318
if err := ep.RegenerateAfterRestore(endpointsRegenerator, d.endpointMetadata.FetchK8sMetadataForEndpoint); err != nil {
348319
d.logger.Debug(
349-
"error regenerating during restore",
320+
"Error regenerating Host endpoint during restore",
350321
logfields.Error, err,
351322
logfields.EndpointID, ep.ID,
352323
)
353-
epRegenerated <- false
354-
return
355324
}
356-
epRegenerated <- true
357-
}(ep, epRegenerated)
325+
continue
326+
}
327+
328+
endpointsToRegenerate = append(endpointsToRegenerate, ep)
358329
}
359330

360331
var endpointCleanupCompleted sync.WaitGroup
@@ -375,35 +346,64 @@ func (d *Daemon) regenerateRestoredEndpoints(state *endpointRestoreState, endpoi
375346
}
376347
endpointCleanupCompleted.Wait()
377348

349+
// Trigger regeneration for relevant restored endpoints in a separate goroutine.
350+
go d.handleRestoredEndpointsRegeneration(endpointsToRegenerate, endpointsRegenerator)
351+
378352
go func() {
379353
for _, ep := range state.restored {
380354
<-ep.InitialEnvoyPolicyComputed
381355
}
382356
close(d.endpointInitialPolicyComplete)
383357
}()
358+
}
384359

385-
go func() {
386-
regenerated, total := 0, 0
387-
if len(state.restored) > 0 {
388-
for buildSuccess := range epRegenerated {
389-
if buildSuccess {
390-
regenerated++
391-
}
392-
total++
393-
if total >= len(state.restored) {
394-
break
395-
}
396-
}
397-
}
398-
close(epRegenerated)
360+
func (d *Daemon) handleRestoredEndpointsRegeneration(endpoints []*endpoint.Endpoint, endpointsRegenerator *endpoint.Regenerator) {
361+
regenWg := &sync.WaitGroup{}
362+
epRegenerated := make(chan bool, len(endpoints))
399363

364+
for _, ep := range endpoints {
400365
d.logger.Info(
401-
"Finished regenerating restored endpoints",
402-
logfields.Regenerated, regenerated,
403-
logfields.Total, total,
366+
"Successfully restored endpoint. Scheduling regeneration",
367+
logfields.EndpointID, ep.ID,
404368
)
405-
close(d.endpointRestoreComplete)
406-
}()
369+
370+
regenWg.Add(1)
371+
go func(ep *endpoint.Endpoint, wg *sync.WaitGroup, endpointsRegenerated chan<- bool) {
372+
defer wg.Done()
373+
374+
if err := ep.RegenerateAfterRestore(endpointsRegenerator, d.endpointMetadata.FetchK8sMetadataForEndpoint); err != nil {
375+
d.logger.Debug(
376+
"Error regenerating endpoint during restore",
377+
logfields.Error, err,
378+
logfields.EndpointID, ep.ID,
379+
)
380+
endpointsRegenerated <- false
381+
} else {
382+
endpointsRegenerated <- true
383+
}
384+
}(ep, regenWg, epRegenerated)
385+
}
386+
387+
regenWg.Wait()
388+
close(epRegenerated)
389+
390+
total, regenerated, failed := 0, 0, 0
391+
for buildSuccess := range epRegenerated {
392+
total++
393+
if buildSuccess {
394+
regenerated++
395+
} else {
396+
failed++
397+
}
398+
}
399+
400+
d.logger.Info(
401+
"Finished regenerating restored endpoints",
402+
logfields.Regenerated, regenerated,
403+
logfields.Failed, failed,
404+
logfields.Total, total,
405+
)
406+
close(d.endpointRestoreComplete)
407407
}
408408

409409
func (d *Daemon) allocateIPsLocked(ep *endpoint.Endpoint) (err error) {

0 commit comments

Comments (0)