Skip to content

Commit fa35fa0

Browse files
committed
node/manager: Add configurable cluster mutator in Node obj handling
Introduce the possibility of configuring a prefix cluster mutator to customize the prefixes from the Node object. Signed-off-by: Fabio Falzoi <fabio.falzoi@isovalent.com>
1 parent ade6c0c commit fa35fa0

3 files changed

Lines changed: 97 additions & 47 deletions

File tree

pkg/clustermesh/script_test.go

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -256,6 +256,11 @@ func (d dummyNodeManager) Unsubscribe(types.NodeHandler) {
256256
panic("unimplemented")
257257
}
258258

259+
// SetPrefixClusterMutatorFn implements manager.NodeManager
260+
func (d dummyNodeManager) SetPrefixClusterMutatorFn(mutator func(*nodeTypes.Node) []cmtypes.PrefixClusterOpts) {
261+
panic("unimplemented")
262+
}
263+
259264
var _ nodemanager.NodeManager = dummyNodeManager{}
260265

261266
type dummyRemoteIdentityWatcher struct{}

pkg/node/manager/cell.go

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8,6 +8,7 @@ import (
88
"github.com/cilium/hive/job"
99
"github.com/cilium/statedb"
1010

11+
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
1112
"github.com/cilium/cilium/pkg/datapath/iptables/ipset"
1213
"github.com/cilium/cilium/pkg/datapath/tables"
1314
datapath "github.com/cilium/cilium/pkg/datapath/types"
@@ -79,6 +80,11 @@ type NodeManager interface {
7980
// StartNodeNeighborLinkUpdater spawns a controller that watches a queue
8081
// for node neighbor link updates.
8182
StartNodeNeighborLinkUpdater(nh datapath.NodeNeighbors)
83+
84+
// SetPrefixClusterMutatorFn allows to inject a custom prefix cluster mutator.
85+
// The mutator may then be applied to the PrefixCluster(s) using cmtypes.PrefixClusterFrom,
86+
// cmtypes.PrefixClusterFromCIDR and the like.
87+
SetPrefixClusterMutatorFn(mutator func(*types.Node) []cmtypes.PrefixClusterOpts)
8288
}
8389

8490
func newAllNodeManager(in struct {

pkg/node/manager/manager.go

Lines changed: 86 additions & 47 deletions
Original file line numberDiff line numberDiff line change
@@ -178,6 +178,9 @@ type manager struct {
178178
db *statedb.DB
179179
// The devices table
180180
devices statedb.Table[*tables.Device]
181+
182+
// custom mutator function to enrich prefixCluster(s) from node objects.
183+
prefixClusterMutatorFn func(node *nodeTypes.Node) []cmtypes.PrefixClusterOpts
181184
}
182185

183186
type nodeQueueEntry struct {
@@ -315,20 +318,21 @@ func New(
315318
}
316319

317320
m := &manager{
318-
nodes: map[nodeTypes.Identity]*nodeEntry{},
319-
restoredNodes: map[nodeTypes.Identity]*nodeTypes.Node{},
320-
conf: c,
321-
controllerManager: controller.NewManager(),
322-
nodeHandlers: map[datapath.NodeHandler]struct{}{},
323-
ipcache: ipCache,
324-
ipsetMgr: ipsetMgr,
325-
ipsetInitializer: ipsetMgr.NewInitializer(),
326-
ipsetFilter: ipsetFilter,
327-
metrics: nodeMetrics,
328-
health: health,
329-
jobGroup: jobGroup,
330-
db: db,
331-
devices: devices,
321+
nodes: map[nodeTypes.Identity]*nodeEntry{},
322+
restoredNodes: map[nodeTypes.Identity]*nodeTypes.Node{},
323+
conf: c,
324+
controllerManager: controller.NewManager(),
325+
nodeHandlers: map[datapath.NodeHandler]struct{}{},
326+
ipcache: ipCache,
327+
ipsetMgr: ipsetMgr,
328+
ipsetInitializer: ipsetMgr.NewInitializer(),
329+
ipsetFilter: ipsetFilter,
330+
metrics: nodeMetrics,
331+
health: health,
332+
jobGroup: jobGroup,
333+
db: db,
334+
devices: devices,
335+
prefixClusterMutatorFn: func(node *nodeTypes.Node) []cmtypes.PrefixClusterOpts { return nil },
332336
}
333337

334338
return m, nil
@@ -818,10 +822,16 @@ func (m *manager) NodeUpdated(n nodeTypes.Node) {
818822
var nodeIPsAdded, healthIPsAdded, ingressIPsAdded, podCIDRsAdded []netip.Prefix
819823

820824
for _, address := range n.IPAddresses {
821-
prefixCluster := cmtypes.NewLocalPrefixCluster(ip.IPToNetPrefix(address.IP))
825+
prefix := ip.IPToNetPrefix(address.IP)
826+
var prefixCluster cmtypes.PrefixCluster
827+
if address.Type == addressing.NodeCiliumInternalIP {
828+
prefixCluster = cmtypes.PrefixClusterFrom(prefix.Addr(), prefix.Bits(), m.prefixClusterMutatorFn(&n)...)
829+
} else {
830+
prefixCluster = cmtypes.NewLocalPrefixCluster(prefix)
831+
}
822832

823833
if address.Type == addressing.NodeInternalIP && !m.ipsetFilter(&n) {
824-
ipsetEntries = append(ipsetEntries, prefixCluster.AsPrefix())
834+
ipsetEntries = append(ipsetEntries, prefix)
825835
}
826836

827837
// Always set the tunnelIP so it can be used for metadata like DSR info
@@ -897,47 +907,53 @@ func (m *manager) NodeUpdated(n nodeTypes.Node) {
897907
ipv6PodCIDRs := n.GetIPv6AllocCIDRs()
898908

899909
mu := make([]ipcache.MU, 0, len(ipv4PodCIDRs)+len(ipv6PodCIDRs))
900-
for entry := range m.podCIDREntries(n.Source, resource, ipv4PodCIDRs, nodeIP, n.EncryptionKey) {
910+
for entry := range m.podCIDREntries(n.Source, resource, m.cidrsToPrefixesCluster(&n, ipv4PodCIDRs...), nodeIP, n.EncryptionKey) {
901911
mu = append(mu, entry)
902912
podCIDRsAdded = append(podCIDRsAdded, entry.Prefix.AsPrefix())
903913
}
904-
for entry := range m.podCIDREntries(n.Source, resource, ipv6PodCIDRs, nodeIP, n.EncryptionKey) {
914+
for entry := range m.podCIDREntries(n.Source, resource, m.cidrsToPrefixesCluster(&n, ipv6PodCIDRs...), nodeIP, n.EncryptionKey) {
905915
mu = append(mu, entry)
906916
podCIDRsAdded = append(podCIDRsAdded, entry.Prefix.AsPrefix())
907917
}
908918
m.ipcache.UpsertMetadataBatch(mu...)
909919
}
910920

911921
for _, address := range []net.IP{n.IPv4HealthIP, n.IPv6HealthIP} {
912-
healthIP := cmtypes.NewLocalPrefixCluster(ip.IPToNetPrefix(address))
913-
if !healthIP.AsPrefix().IsValid() {
922+
prefix := ip.IPToNetPrefix(address)
923+
if !prefix.IsValid() {
914924
continue
915925
}
916-
if !source.AllowOverwrite(m.ipcache.GetMetadataSourceByPrefix(healthIP), n.Source) {
926+
927+
prefixCluster := cmtypes.PrefixClusterFrom(prefix.Addr(), prefix.Bits(), m.prefixClusterMutatorFn(&n)...)
928+
929+
if !source.AllowOverwrite(m.ipcache.GetMetadataSourceByPrefix(prefixCluster), n.Source) {
917930
dpUpdate = false
918931
}
919932

920-
m.ipcache.UpsertMetadata(healthIP, n.Source, resource,
933+
m.ipcache.UpsertMetadata(prefixCluster, n.Source, resource,
921934
labels.LabelHealth,
922935
ipcacheTypes.TunnelPeer{Addr: nodeIP},
923936
m.endpointEncryptionKey(&n))
924-
healthIPsAdded = append(healthIPsAdded, healthIP.AsPrefix())
937+
healthIPsAdded = append(healthIPsAdded, prefixCluster.AsPrefix())
925938
}
926939

927940
for _, address := range []net.IP{n.IPv4IngressIP, n.IPv6IngressIP} {
928-
ingressIP := cmtypes.NewLocalPrefixCluster(ip.IPToNetPrefix(address))
929-
if !ingressIP.AsPrefix().IsValid() {
941+
prefix := ip.IPToNetPrefix(address)
942+
if !prefix.IsValid() {
930943
continue
931944
}
932-
if !source.AllowOverwrite(m.ipcache.GetMetadataSourceByPrefix(ingressIP), n.Source) {
945+
946+
prefixCluster := cmtypes.PrefixClusterFrom(prefix.Addr(), prefix.Bits(), m.prefixClusterMutatorFn(&n)...)
947+
948+
if !source.AllowOverwrite(m.ipcache.GetMetadataSourceByPrefix(prefixCluster), n.Source) {
933949
dpUpdate = false
934950
}
935951

936-
m.ipcache.UpsertMetadata(ingressIP, n.Source, resource,
952+
m.ipcache.UpsertMetadata(prefixCluster, n.Source, resource,
937953
labels.LabelIngress,
938954
ipcacheTypes.TunnelPeer{Addr: nodeIP},
939955
m.endpointEncryptionKey(&n))
940-
ingressIPsAdded = append(ingressIPsAdded, ingressIP.AsPrefix())
956+
ingressIPsAdded = append(ingressIPsAdded, prefixCluster.AsPrefix())
941957
}
942958

943959
m.mutex.Lock()
@@ -1018,28 +1034,33 @@ func (m *manager) NodeUpdated(n nodeTypes.Node) {
10181034
}
10191035
}
10201036

1021-
func (m *manager) podCIDREntries(source source.Source, resource ipcacheTypes.ResourceID, podCIDRs []*cidr.CIDR, tunnelIP netip.Addr, encryptKey uint8) iter.Seq[ipcache.MU] {
1022-
return func(yield func(ipcache.MU) bool) {
1023-
for _, podCIDR := range podCIDRs {
1024-
if podCIDR == nil {
1025-
continue
1037+
func (m *manager) cidrsToPrefixesCluster(n *nodeTypes.Node, cidrs ...*cidr.CIDR) iter.Seq[cmtypes.PrefixCluster] {
1038+
return func(yield func(cmtypes.PrefixCluster) bool) {
1039+
for _, cidr := range cidrs {
1040+
if !yield(cmtypes.PrefixClusterFromCIDR(cidr, m.prefixClusterMutatorFn(n)...)) {
1041+
return
10261042
}
1043+
}
1044+
}
1045+
}
10271046

1028-
prefix, ok := netipx.FromStdIPNet(podCIDR.IPNet)
1029-
if !ok {
1047+
func (m *manager) podCIDREntries(source source.Source, resource ipcacheTypes.ResourceID, prefixes iter.Seq[cmtypes.PrefixCluster], tunnelIP netip.Addr, encryptKey uint8) iter.Seq[ipcache.MU] {
1048+
return func(yield func(ipcache.MU) bool) {
1049+
for prefix := range prefixes {
1050+
if !prefix.IsValid() {
10301051
continue
10311052
}
10321053

10331054
metadata := []ipcache.IPMetadata{
1034-
worldLabelForPrefix(prefix),
1055+
worldLabelForPrefix(prefix.AsPrefix()),
10351056
ipcacheTypes.TunnelPeer{Addr: tunnelIP},
10361057
}
10371058
if m.nodeAddressHasEncryptKey() {
10381059
metadata = append(metadata, ipcacheTypes.EncryptKey(encryptKey))
10391060
}
10401061

10411062
if !yield(ipcache.MU{
1042-
Prefix: cmtypes.NewLocalPrefixCluster(prefix),
1063+
Prefix: prefix,
10431064
Source: source,
10441065
Resource: resource,
10451066
Metadata: metadata,
@@ -1070,11 +1091,18 @@ func (m *manager) removeNodeFromIPCache(oldNode nodeTypes.Node, resource ipcache
10701091
// Delete the old node IP addresses if they have changed in this node.
10711092
var v4Addrs, v6Addrs []netip.Addr
10721093
for _, address := range oldNode.IPAddresses {
1073-
oldPrefixCluster := cmtypes.NewLocalPrefixCluster(ip.IPToNetPrefix(address.IP))
1074-
if slices.Contains(nodeIPsAdded, oldPrefixCluster.AsPrefix()) {
1094+
prefix := ip.IPToNetPrefix(address.IP)
1095+
if slices.Contains(nodeIPsAdded, prefix) {
10751096
continue
10761097
}
10771098

1099+
var oldPrefixCluster cmtypes.PrefixCluster
1100+
if address.Type == addressing.NodeCiliumInternalIP {
1101+
oldPrefixCluster = cmtypes.PrefixClusterFrom(prefix.Addr(), prefix.Bits(), m.prefixClusterMutatorFn(&oldNode)...)
1102+
} else {
1103+
oldPrefixCluster = cmtypes.NewLocalPrefixCluster(prefix)
1104+
}
1105+
10781106
if address.Type == addressing.NodeInternalIP && !slices.Contains(ipsetEntries, oldPrefixCluster.AsPrefix()) {
10791107
addr, ok := netipx.FromStdIP(address.IP)
10801108
if !ok {
@@ -1119,13 +1147,13 @@ func (m *manager) removeNodeFromIPCache(oldNode nodeTypes.Node, resource ipcache
11191147
oldIPv6PodCIDRs := oldNode.GetIPv6AllocCIDRs()
11201148

11211149
mu := make([]ipcache.MU, 0, len(oldIPv4PodCIDRs)+len(oldIPv6PodCIDRs))
1122-
for entry := range m.podCIDREntries(oldNode.Source, resource, oldIPv4PodCIDRs, oldNodeIP, oldNode.EncryptionKey) {
1150+
for entry := range m.podCIDREntries(oldNode.Source, resource, m.cidrsToPrefixesCluster(&oldNode, oldIPv4PodCIDRs...), oldNodeIP, oldNode.EncryptionKey) {
11231151
if slices.Contains(podCIDRsAdded, entry.Prefix.AsPrefix()) {
11241152
continue
11251153
}
11261154
mu = append(mu, entry)
11271155
}
1128-
for entry := range m.podCIDREntries(oldNode.Source, resource, oldIPv6PodCIDRs, oldNodeIP, oldNode.EncryptionKey) {
1156+
for entry := range m.podCIDREntries(oldNode.Source, resource, m.cidrsToPrefixesCluster(&oldNode, oldIPv6PodCIDRs...), oldNodeIP, oldNode.EncryptionKey) {
11291157
if slices.Contains(podCIDRsAdded, entry.Prefix.AsPrefix()) {
11301158
continue
11311159
}
@@ -1136,25 +1164,29 @@ func (m *manager) removeNodeFromIPCache(oldNode nodeTypes.Node, resource ipcache
11361164

11371165
// Delete the old health IP addresses if they have changed in this node.
11381166
for _, address := range []net.IP{oldNode.IPv4HealthIP, oldNode.IPv6HealthIP} {
1139-
healthIP := cmtypes.NewLocalPrefixCluster(ip.IPToNetPrefix(address))
1140-
if !healthIP.IsValid() || slices.Contains(healthIPsAdded, healthIP.AsPrefix()) {
1167+
prefix := ip.IPToNetPrefix(address)
1168+
if !prefix.IsValid() || slices.Contains(healthIPsAdded, prefix) {
11411169
continue
11421170
}
11431171

1144-
m.ipcache.RemoveMetadata(healthIP, resource,
1172+
prefixCluster := cmtypes.PrefixClusterFrom(prefix.Addr(), prefix.Bits(), m.prefixClusterMutatorFn(&oldNode)...)
1173+
1174+
m.ipcache.RemoveMetadata(prefixCluster, resource,
11451175
labels.LabelHealth,
11461176
ipcacheTypes.TunnelPeer{Addr: oldNodeIP},
11471177
m.endpointEncryptionKey(&oldNode))
11481178
}
11491179

11501180
// Delete the old ingress IP addresses if they have changed in this node.
11511181
for _, address := range []net.IP{oldNode.IPv4IngressIP, oldNode.IPv6IngressIP} {
1152-
ingressIP := cmtypes.NewLocalPrefixCluster(ip.IPToNetPrefix(address))
1153-
if !ingressIP.IsValid() || slices.Contains(ingressIPsAdded, ingressIP.AsPrefix()) {
1182+
prefix := ip.IPToNetPrefix(address)
1183+
if !prefix.IsValid() || slices.Contains(ingressIPsAdded, prefix) {
11541184
continue
11551185
}
11561186

1157-
m.ipcache.RemoveMetadata(ingressIP, resource,
1187+
prefixCluster := cmtypes.PrefixClusterFrom(prefix.Addr(), prefix.Bits(), m.prefixClusterMutatorFn(&oldNode)...)
1188+
1189+
m.ipcache.RemoveMetadata(prefixCluster, resource,
11581190
labels.LabelIngress,
11591191
ipcacheTypes.TunnelPeer{Addr: oldNodeIP},
11601192
m.endpointEncryptionKey(&oldNode))
@@ -1414,3 +1446,10 @@ func (m *manager) StartNeighborRefresh(nh datapath.NodeNeighbors) {
14141446
},
14151447
)
14161448
}
1449+
1450+
// SetPrefixClusterMutatorFn allows to inject a custom prefix cluster mutator.
1451+
// The mutator may then be applied to the PrefixCluster(s) using cmtypes.PrefixClusterFrom,
1452+
// cmtypes.PrefixClusterFromCIDR and the like.
1453+
func (m *manager) SetPrefixClusterMutatorFn(mutator func(*nodeTypes.Node) []cmtypes.PrefixClusterOpts) {
1454+
m.prefixClusterMutatorFn = mutator
1455+
}

0 commit comments

Comments
 (0)