Skip to content

Commit 6a96074

Browse files
committed
Add namespace filtering to service sync and MCS API service exports
Add global namespace filtering support to: - operator/watchers/service_sync.go: Filter services based on namespace global status before syncing to kvstore - pkg/clustermesh/mcsapi/serviceexportsync.go: Filter MCS API service exports based on namespace global status - pkg/clustermesh/mcsapi/serviceexportsync_test.go: Add unit tests for namespace filtering functionality
1 parent aea05a6 commit 6a96074

4 files changed

Lines changed: 377 additions & 15 deletions

File tree

clustermesh-apiserver/clustermesh/namespacers.go

Lines changed: 1 addition & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -6,7 +6,6 @@ package clustermesh
66
import (
77
"k8s.io/apimachinery/pkg/runtime"
88

9-
cmk8s "github.com/cilium/cilium/clustermesh-apiserver/clustermesh/k8s"
109
cilium_api_v2 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2"
1110
cilium_api_v2a1 "github.com/cilium/cilium/pkg/k8s/apis/cilium.io/v2alpha1"
1211
"github.com/cilium/cilium/pkg/k8s/resource"
@@ -26,7 +25,7 @@ func (gn *GenericNamespacer[T]) ExtractNamespace(event resource.Event[T]) (names
2625
func newCiliumIdentityNamespacer() Namespacer[*cilium_api_v2.CiliumIdentity] {
2726
return &GenericNamespacer[*cilium_api_v2.CiliumIdentity]{
2827
extract: func(obj *cilium_api_v2.CiliumIdentity) string {
29-
return obj.SecurityLabels[cmk8s.PodPrefixLbl]
28+
return obj.SecurityLabels[podPrefixLbl]
3029
},
3130
}
3231
}

operator/watchers/service_sync.go

Lines changed: 107 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -12,9 +12,11 @@ import (
1212
"github.com/cilium/hive/cell"
1313
"github.com/cilium/hive/job"
1414
v1 "k8s.io/api/core/v1"
15+
"k8s.io/client-go/tools/cache"
1516

1617
operatorK8s "github.com/cilium/cilium/operator/k8s"
1718
"github.com/cilium/cilium/pkg/annotation"
19+
cmnamespace "github.com/cilium/cilium/pkg/clustermesh/namespace"
1820
serviceStore "github.com/cilium/cilium/pkg/clustermesh/store"
1921
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
2022
"github.com/cilium/cilium/pkg/k8s"
@@ -64,6 +66,13 @@ type ServiceSyncParams struct {
6466
Endpoints resource.Resource[*k8s.Endpoints]
6567
StoreFactory store.Factory
6668
ClusterServiceConverter ClusterServiceConverter
69+
70+
// NamespaceManager is used to determine if a namespace is global.
71+
// Optional - if not provided, namespace filtering is disabled.
72+
NamespaceManager cmnamespace.Manager `optional:"true"`
73+
// Namespaces is the resource for watching namespace events.
74+
// Optional - if not provided, namespace filtering is disabled.
75+
Namespaces resource.Resource[*slim_corev1.Namespace] `optional:"true"`
6776
}
6877

6978
type serviceSync struct {
@@ -121,6 +130,33 @@ func (s *serviceSync) loop(ctx context.Context, health cell.Health) error {
121130
serviceEvents := s.Services.Events(ctx)
122131
endpointEvents := s.Endpoints.Events(ctx)
123132

133+
// Setup namespace events channel if namespace filtering is enabled
134+
var namespaceEvents <-chan resource.Event[*slim_corev1.Namespace]
135+
namespaceFilteringEnabled := s.NamespaceManager != nil && s.Namespaces != nil
136+
if namespaceFilteringEnabled {
137+
s.Log.Info("Namespace filtering is enabled for service sync")
138+
namespaceEvents = s.Namespaces.Events(ctx)
139+
} else {
140+
s.Log.Info("Namespace filtering is disabled for service sync")
141+
}
142+
143+
// isNamespaceGlobal checks if the namespace is global. If namespace filtering
144+
// is disabled, all namespaces are considered global.
145+
isNamespaceGlobal := func(namespace string) bool {
146+
if !namespaceFilteringEnabled {
147+
return true
148+
}
149+
isGlobal, err := s.NamespaceManager.IsGlobalNamespaceByName(namespace)
150+
if err != nil {
151+
s.Log.Warn("Failed to determine if namespace is global, assuming not global",
152+
logfields.Error, err,
153+
logfields.K8sNamespace, namespace,
154+
)
155+
return false
156+
}
157+
return isGlobal
158+
}
159+
124160
upsert := func(cs *serviceStore.ClusterService) {
125161
if err := s.store.UpsertKey(ctx, cs); err != nil {
126162
// An error is triggered only in case it concerns service marshaling,
@@ -133,6 +169,17 @@ func (s *serviceSync) loop(ctx context.Context, health cell.Health) error {
133169
}
134170
}
135171

172+
// syncService handles upserting or deleting a service based on namespace global status
173+
syncService := func(svc *slim_corev1.Service, forceDelete bool) {
174+
cs, toUpsert := converter.Convert(svc, getEndpoints)
175+
// Only upsert if namespace is global, otherwise delete
176+
if toUpsert && !forceDelete && isNamespaceGlobal(svc.Namespace) {
177+
upsert(cs)
178+
} else {
179+
s.store.DeleteKey(ctx, cs)
180+
}
181+
}
182+
136183
for serviceEvents != nil || endpointEvents != nil {
137184
select {
138185
case ev, ok := <-serviceEvents:
@@ -152,12 +199,17 @@ func (s *serviceSync) loop(ctx context.Context, health cell.Health) error {
152199
}
153200
case resource.Upsert:
154201
svc := ev.Object
155-
cs, toUpsert := converter.Convert(svc, getEndpoints)
156-
if toUpsert {
157-
upsert(cs)
158-
} else {
159-
s.store.DeleteKey(ctx, cs)
202+
// Check namespace global status before syncing
203+
if !isNamespaceGlobal(svc.Namespace) {
204+
s.Log.Debug("Skipping service sync for non-global namespace",
205+
logfields.K8sSvcName, svc.Name,
206+
logfields.K8sNamespace, svc.Namespace,
207+
)
208+
// Delete the service from kvstore if it exists (namespace may have become non-global)
209+
s.store.DeleteKey(ctx, converter.ForDeletion(svc))
210+
continue
160211
}
212+
syncService(svc, false)
161213
case resource.Delete:
162214
s.store.DeleteKey(ctx, converter.ForDeletion(ev.Object))
163215
}
@@ -182,9 +234,59 @@ func (s *serviceSync) loop(ctx context.Context, health cell.Health) error {
182234
continue
183235
}
184236

237+
// Check namespace global status before syncing
238+
if !isNamespaceGlobal(svc.Namespace) {
239+
continue
240+
}
241+
185242
if cs, toUpsert := converter.Convert(svc, getEndpoints); toUpsert {
186243
upsert(cs)
187244
}
245+
246+
case ev, ok := <-namespaceEvents:
247+
if !ok {
248+
s.Log.Info("Namespace event channel closed, ignoring future namespace events")
249+
namespaceEvents = nil
250+
continue
251+
}
252+
ev.Done(nil)
253+
254+
if ev.Kind == resource.Sync {
255+
continue
256+
}
257+
258+
// Handle namespace changes - resync all services in this namespace
259+
if ev.Object == nil {
260+
continue
261+
}
262+
263+
nsName := ev.Key.Name
264+
isGlobal := s.NamespaceManager.IsGlobalNamespaceByObject(ev.Object)
265+
266+
s.Log.Info("Namespace global status changed, resyncing services",
267+
logfields.K8sNamespace, nsName,
268+
"isGlobal", isGlobal,
269+
)
270+
271+
// Get all services in this namespace and resync them
272+
svcs, err := services.ByIndex(cache.NamespaceIndex, nsName)
273+
if err != nil {
274+
s.Log.Warn("Failed to list services for namespace update",
275+
logfields.Error, err,
276+
logfields.K8sNamespace, nsName,
277+
)
278+
continue
279+
}
280+
281+
for _, svc := range svcs {
282+
if ev.Kind == resource.Delete || !isGlobal {
283+
// Namespace deleted or no longer global - delete all services
284+
s.store.DeleteKey(ctx, converter.ForDeletion(svc))
285+
} else {
286+
// Namespace became global - upsert services
287+
syncService(svc, false)
288+
}
289+
}
188290
}
189291
}
190292

pkg/clustermesh/mcsapi/serviceexportsync.go

Lines changed: 105 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -10,11 +10,13 @@ import (
1010
"github.com/cilium/hive/cell"
1111
"github.com/cilium/hive/job"
1212
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
13+
"k8s.io/client-go/tools/cache"
1314
"k8s.io/client-go/util/workqueue"
1415
mcsapiv1alpha1 "sigs.k8s.io/mcs-api/pkg/apis/v1alpha1"
1516

1617
"github.com/cilium/cilium/pkg/clustermesh/mcsapi/types"
1718
mcsapitypes "github.com/cilium/cilium/pkg/clustermesh/mcsapi/types"
19+
cmnamespace "github.com/cilium/cilium/pkg/clustermesh/namespace"
1820
cmtypes "github.com/cilium/cilium/pkg/clustermesh/types"
1921
"github.com/cilium/cilium/pkg/k8s/client"
2022
"github.com/cilium/cilium/pkg/k8s/resource"
@@ -57,6 +59,13 @@ type ServiceExportSyncParameters struct {
5759
Services resource.Resource[*slim_corev1.Service]
5860

5961
SyncCallback ServiceExportSyncCallback `optional:"true"`
62+
63+
// NamespaceManager is used to determine if a namespace is global.
64+
// Optional - if not provided, namespace filtering is disabled.
65+
NamespaceManager cmnamespace.Manager `optional:"true"`
66+
// Namespaces is the resource for watching namespace events.
67+
// Optional - if not provided, namespace filtering is disabled.
68+
Namespaces resource.Resource[*slim_corev1.Namespace] `optional:"true"`
6069
}
6170

6271
func registerServiceExportSync(jg job.Group, cfg ServiceExportSyncParameters) {
@@ -85,6 +94,9 @@ func registerServiceExportSync(jg job.Group, cfg ServiceExportSyncParameters) {
8594

8695
store: store,
8796
syncCallback: cfg.SyncCallback,
97+
98+
namespaceManager: cfg.NamespaceManager,
99+
namespaces: cfg.Namespaces,
88100
}).loop(ctx)
89101
return nil
90102
},
@@ -110,6 +122,10 @@ type serviceExportSync struct {
110122

111123
store store.SyncStore
112124
syncCallback ServiceExportSyncCallback
125+
126+
// Namespace filtering support
127+
namespaceManager cmnamespace.Manager
128+
namespaces resource.Resource[*slim_corev1.Namespace]
113129
}
114130

115131
func (s *serviceExportSync) loop(ctx context.Context) {
@@ -151,6 +167,47 @@ func (s *serviceExportSync) loop(ctx context.Context) {
151167
return
152168
}
153169

170+
// Setup namespace events channel if namespace filtering is enabled
171+
var namespaceEvents <-chan resource.Event[*slim_corev1.Namespace]
172+
namespaceFilteringEnabled := s.namespaceManager != nil && s.namespaces != nil
173+
if namespaceFilteringEnabled {
174+
s.logger.Info("Namespace filtering is enabled for service export sync")
175+
namespaceEvents = s.namespaces.Events(ctx)
176+
} else {
177+
s.logger.Info("Namespace filtering is disabled for service export sync")
178+
}
179+
180+
// isNamespaceGlobal checks if the namespace is global. If namespace filtering
181+
// is disabled, all namespaces are considered global.
182+
isNamespaceGlobal := func(namespace string) bool {
183+
if !namespaceFilteringEnabled {
184+
return true
185+
}
186+
isGlobal, err := s.namespaceManager.IsGlobalNamespaceByName(namespace)
187+
if err != nil {
188+
s.logger.Warn("Failed to determine if namespace is global, assuming not global",
189+
logfields.Error, err,
190+
logfields.K8sNamespace, namespace,
191+
)
192+
return false
193+
}
194+
return isGlobal
195+
}
196+
197+
// syncServiceExport syncs a service export based on namespace global status
198+
syncServiceExport := func(key resource.Key) error {
199+
// Check namespace global status before syncing
200+
if !isNamespaceGlobal(key.Namespace) {
201+
s.logger.Debug("Skipping service export sync for non-global namespace",
202+
logfields.K8sSvcName, key.Name,
203+
logfields.K8sNamespace, key.Namespace,
204+
)
205+
// Delete the service export from kvstore if it exists (namespace may have become non-global)
206+
return s.store.DeleteKey(ctx, types.NewEmptyMCSAPIServiceSpec(s.clusterName, key.Namespace, key.Name))
207+
}
208+
return s.syncMCSAPIServiceSpec(ctx, serviceStore, serviceExportStore, key)
209+
}
210+
154211
servicesSynced, serviceExportsSynced := false, false
155212
for serviceEvents != nil || serviceExportsEvents != nil {
156213
select {
@@ -169,8 +226,7 @@ func (s *serviceExportSync) loop(ctx context.Context) {
169226
continue
170227
}
171228

172-
ev.Done(s.syncMCSAPIServiceSpec(ctx,
173-
serviceStore, serviceExportStore, ev.Key))
229+
ev.Done(syncServiceExport(ev.Key))
174230

175231
case ev, ok := <-serviceExportsEvents:
176232
if !ok {
@@ -187,8 +243,53 @@ func (s *serviceExportSync) loop(ctx context.Context) {
187243
continue
188244
}
189245

190-
ev.Done(s.syncMCSAPIServiceSpec(ctx,
191-
serviceStore, serviceExportStore, ev.Key))
246+
ev.Done(syncServiceExport(ev.Key))
247+
248+
case ev, ok := <-namespaceEvents:
249+
if !ok {
250+
s.logger.Info("Namespace event channel closed, ignoring future namespace events")
251+
namespaceEvents = nil
252+
continue
253+
}
254+
ev.Done(nil)
255+
256+
if ev.Kind == resource.Sync {
257+
continue
258+
}
259+
260+
// Handle namespace changes - resync all service exports in this namespace
261+
if ev.Object == nil {
262+
continue
263+
}
264+
265+
nsName := ev.Key.Name
266+
isGlobal := s.namespaceManager.IsGlobalNamespaceByObject(ev.Object)
267+
268+
s.logger.Info("Namespace global status changed, resyncing service exports",
269+
logfields.K8sNamespace, nsName,
270+
"isGlobal", isGlobal,
271+
)
272+
273+
// Get all service exports in this namespace and resync them
274+
svcExports, err := serviceExportStore.ByIndex(cache.NamespaceIndex, nsName)
275+
if err != nil {
276+
s.logger.Warn("Failed to list service exports for namespace update",
277+
logfields.Error, err,
278+
logfields.K8sNamespace, nsName,
279+
)
280+
continue
281+
}
282+
283+
for _, svcExport := range svcExports {
284+
key := resource.Key{Namespace: svcExport.Namespace, Name: svcExport.Name}
285+
if ev.Kind == resource.Delete || !isGlobal {
286+
// Namespace deleted or no longer global - delete all service exports
287+
s.store.DeleteKey(ctx, types.NewEmptyMCSAPIServiceSpec(s.clusterName, key.Namespace, key.Name))
288+
} else {
289+
// Namespace became global - upsert service exports
290+
s.syncMCSAPIServiceSpec(ctx, serviceStore, serviceExportStore, key)
291+
}
292+
}
192293
}
193294
}
194295
}

0 commit comments

Comments
 (0)