
Commit 903f7ec

fix: Clear EndpointStatuses for PodMonitoring without endpoints (#1197)
Clear EndpointStatuses when a PodMonitoring does not have any endpoints. This fixes a bug where PodMonitorings without any endpoints kept a stale list of endpoints in their status.
2 parents 7081496 + a342d9b commit 903f7ec
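
As a rough illustration of the behavioral change, the sketch below models the new clearing logic in plain Go. The types (podMonitoring, endpointStatus) and the function reconcileStatuses are simplified stand-ins invented for this note, not the operator's monitoringv1 API; the real change patches statuses through the Kubernetes client, as the diff below shows.

// Illustrative sketch only: in-memory stand-ins for the operator's types.
// Monitorings whose job no longer appears in the scraped endpoint map are
// reset to an empty status instead of keeping their previous entries.
package main

import "fmt"

type endpointStatus struct{ Name string }

type podMonitoring struct {
	Name             string
	EndpointStatuses []endpointStatus
}

// reconcileStatuses mirrors the updated updateTargetStatus: jobs present in
// endpointMap receive fresh statuses, and every monitoring that received none
// is cleared to an empty slice rather than left stale.
func reconcileStatuses(monitorings []*podMonitoring, endpointMap map[string][]endpointStatus) {
	withStatuses := map[string]bool{}
	for _, pm := range monitorings {
		if statuses, ok := endpointMap[pm.Name]; ok {
			pm.EndpointStatuses = statuses
			withStatuses[pm.Name] = true
		}
	}
	for _, pm := range monitorings {
		if !withStatuses[pm.Name] {
			pm.EndpointStatuses = []endpointStatus{} // clear stale entries
		}
	}
}

func main() {
	stale := &podMonitoring{Name: "prom-example-1", EndpointStatuses: []endpointStatus{{Name: "old/metrics"}}}
	fresh := &podMonitoring{Name: "prom-example-2"}
	reconcileStatuses([]*podMonitoring{stale, fresh}, map[string][]endpointStatus{
		"prom-example-2": {{Name: "PodMonitoring/gmp-test/prom-example-2/metrics"}},
	})
	fmt.Println(len(stale.EndpointStatuses), len(fresh.EndpointStatuses)) // 0 1
}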

2 files changed: +172, -57 lines

pkg/operator/target_status.go

Lines changed: 45 additions & 16 deletions
@@ -128,7 +128,27 @@ func setupTargetStatusPoller(op *Operator, registry prometheus.Registerer, httpC
 	return nil
 }
 
-// shouldPoll verifies if polling collectors is configured or necessary.
+// fetchAllPodMonitorings fetches all ClusterPodMonitoring and PodMonitoring CRs deployed in the cluster. This excludes ClusterNodeMonitoring CRs.
+func fetchAllPodMonitorings(ctx context.Context, kubeClient client.Client) ([]monitoringv1.PodMonitoringCRD, error) {
+	var combinedList []monitoringv1.PodMonitoringCRD
+	var podMonitoringList monitoringv1.PodMonitoringList
+	if err := kubeClient.List(ctx, &podMonitoringList); err != nil {
+		return nil, err
+	}
+	for _, pm := range podMonitoringList.Items {
+		combinedList = append(combinedList, &pm)
+	}
+	var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList
+	if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil {
+		return nil, err
+	}
+	for _, pm := range clusterPodMonitoringList.Items {
+		combinedList = append(combinedList, &pm)
+	}
+	return combinedList, nil
+}
+
+// shouldPoll verifies if polling collectors is configured.
 func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kubeClient client.Client) (bool, error) {
 	// Check if target status is enabled.
 	var config monitoringv1.OperatorConfig
@@ -141,19 +161,6 @@ func shouldPoll(ctx context.Context, cfgNamespacedName types.NamespacedName, kub
 	if !config.Features.TargetStatus.Enabled {
 		return false, nil
 	}
-
-	// No need to poll if there's no PodMonitorings.
-	var podMonitoringList monitoringv1.PodMonitoringList
-	if err := kubeClient.List(ctx, &podMonitoringList); err != nil {
-		return false, err
-	} else if len(podMonitoringList.Items) == 0 {
-		var clusterPodMonitoringList monitoringv1.ClusterPodMonitoringList
-		if err := kubeClient.List(ctx, &clusterPodMonitoringList); err != nil {
-			return false, err
-		} else if len(clusterPodMonitoringList.Items) == 0 {
-			return false, nil
-		}
-	}
 	return true, nil
 }
 
@@ -196,12 +203,20 @@ func (r *targetStatusReconciler) Reconcile(ctx context.Context, _ reconcile.Requ
 
 // pollAndUpdate fetches and updates the target status in each collector pod.
 func pollAndUpdate(ctx context.Context, logger logr.Logger, opts Options, httpClient *http.Client, getTarget getTargetFn, kubeClient client.Client) error {
+	allPodMonitorings, err := fetchAllPodMonitorings(ctx, kubeClient)
+	if err != nil {
+		return err
+	}
+	if len(allPodMonitorings) == 0 {
+		// Nothing to update.
+		return nil
+	}
 	targets, err := fetchTargets(ctx, logger, opts, httpClient, getTarget, kubeClient)
 	if err != nil {
 		return err
 	}
 
-	return updateTargetStatus(ctx, logger, kubeClient, targets)
+	return updateTargetStatus(ctx, logger, kubeClient, targets, allPodMonitorings)
 }
 
 // fetchTargets retrieves the Prometheus targets using the given target function
@@ -307,13 +322,14 @@ func patchPodMonitoringStatus(ctx context.Context, kubeClient client.Client, obj
 
 // updateTargetStatus populates the status object of each pod using the given
 // Prometheus targets.
-func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult) error {
+func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient client.Client, targets []*prometheusv1.TargetsResult, podMonitorings []monitoringv1.PodMonitoringCRD) error {
 	endpointMap, err := buildEndpointStatuses(targets)
 	if err != nil {
 		return err
 	}
 
 	var errs []error
+	withStatuses := map[string]bool{}
 	for job, endpointStatuses := range endpointMap {
 		pm, err := getObjectByScrapeJobKey(job)
 		if err != nil {
@@ -324,6 +340,7 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
 			// Skip hard-coded jobs which we do not patch.
 			continue
 		}
+		withStatuses[pm.GetName()] = true
 		pm.GetPodMonitoringStatus().EndpointStatuses = endpointStatuses
 
 		if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
@@ -335,6 +352,18 @@ func updateTargetStatus(ctx context.Context, logger logr.Logger, kubeClient clie
 		}
 	}
 
+	// Any pod monitorings that exist but don't have endpoints should also be updated.
+	for _, pm := range podMonitorings {
+		if _, exists := withStatuses[pm.GetName()]; !exists {
+			pm.GetPodMonitoringStatus().EndpointStatuses = []monitoringv1.ScrapeEndpointStatus{}
+			if err := patchPodMonitoringStatus(ctx, kubeClient, pm, pm.GetPodMonitoringStatus()); err != nil {
+				// Same reasoning as above for error handling.
+				errs = append(errs, err)
+				logger.Error(err, "patching empty status", "pm", pm.GetName(), "gvk", pm.GetObjectKind().GroupVersionKind())
+			}
+		}
+	}
+
 	return errors.Join(errs...)
 }
 
pkg/operator/target_status_test.go

Lines changed: 127 additions & 41 deletions
@@ -45,11 +45,26 @@ import (
 )
 
 type updateTargetStatusTestCase struct {
-	desc                  string
-	targets               []*prometheusv1.TargetsResult
-	podMonitorings        []monitoringv1.PodMonitoring
-	clusterPodMonitorings []monitoringv1.ClusterPodMonitoring
-	expErr                func(err error) bool
+	desc                    string
+	targets                 []*prometheusv1.TargetsResult
+	podMonitorings          []monitoringv1.PodMonitoring
+	initializeStatus        []monitoringv1.PodMonitoringStatus
+	clusterPodMonitorings   []monitoringv1.ClusterPodMonitoring
+	initializeClusterStatus []monitoringv1.PodMonitoringStatus
+	expErr                  func(err error) bool
+}
+
+func (tc *updateTargetStatusTestCase) getPodMonitoringCRDs() []monitoringv1.PodMonitoringCRD {
+	var combinedList []monitoringv1.PodMonitoringCRD
+
+	for _, pm := range tc.podMonitorings {
+		combinedList = append(combinedList, &pm)
+	}
+
+	for _, pm := range tc.clusterPodMonitorings {
+		combinedList = append(combinedList, &pm)
+	}
+	return combinedList
 }
 
 // Given a list of test cases on PodMonitoring, creates a new list containing
@@ -84,6 +99,7 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase
 			}
 			clusterTargets = append(clusterTargets, targetClusterPodMonitoring)
 		}
+
 		for _, pm := range tc.podMonitorings {
 			pmCopy := pm.DeepCopy()
 			cpm := monitoringv1.ClusterPodMonitoring{
@@ -103,26 +119,41 @@ func expand(testCases []updateTargetStatusTestCase) []updateTargetStatusTestCase
 			}
 			clusterPodMonitorings = append(clusterPodMonitorings, cpm)
 		}
+
+		initializeClusterStatus := make([]monitoringv1.PodMonitoringStatus, 0, len(tc.initializeStatus))
+		for _, status := range tc.initializeStatus {
+			statusCopy := status.DeepCopy()
+
+			for idx, status := range statusCopy.EndpointStatuses {
+				statusCopy.EndpointStatuses[idx].Name = podMonitoringScrapePoolToClusterPodMonitoringScrapePool(status.Name)
+			}
+			initializeClusterStatus = append(initializeClusterStatus, *statusCopy)
+		}
+
 		dataPodMonitorings := updateTargetStatusTestCase{
-			desc:           tc.desc + "-pod-monitoring",
-			targets:        tc.targets,
-			podMonitorings: tc.podMonitorings,
-			expErr:         tc.expErr,
+			desc:             tc.desc + "-pod-monitoring",
+			targets:          tc.targets,
+			podMonitorings:   tc.podMonitorings,
+			initializeStatus: tc.initializeStatus,
+			expErr:           tc.expErr,
 		}
 		dataFinal = append(dataFinal, dataPodMonitorings)
 		dataClusterPodMonitorings := updateTargetStatusTestCase{
-			desc:                  tc.desc + "-cluster-pod-monitoring",
-			targets:               clusterTargets,
-			clusterPodMonitorings: clusterPodMonitorings,
-			expErr:                tc.expErr,
+			desc:                    tc.desc + "-cluster-pod-monitoring",
+			targets:                 clusterTargets,
+			clusterPodMonitorings:   clusterPodMonitorings,
+			initializeClusterStatus: initializeClusterStatus,
+			expErr:                  tc.expErr,
 		}
 		prometheusTargetsBoth := append(tc.targets, clusterTargets...)
 		dataBoth := updateTargetStatusTestCase{
-			desc:                  tc.desc + "-both",
-			targets:               prometheusTargetsBoth,
-			podMonitorings:        tc.podMonitorings,
-			clusterPodMonitorings: clusterPodMonitorings,
-			expErr:                tc.expErr,
+			desc:                    tc.desc + "-both",
+			targets:                 prometheusTargetsBoth,
+			podMonitorings:          tc.podMonitorings,
+			initializeStatus:        tc.initializeStatus,
+			clusterPodMonitorings:   clusterPodMonitorings,
+			initializeClusterStatus: initializeClusterStatus,
+			expErr:                  tc.expErr,
 		}
 		dataFinal = append(dataFinal, dataClusterPodMonitorings)
 		dataFinal = append(dataFinal, dataBoth)
@@ -1225,27 +1256,100 @@ func TestUpdateTargetStatus(t *testing.T) {
 				return err.Error() == "unknown scrape kind \"unknown\""
 			},
 		},
+		// No targets, with PodMonitoring config.
+		{
+			desc: "no-targets-no-match",
+			podMonitorings: []monitoringv1.PodMonitoring{
+				{
+					ObjectMeta: metav1.ObjectMeta{Name: "prom-example-1", Namespace: "gmp-test"},
+					Spec:       monitoringv1.PodMonitoringSpec{},
+
+					Status: monitoringv1.PodMonitoringStatus{
+						MonitoringStatus: monitoringv1.MonitoringStatus{
+							ObservedGeneration: 2,
+							Conditions: []monitoringv1.MonitoringCondition{{
+								Type:               monitoringv1.ConfigurationCreateSuccess,
+								Status:             corev1.ConditionTrue,
+								LastUpdateTime:     metav1.Time{},
+								LastTransitionTime: metav1.Time{},
+								Reason:             "",
+								Message:            "",
+							}},
+						},
+					},
+				},
+			},
+			initializeStatus: []monitoringv1.PodMonitoringStatus{
+				{
+					MonitoringStatus: monitoringv1.MonitoringStatus{
+						ObservedGeneration: 2,
+						Conditions: []monitoringv1.MonitoringCondition{{
+							Type:               monitoringv1.ConfigurationCreateSuccess,
+							Status:             corev1.ConditionTrue,
+							LastUpdateTime:     metav1.Time{},
+							LastTransitionTime: metav1.Time{},
+							Reason:             "",
+							Message:            "",
+						}},
+					},
+					EndpointStatuses: []monitoringv1.ScrapeEndpointStatus{
+						{
+							Name:             "PodMonitoring/gmp-test/prom-example-1/metrics",
+							ActiveTargets:    1,
+							UnhealthyTargets: 0,
+							LastUpdateTime:   date,
+							SampleGroups: []monitoringv1.SampleGroup{
+								{
+									SampleTargets: []monitoringv1.SampleTarget{
+										{
+											Health: "up",
+											Labels: map[model.LabelName]model.LabelValue{
+												"instance": "a",
+											},
+											LastScrapeDurationSeconds: "1.2",
+										},
+									},
+									Count: ptr.To(int32(1)),
+								},
+							},
+							CollectorsFraction: "1",
+						},
+					},
+				},
+			},
+		},
 	})
 
 	for _, testCase := range testCases {
 		t.Run(fmt.Sprintf("target-status-conversion-%s", testCase.desc), func(t *testing.T) {
 			clientBuilder := newFakeClientBuilder()
-			for _, podMonitoring := range testCase.podMonitorings {
+			for i, podMonitoring := range testCase.podMonitorings {
 				pmCopy := podMonitoring.DeepCopy()
-				pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
+				if len(testCase.initializeStatus) > 0 {
+					pmCopy.Status = testCase.initializeStatus[i]
+				} else {
+					pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
+				}
 				clientBuilder.WithObjects(pmCopy)
 			}
-			for _, clusterPodMonitoring := range testCase.clusterPodMonitorings {
+			for i, clusterPodMonitoring := range testCase.clusterPodMonitorings {
 				pmCopy := clusterPodMonitoring.DeepCopy()
-				pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
+				if len(testCase.initializeClusterStatus) > 0 {
+					pmCopy.Status = testCase.initializeClusterStatus[i]
+				} else {
+					pmCopy.GetPodMonitoringStatus().EndpointStatuses = nil
+				}
 				clientBuilder.WithObjects(pmCopy)
 			}
 
 			kubeClient := clientBuilder.Build()
 
-			err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets)
+			// fetchTargets(ctx, logger, opts, nil, targetFetchFromMap(prometheusTargetMap), kubeClient)
+			err := updateTargetStatus(context.Background(), testr.New(t), kubeClient, testCase.targets, testCase.getPodMonitoringCRDs())
 			if err != nil && (testCase.expErr == nil || !testCase.expErr(err)) {
 				t.Fatalf("unexpected error updating target status: %s", err)
+			} else if err == nil && (testCase.expErr != nil) {
+				t.Fatalf("expected error missing when updating target status")
 			}
 
 			for _, podMonitoring := range testCase.podMonitorings {
@@ -1602,24 +1706,6 @@ func TestShouldPoll(t *testing.T) {
 			should: false,
 			expErr: true,
 		},
-		{
-			desc: "should not poll targets - no podmonitorings",
-			objs: []client.Object{
-				&monitoringv1.OperatorConfig{
-					ObjectMeta: metav1.ObjectMeta{
-						Name:      "config",
-						Namespace: "gmp-public",
-					},
-					Features: monitoringv1.OperatorFeatures{
-						TargetStatus: monitoringv1.TargetStatusSpec{
-							Enabled: true,
-						},
-					},
-				},
-			},
-			should: false,
-			expErr: false,
-		},
 		{
 			desc: "should not poll targets - disabled",
 			objs: []client.Object{
