Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@ import (
"istio.io/istio/pilot/pkg/features"
"istio.io/istio/pilot/pkg/model"
"istio.io/istio/pilot/pkg/serviceregistry/kube/controller/ambient/multicluster"
"istio.io/istio/pilot/pkg/serviceregistry/util/xdsfake"
"istio.io/istio/pkg/cluster"
"istio.io/istio/pkg/config/constants"
"istio.io/istio/pkg/kube/kclient/clienttest"
Expand Down Expand Up @@ -68,55 +67,58 @@ func TestAmbientMulticlusterIndex_WaypointForWorkloadTraffic(t *testing.T) {
cases := []struct {
name string
trafficType string
podAssertion func(s *ambientTestServer)
svcAssertion func(s *ambientTestServer)
podAssertion func(s *ambientTestServer, c cluster.ID)
svcAssertion func(s *ambientTestServer, svcKey string)
}{
{
name: "service traffic",
trafficType: constants.ServiceTraffic,
podAssertion: func(s *ambientTestServer) {
podAssertion: func(s *ambientTestServer, c cluster.ID) {
s.t.Helper()
s.assertNoEvent(s.t)
// Test that there's no events for workloads in our cluster
// as a result of our actions (we can't control pushes for
// workloads from other clusters)
s.assertNoMatchingEvent(s.t, c.String())
},
svcAssertion: func(s *ambientTestServer) {
svcAssertion: func(s *ambientTestServer, _ string) {
s.t.Helper()
s.assertEvent(s.t, s.svcXdsName("svc2"))
},
},
{
name: "all traffic",
trafficType: constants.AllTraffic,
podAssertion: func(s *ambientTestServer) {
podAssertion: func(s *ambientTestServer, _ cluster.ID) {
s.t.Helper()
s.assertEvent(s.t, s.podXdsName("pod1"))
},
svcAssertion: func(s *ambientTestServer) {
svcAssertion: func(s *ambientTestServer, _ string) {
s.t.Helper()
s.assertEvent(s.t, s.svcXdsName("svc2"))
},
},
{
name: "workload traffic",
trafficType: constants.WorkloadTraffic,
podAssertion: func(s *ambientTestServer) {
podAssertion: func(s *ambientTestServer, c cluster.ID) {
s.t.Helper()
s.assertEvent(s.t, s.podXdsName("pod1"))
},
svcAssertion: func(s *ambientTestServer) {
svcAssertion: func(s *ambientTestServer, svcKey string) {
s.t.Helper()
s.assertNoEvent(s.t)
s.assertNoMatchingEvent(s.t, svcKey)
},
},
{
name: "no traffic",
trafficType: constants.NoTraffic,
podAssertion: func(s *ambientTestServer) {
podAssertion: func(s *ambientTestServer, c cluster.ID) {
s.t.Helper()
s.assertNoEvent(s.t)
s.assertNoMatchingEvent(s.t, c.String())
},
svcAssertion: func(s *ambientTestServer) {
svcAssertion: func(s *ambientTestServer, svcKey string) {
s.t.Helper()
s.assertNoEvent(s.t)
s.assertNoMatchingEvent(s.t, svcKey)
},
},
}
Expand Down Expand Up @@ -215,6 +217,23 @@ func TestAmbientMulticlusterIndex_WaypointForWorkloadTraffic(t *testing.T) {
ControllerName: constants.ManagedGatewayMeshController,
},
})

// Ensure the namespace network is set in up in the collection before doing other assertions
assert.EventuallyEqual(t, func() bool {
networks := s.networks.SystemNamespaceNetworkByCluster.Lookup(client.clusterID)
if len(networks) == 0 {
return false
}
singleton := networks[0]
if singleton == nil {
return false
}
nw := singleton.Get()
if nw == nil {
return false
}
return *nw == clusterToNetwork[client.clusterID]
}, true)
}
if networkGatewayIps[client.clusterID] != "" {
s.addNetworkGatewayForClient(t, networkGatewayIps[client.clusterID], clusterToNetwork[client.clusterID], client.grc)
Expand Down Expand Up @@ -268,40 +287,18 @@ func TestAmbientMulticlusterIndex_WaypointForWorkloadTraffic(t *testing.T) {
events = append(events, s.svcXdsName("svc2"))
s.assertEvent(t, events...)

t.Run("xds event filtering", func(t *testing.T) {
// Test that label selector change doesn't cause xDS push
// especially in the context of the merge implementation.
svc2 := s.sc.Get("svc2", testNS)
tmp := svc2.DeepCopy()
tmp.Spec.Selector["foo"] = "bar"
s.sc.Update(tmp)
// The new selector should disqualify pod1 from being a part
// of this service. We should NOT get a service event though
s.fx.StrictMatchOrFail(t, xdsfake.Event{
Type: "xds",
ID: s.podXdsName("pod1"),
})
s.sc.Update(svc2)
// We should get another event from the pod being a part of the
// service again. Again, we should NOT get a service event.
s.fx.StrictMatchOrFail(t, xdsfake.Event{
Type: "xds",
ID: s.podXdsName("pod1"),
})
})

// Label the pod and check that the correct event is produced.
s.labelPod(t, "pod1", testNS,
map[string]string{"app": "a", label.IoIstioUseWaypoint.Name: "test-wp"})
c.podAssertion(s)
c.podAssertion(s, testC)

// Label the service and check that the correct event is produced.
s.labelService(t, "svc2", testNS,
map[string]string{
label.IoIstioUseWaypoint.Name: "test-wp",
"istio.io/global": "true",
})
c.svcAssertion(s)
c.svcAssertion(s, s.svcXdsName("svc2"))

// clean up resources
s.deleteService(t, "svc2")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2658,6 +2658,11 @@ func (s *ambientTestServer) assertUnorderedEvent(t *testing.T, ip ...string) {
s.fx.MatchOrFail(t, ev...)
}

func (s *ambientTestServer) assertNoMatchingEvent(t *testing.T, ip string) {
t.Helper()
s.fx.AssertNoMatch(t, time.Millisecond*10, xdsfake.EventMatcher{Type: "xds", IDPrefix: ip})
}

func (s *ambientTestServer) assertNoEvent(t *testing.T) {
t.Helper()
s.fx.AssertEmpty(t, time.Millisecond*10)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ func MergedGlobalWorkloadsCollection(
return c.ID
},
func(hc krt.HandlerContext) network.ID {
nwPtr := krt.FetchOne(ctx, globalNetworks.RemoteSystemNamespaceNetworks, krt.FilterIndex(globalNetworks.SystemNamespaceNetworkByCluster, c.ID))
nwPtr := krt.FetchOne(hc, globalNetworks.RemoteSystemNamespaceNetworks, krt.FilterIndex(globalNetworks.SystemNamespaceNetworkByCluster, c.ID))
if nwPtr == nil {
log.Warnf("Cluster %s does not have a network, skipping global workloads", c.ID)
hc.DiscardResult()
Expand Down
41 changes: 41 additions & 0 deletions pilot/pkg/serviceregistry/util/xdsfake/updater.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,6 +118,16 @@ type Event struct {
EndpointCount int
}

type EventMatcher struct {
// Type must match exactly
Type string
// A prefix to match the id of incoming events.
IDPrefix string

// A prefix to match the namespace of incoming events.
NamespacePrefix string
}

func (fx *Updater) EDSUpdate(c model.ShardKey, hostname string, ns string, entry []*model.IstioEndpoint) {
select {
case fx.Events <- Event{Type: "eds", ID: hostname, Endpoints: entry, Namespace: ns}:
Expand Down Expand Up @@ -259,3 +269,34 @@ func (fx *Updater) AssertEmpty(t test.Failer, dur time.Duration) {
}
}
}

func (fx *Updater) AssertNoMatch(t test.Failer, dur time.Duration, matchers ...EventMatcher) {
t.Helper()
if dur == 0 {
select {
case e := <-fx.Events:
t.Logf("got event %q/%v", e.Type, e.ID)
for _, m := range matchers {
if e.Type == m.Type &&
(m.IDPrefix != "" && strings.HasPrefix(e.ID, m.IDPrefix)) ||
(m.NamespacePrefix != "" && strings.HasPrefix(e.Namespace, m.NamespacePrefix)) {
t.Fatalf("got unexpected matching event %+v", e)
}
}
default:
}
} else {
select {
case e := <-fx.Events:
t.Logf("got event %q/%v", e.Type, e.ID)
for _, m := range matchers {
if e.Type == m.Type &&
(m.IDPrefix != "" && strings.HasPrefix(e.ID, m.IDPrefix)) ||
(m.NamespacePrefix != "" && strings.HasPrefix(e.Namespace, m.NamespacePrefix)) {
t.Fatalf("got unexpected matching event before timeout %+v", e)
}
}
case <-time.After(dur):
}
}
}
5 changes: 4 additions & 1 deletion pkg/kube/krt/collection.go
Original file line number Diff line number Diff line change
Expand Up @@ -795,8 +795,11 @@ func (i *collectionDependencyTracker[I, O]) registerDependency(
) {
i.d = append(i.d, d)

i.mu.Lock()
existed := i.dependencyState.collectionDependencies.InsertContains(d.id)
i.mu.Unlock()
// For any new collections we depend on, start watching them if its the first time we have watched them.
if !i.dependencyState.collectionDependencies.InsertContains(d.id) {
if !existed {
i.log.WithLabels("collection", d.collectionName).Debugf("register new dependency")
syncer.WaitUntilSynced(i.stop)
register(func(o []Event[any]) {
Expand Down