Skip to content

Commit f90af88

Browse files
Preisschildsmira
authored andcommitted
fix: use node podCIDRs for kubespan advertiseKubernetesNetworks
This commit changes the way kubespan gets the podCIDR to advertise when `advertiseKubernetesNetworks` is enabled. Before, it used the interface address, but some CNIs (such as Cilium in NativeRouting) only set a single /32 IP to a single interface (`cilium_host` in cilium's case). This adds the `v1.Node`'s `.spec.podCIDRs` array to the `k8s.NodeStatus` object and uses this to advertise the kubernetes network. Signed-off-by: Florian Ströger <stroeger@youniqx.com> Signed-off-by: Andrey Smirnov <andrey.smirnov@siderolabs.com> (cherry picked from commit 5629207)
1 parent a025ea4 commit f90af88

File tree

11 files changed

+236
-141
lines changed

11 files changed

+236
-141
lines changed

api/resource/definitions/k8s/k8s.proto

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -213,6 +213,7 @@ message NodeStatusSpec {
213213
bool unschedulable = 3;
214214
map<string, string> labels = 4;
215215
map<string, string> annotations = 5;
216+
repeated common.NetIPPrefix pod_cid_rs = 6;
216217
}
217218

218219
// NodeTaintSpecSpec represents a label that's attached to a Talos node.

internal/app/machined/pkg/controllers/cluster/local_affiliate.go

Lines changed: 9 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -68,6 +68,11 @@ func (ctrl *LocalAffiliateController) Inputs() []controller.Input {
6868
Type: network.NodeAddressType,
6969
Kind: controller.InputWeak,
7070
},
71+
{
72+
Namespace: k8s.NamespaceName,
73+
Type: k8s.NodeStatusType,
74+
Kind: controller.InputWeak,
75+
},
7176
{
7277
Namespace: kubespan.NamespaceName,
7378
Type: kubespan.IdentityType,
@@ -196,9 +201,9 @@ func (ctrl *LocalAffiliateController) Run(ctx context.Context, r controller.Runt
196201
return fmt.Errorf("error getting kubespan config: %w", err)
197202
}
198203

199-
ksAdditionalAddresses, err := safe.ReaderGetByID[*network.NodeAddress](ctx, r, network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterOnlyK8s))
204+
nodeStatus, err := safe.ReaderGetByID[*k8s.NodeStatus](ctx, r, nodename.TypedSpec().Nodename)
200205
if err != nil && !state.IsNotFoundError(err) {
201-
return fmt.Errorf("error getting kubespan additional addresses: %w", err)
206+
return fmt.Errorf("error getting node status: %w", err)
202207
}
203208

204209
discoveredPublicIPs, err := safe.ReaderList[*network.AddressStatus](ctx, r, resource.NewMetadata(cluster.NamespaceName, network.AddressStatusType, "", resource.VersionUndefined))
@@ -245,8 +250,8 @@ func (ctrl *LocalAffiliateController) Run(ctx context.Context, r controller.Runt
245250
spec.KubeSpan.Address = kubespanIdentity.TypedSpec().Address.Addr()
246251
spec.KubeSpan.PublicKey = kubespanIdentity.TypedSpec().PublicKey
247252

248-
if kubespanConfig.TypedSpec().AdvertiseKubernetesNetworks && ksAdditionalAddresses != nil {
249-
spec.KubeSpan.AdditionalAddresses = slices.Clone(ksAdditionalAddresses.TypedSpec().Addresses)
253+
if kubespanConfig.TypedSpec().AdvertiseKubernetesNetworks && nodeStatus != nil {
254+
spec.KubeSpan.AdditionalAddresses = slices.Clone(nodeStatus.TypedSpec().PodCIDRs)
250255
} else {
251256
spec.KubeSpan.AdditionalAddresses = nil
252257
}

internal/app/machined/pkg/controllers/cluster/local_affiliate_test.go

Lines changed: 7 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -37,7 +37,7 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
3737

3838
suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.LocalAffiliateController{}))
3939

40-
nodeIdentity, nonK8sRoutedAddresses, discoveryConfig := suite.createResources()
40+
nodeIdentity, nonK8sRoutedAddresses, nodeName, discoveryConfig := suite.createResources()
4141

4242
machineType := config.NewMachineType()
4343
machineType.SetMachineType(machine.TypeWorker)
@@ -82,9 +82,9 @@ func (suite *LocalAffiliateSuite) TestGeneration() {
8282
nonK8sRoutedAddresses.TypedSpec().Addresses = append(nonK8sRoutedAddresses.TypedSpec().Addresses, ksIdentity.TypedSpec().Address)
8383
suite.Require().NoError(suite.state.Update(suite.ctx, nonK8sRoutedAddresses))
8484

85-
onlyK8sAddresses := network.NewNodeAddress(network.NamespaceName, network.FilteredNodeAddressID(network.NodeAddressCurrentID, k8s.NodeAddressFilterOnlyK8s))
86-
onlyK8sAddresses.TypedSpec().Addresses = []netip.Prefix{netip.MustParsePrefix("10.244.1.0/24")}
87-
suite.Require().NoError(suite.state.Create(suite.ctx, onlyK8sAddresses))
85+
nodeStatus := k8s.NewNodeStatus(k8s.NamespaceName, nodeName.TypedSpec().Nodename)
86+
nodeStatus.TypedSpec().PodCIDRs = []netip.Prefix{netip.MustParsePrefix("10.244.1.0/24")}
87+
suite.Require().NoError(suite.state.Create(suite.ctx, nodeStatus))
8888

8989
// add discovered public IPs
9090
for _, addr := range []netip.Addr{
@@ -151,7 +151,7 @@ func (suite *LocalAffiliateSuite) TestCPGeneration() {
151151

152152
suite.Require().NoError(suite.runtime.RegisterController(&clusterctrl.LocalAffiliateController{}))
153153

154-
nodeIdentity, _, discoveryConfig := suite.createResources()
154+
nodeIdentity, _, _, discoveryConfig := suite.createResources()
155155

156156
machineType := config.NewMachineType()
157157
machineType.SetMachineType(machine.TypeControlPlane)
@@ -185,7 +185,7 @@ func (suite *LocalAffiliateSuite) TestCPGeneration() {
185185
ctest.AssertNoResource[*cluster.Affiliate](suite, nodeIdentity.TypedSpec().NodeID)
186186
}
187187

188-
func (suite *LocalAffiliateSuite) createResources() (*cluster.Identity, *network.NodeAddress, *cluster.Config) {
188+
func (suite *LocalAffiliateSuite) createResources() (*cluster.Identity, *network.NodeAddress, *k8s.Nodename, *cluster.Config) {
189189
// regular discovery affiliate
190190
discoveryConfig := cluster.NewConfig(config.NamespaceName, cluster.ConfigID)
191191
discoveryConfig.TypedSpec().DiscoveryEnabled = true
@@ -224,7 +224,7 @@ func (suite *LocalAffiliateSuite) createResources() (*cluster.Identity, *network
224224
}
225225
suite.Require().NoError(suite.state.Create(suite.ctx, nonK8sRoutedAddresses))
226226

227-
return nodeIdentity, nonK8sRoutedAddresses, discoveryConfig
227+
return nodeIdentity, nonK8sRoutedAddresses, nodename, discoveryConfig
228228
}
229229

230230
func TestLocalAffiliateSuite(t *testing.T) {

internal/app/machined/pkg/controllers/k8s/address_filter.go

Lines changed: 30 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -38,6 +38,17 @@ func (ctrl *AddressFilterController) Inputs() []controller.Input {
3838
ID: optional.Some(config.ActiveID),
3939
Kind: controller.InputWeak,
4040
},
41+
{
42+
Namespace: k8s.NamespaceName,
43+
Type: k8s.NodenameType,
44+
ID: optional.Some(k8s.NodenameID),
45+
Kind: controller.InputWeak,
46+
},
47+
{
48+
Namespace: k8s.NamespaceName,
49+
Type: k8s.NodeStatusType,
50+
Kind: controller.InputWeak,
51+
},
4152
}
4253
}
4354

@@ -53,7 +64,7 @@ func (ctrl *AddressFilterController) Outputs() []controller.Output {
5364

5465
// Run implements controller.Controller interface.
5566
//
56-
//nolint:gocyclo
67+
//nolint:gocyclo,cyclop
5768
func (ctrl *AddressFilterController) Run(ctx context.Context, r controller.Runtime, _ *zap.Logger) error {
5869
for {
5970
select {
@@ -67,6 +78,20 @@ func (ctrl *AddressFilterController) Run(ctx context.Context, r controller.Runti
6778
return fmt.Errorf("error getting config: %w", err)
6879
}
6980

81+
nodeName, err := safe.ReaderGetByID[*k8s.Nodename](ctx, r, k8s.NodenameID)
82+
if err != nil && !state.IsNotFoundError(err) {
83+
return fmt.Errorf("error getting nodename: %w", err)
84+
}
85+
86+
var nodeStatus *k8s.NodeStatus
87+
88+
if nodeName != nil {
89+
nodeStatus, err = safe.ReaderGetByID[*k8s.NodeStatus](ctx, r, nodeName.TypedSpec().Nodename)
90+
if err != nil && !state.IsNotFoundError(err) {
91+
return fmt.Errorf("error getting nodename: %w", err)
92+
}
93+
}
94+
7095
r.StartTrackingOutputs()
7196

7297
if cfg != nil && cfg.Config().Cluster() != nil {
@@ -85,6 +110,10 @@ func (ctrl *AddressFilterController) Run(ctx context.Context, r controller.Runti
85110
podCIDRs = append(podCIDRs, ipPrefix)
86111
}
87112

113+
if nodeStatus != nil {
114+
podCIDRs = append(podCIDRs, nodeStatus.TypedSpec().PodCIDRs...)
115+
}
116+
88117
for _, cidr := range cfgProvider.Cluster().Network().ServiceCIDRs() {
89118
var ipPrefix netip.Prefix
90119

internal/app/machined/pkg/controllers/k8s/address_filter_test.go

Lines changed: 60 additions & 112 deletions
Original file line numberDiff line numberDiff line change
@@ -6,22 +6,16 @@
66
package k8s_test
77

88
import (
9-
"context"
109
"fmt"
10+
"net/netip"
1111
"net/url"
12-
"sync"
1312
"testing"
1413
"time"
1514

16-
"github.com/cosi-project/runtime/pkg/controller/runtime"
17-
"github.com/cosi-project/runtime/pkg/resource"
18-
"github.com/cosi-project/runtime/pkg/state"
19-
"github.com/cosi-project/runtime/pkg/state/impl/inmem"
20-
"github.com/cosi-project/runtime/pkg/state/impl/namespaced"
21-
"github.com/siderolabs/go-retry/retry"
15+
"github.com/stretchr/testify/assert"
2216
"github.com/stretchr/testify/suite"
23-
"go.uber.org/zap/zaptest"
2417

18+
"github.com/siderolabs/talos/internal/app/machined/pkg/controllers/ctest"
2519
k8sctrl "github.com/siderolabs/talos/internal/app/machined/pkg/controllers/k8s"
2620
"github.com/siderolabs/talos/pkg/machinery/config/container"
2721
"github.com/siderolabs/talos/pkg/machinery/config/types/v1alpha1"
@@ -31,55 +25,7 @@ import (
3125
)
3226

3327
type K8sAddressFilterSuite struct {
34-
suite.Suite
35-
36-
state state.State
37-
38-
runtime *runtime.Runtime
39-
wg sync.WaitGroup
40-
41-
//nolint:containedctx
42-
ctx context.Context
43-
ctxCancel context.CancelFunc
44-
}
45-
46-
func (suite *K8sAddressFilterSuite) SetupTest() {
47-
suite.ctx, suite.ctxCancel = context.WithTimeout(context.Background(), 3*time.Minute)
48-
49-
suite.state = state.WrapCore(namespaced.NewState(inmem.Build))
50-
51-
var err error
52-
53-
suite.runtime, err = runtime.NewRuntime(suite.state, zaptest.NewLogger(suite.T()))
54-
suite.Require().NoError(err)
55-
56-
suite.Require().NoError(suite.runtime.RegisterController(&k8sctrl.AddressFilterController{}))
57-
58-
suite.startRuntime()
59-
}
60-
61-
func (suite *K8sAddressFilterSuite) startRuntime() {
62-
suite.wg.Go(func() {
63-
suite.Assert().NoError(suite.runtime.Run(suite.ctx))
64-
})
65-
}
66-
67-
func (suite *K8sAddressFilterSuite) assertResource(
68-
md resource.Metadata,
69-
check func(res resource.Resource) error,
70-
) func() error {
71-
return func() error {
72-
r, err := suite.state.Get(suite.ctx, md)
73-
if err != nil {
74-
if state.IsNotFoundError(err) {
75-
return retry.ExpectedError(err)
76-
}
77-
78-
return err
79-
}
80-
81-
return check(r)
82-
}
28+
ctest.DefaultSuite
8329
}
8430

8531
func (suite *K8sAddressFilterSuite) TestReconcile() {
@@ -111,67 +57,69 @@ func (suite *K8sAddressFilterSuite) TestReconcile() {
11157
},
11258
),
11359
)
114-
suite.Require().NoError(suite.state.Create(suite.ctx, cfg))
115-
116-
suite.Assert().NoError(
117-
retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
118-
suite.assertResource(
119-
resource.NewMetadata(
120-
network.NamespaceName,
121-
network.NodeAddressFilterType,
122-
k8s.NodeAddressFilterOnlyK8s,
123-
resource.VersionUndefined,
124-
),
125-
func(res resource.Resource) error {
126-
spec := res.(*network.NodeAddressFilter).TypedSpec()
127-
128-
suite.Assert().Equal(
129-
"[10.32.0.0/12 fd00:10:32::/102 10.200.0.0/22 fd40:10:200::/112]",
130-
fmt.Sprintf("%s", spec.IncludeSubnets),
131-
)
132-
suite.Assert().Empty(spec.ExcludeSubnets)
133-
134-
return nil
135-
},
136-
),
137-
),
138-
)
60+
suite.Create(cfg)
13961

140-
suite.Assert().NoError(
141-
retry.Constant(3*time.Second, retry.WithUnits(100*time.Millisecond)).Retry(
142-
suite.assertResource(
143-
resource.NewMetadata(
144-
network.NamespaceName,
145-
network.NodeAddressFilterType,
146-
k8s.NodeAddressFilterNoK8s,
147-
resource.VersionUndefined,
148-
),
149-
func(res resource.Resource) error {
150-
spec := res.(*network.NodeAddressFilter).TypedSpec()
151-
152-
suite.Assert().Empty(spec.IncludeSubnets)
153-
suite.Assert().Equal(
154-
"[10.32.0.0/12 fd00:10:32::/102 10.200.0.0/22 fd40:10:200::/112]",
155-
fmt.Sprintf("%s", spec.ExcludeSubnets),
156-
)
157-
158-
return nil
159-
},
160-
),
161-
),
162-
)
163-
}
62+
ctest.AssertResource(suite, k8s.NodeAddressFilterOnlyK8s, func(res *network.NodeAddressFilter, asrt *assert.Assertions) {
63+
spec := res.TypedSpec()
64+
65+
asrt.Equal(
66+
"[10.32.0.0/12 fd00:10:32::/102 10.200.0.0/22 fd40:10:200::/112]",
67+
fmt.Sprintf("%s", spec.IncludeSubnets),
68+
)
69+
asrt.Empty(spec.ExcludeSubnets)
70+
})
71+
72+
ctest.AssertResource(suite, k8s.NodeAddressFilterNoK8s, func(res *network.NodeAddressFilter, asrt *assert.Assertions) {
73+
spec := res.TypedSpec()
74+
75+
asrt.Empty(spec.IncludeSubnets)
76+
asrt.Equal(
77+
"[10.32.0.0/12 fd00:10:32::/102 10.200.0.0/22 fd40:10:200::/112]",
78+
fmt.Sprintf("%s", spec.ExcludeSubnets),
79+
)
80+
})
81+
82+
// create NodeStatus with PodCIDRs
83+
nodeName := k8s.NewNodename(k8s.NamespaceName, k8s.NodenameID)
84+
nodeName.TypedSpec().Nodename = "test-node"
85+
suite.Create(nodeName)
16486

165-
func (suite *K8sAddressFilterSuite) TearDownTest() {
166-
suite.T().Log("tear down")
87+
nodeStatus := k8s.NewNodeStatus(k8s.NamespaceName, "test-node")
88+
nodeStatus.TypedSpec().PodCIDRs = []netip.Prefix{
89+
netip.MustParsePrefix("192.168.0.0/24"),
90+
}
91+
suite.Create(nodeStatus)
16792

168-
suite.ctxCancel()
93+
ctest.AssertResource(suite, k8s.NodeAddressFilterOnlyK8s, func(res *network.NodeAddressFilter, asrt *assert.Assertions) {
94+
spec := res.TypedSpec()
95+
96+
asrt.Equal(
97+
"[10.32.0.0/12 fd00:10:32::/102 192.168.0.0/24 10.200.0.0/22 fd40:10:200::/112]",
98+
fmt.Sprintf("%s", spec.IncludeSubnets),
99+
)
100+
asrt.Empty(spec.ExcludeSubnets)
101+
})
169102

170-
suite.wg.Wait()
103+
ctest.AssertResource(suite, k8s.NodeAddressFilterNoK8s, func(res *network.NodeAddressFilter, asrt *assert.Assertions) {
104+
spec := res.TypedSpec()
105+
106+
asrt.Empty(spec.IncludeSubnets)
107+
asrt.Equal(
108+
"[10.32.0.0/12 fd00:10:32::/102 192.168.0.0/24 10.200.0.0/22 fd40:10:200::/112]",
109+
fmt.Sprintf("%s", spec.ExcludeSubnets),
110+
)
111+
})
171112
}
172113

173114
func TestK8sAddressFilterSuite(t *testing.T) {
174115
t.Parallel()
175116

176-
suite.Run(t, new(K8sAddressFilterSuite))
117+
suite.Run(t, &K8sAddressFilterSuite{
118+
DefaultSuite: ctest.DefaultSuite{
119+
Timeout: 10 * time.Second,
120+
AfterSetup: func(suite *ctest.DefaultSuite) {
121+
suite.Require().NoError(suite.Runtime().RegisterController(&k8sctrl.AddressFilterController{}))
122+
},
123+
},
124+
})
177125
}

internal/app/machined/pkg/controllers/k8s/node_status.go

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -7,6 +7,7 @@ package k8s
77
import (
88
"context"
99
"fmt"
10+
"net/netip"
1011

1112
"github.com/cosi-project/runtime/pkg/controller"
1213
"github.com/cosi-project/runtime/pkg/resource"
@@ -179,13 +180,26 @@ func (ctrl *NodeStatusController) Run(ctx context.Context, r controller.Runtime,
179180
}
180181

181182
if node != nil {
183+
podCIDRs := make([]netip.Prefix, 0, len(node.Spec.PodCIDRs))
184+
for _, cidr := range node.Spec.PodCIDRs {
185+
prefix, err := netip.ParsePrefix(cidr)
186+
if err != nil {
187+
logger.Warn("error parsing pod CIDR", zap.String("cidr", cidr), zap.Error(err))
188+
189+
continue
190+
}
191+
192+
podCIDRs = append(podCIDRs, prefix)
193+
}
194+
182195
if err = safe.WriterModify(ctx, r, k8s.NewNodeStatus(k8s.NamespaceName, node.Name),
183196
func(res *k8s.NodeStatus) error {
184197
res.TypedSpec().Nodename = node.Name
185198
res.TypedSpec().Unschedulable = node.Spec.Unschedulable
186199
res.TypedSpec().Labels = node.Labels
187200
res.TypedSpec().Annotations = node.Annotations
188201
res.TypedSpec().NodeReady = false
202+
res.TypedSpec().PodCIDRs = podCIDRs
189203

190204
for _, condition := range node.Status.Conditions {
191205
if condition.Type == v1.NodeReady {

0 commit comments

Comments
 (0)