Skip to content

Commit 8195d8c

Browse files
axw authored and mergify-bot committed
libbeat/publisher/pipeline: expand monitoring (#24700)
* libbeat/publisher/pipeline: expand monitoring

Expand monitoring to report:
- pipeline.queue.max_events (queue capacity)
- output.batch_size (e.g. ES bulk_max_size)
- output.clients (e.g. number of ES hosts * workers)

Queue capacity is recorded in stats rather than state so it can be used
along with queue size to calculate saturation.

* Move changelog entry to the right section

(cherry picked from commit c24fb8e)
1 parent 5eb0344 commit 8195d8c

5 files changed

Lines changed: 95 additions & 23 deletions

File tree

CHANGELOG.next.asciidoc

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -358,6 +358,7 @@ https://github.com/elastic/beats/compare/v7.0.0-alpha2...master[Check the HEAD d
358358
- Add option to select the type of index template to load: legacy, component, index. {pull}21212[21212]
359359
- Add `wineventlog` schema to `decode_xml` processor. {issue}23910[23910] {pull}24726[24726]
360360
- Add new ECS 1.9 field `cloud.service.name` to `add_cloud_metadata` processor. {pull}24993[24993]
361+
- Libbeat: report queue capacity, output batch size, and output client count to monitoring. {pull}24700[24700]
361362

362363
*Auditbeat*
363364

libbeat/publisher/pipeline/client_test.go

Lines changed: 54 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -23,10 +23,16 @@ import (
2323
"testing"
2424
"time"
2525

26+
"github.com/stretchr/testify/assert"
27+
"github.com/stretchr/testify/require"
28+
2629
"github.com/elastic/beats/v7/libbeat/beat"
30+
"github.com/elastic/beats/v7/libbeat/common"
2731
"github.com/elastic/beats/v7/libbeat/logp"
32+
"github.com/elastic/beats/v7/libbeat/monitoring"
2833
"github.com/elastic/beats/v7/libbeat/outputs"
2934
"github.com/elastic/beats/v7/libbeat/publisher"
35+
"github.com/elastic/beats/v7/libbeat/publisher/processing"
3036
"github.com/elastic/beats/v7/libbeat/publisher/queue"
3137
"github.com/elastic/beats/v7/libbeat/publisher/queue/memqueue"
3238
"github.com/elastic/beats/v7/libbeat/tests/resources"
@@ -205,3 +211,51 @@ func TestClientWaitClose(t *testing.T) {
205211
}
206212
})
207213
}
214+
215+
func TestMonitoring(t *testing.T) {
216+
const (
217+
maxEvents = 123
218+
batchSize = 456
219+
numClients = 42
220+
)
221+
var config Config
222+
err := common.MustNewConfigFrom(map[string]interface{}{
223+
"queue.mem.events": maxEvents,
224+
"queue.mem.flush.min_events": 1,
225+
}).Unpack(&config)
226+
require.NoError(t, err)
227+
228+
metrics := monitoring.NewRegistry()
229+
telemetry := monitoring.NewRegistry()
230+
pipeline, err := Load(
231+
beat.Info{},
232+
Monitors{
233+
Metrics: metrics,
234+
Telemetry: telemetry,
235+
},
236+
config,
237+
processing.Supporter(nil),
238+
func(outputs.Observer) (string, outputs.Group, error) {
239+
clients := make([]outputs.Client, numClients)
240+
for i := range clients {
241+
clients[i] = newMockClient(func(publisher.Batch) error {
242+
return nil
243+
})
244+
}
245+
return "output_name", outputs.Group{
246+
BatchSize: batchSize,
247+
Clients: clients,
248+
}, nil
249+
},
250+
)
251+
require.NoError(t, err)
252+
defer pipeline.Close()
253+
254+
metricsSnapshot := monitoring.CollectFlatSnapshot(metrics, monitoring.Full, true)
255+
assert.Equal(t, int64(maxEvents), metricsSnapshot.Ints["pipeline.queue.max_events"])
256+
257+
telemetrySnapshot := monitoring.CollectFlatSnapshot(telemetry, monitoring.Full, true)
258+
assert.Equal(t, "output_name", telemetrySnapshot.Strings["output.name"])
259+
assert.Equal(t, int64(batchSize), telemetrySnapshot.Ints["output.batch_size"])
260+
assert.Equal(t, int64(numClients), telemetrySnapshot.Ints["output.clients"])
261+
}

libbeat/publisher/pipeline/module.go

Lines changed: 2 additions & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -161,6 +161,8 @@ func loadOutput(
161161
telemetry = monitors.Telemetry.NewRegistry("output")
162162
}
163163
monitoring.NewString(telemetry, "name").Set(outName)
164+
monitoring.NewInt(telemetry, "batch_size").Set(int64(out.BatchSize))
165+
monitoring.NewInt(telemetry, "clients").Set(int64(len(out.Clients)))
164166
}
165167

166168
return out, nil

libbeat/publisher/pipeline/monitoring.go

Lines changed: 37 additions & 23 deletions
Original file line number | Diff line number | Diff line change
@@ -43,6 +43,7 @@ type clientObserver interface {
4343

4444
type queueObserver interface {
4545
queueACKed(n int)
46+
queueMaxEvents(n int)
4647
}
4748

4849
type outputObserver interface {
@@ -62,7 +63,10 @@ type outputObserver interface {
6263
// event-handlers only (e.g. the client centric events callbacks)
6364
type metricsObserver struct {
6465
metrics *monitoring.Registry
66+
vars metricsObserverVars
67+
}
6568

69+
type metricsObserverVars struct {
6670
// clients metrics
6771
clients *monitoring.Uint
6872

@@ -72,7 +76,8 @@ type metricsObserver struct {
7276
activeEvents *monitoring.Uint
7377

7478
// queue metrics
75-
ackedQueue *monitoring.Uint
79+
queueACKed *monitoring.Uint
80+
queueMaxEvents *monitoring.Uint
7681
}
7782

7883
func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
@@ -83,18 +88,21 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
8388

8489
return &metricsObserver{
8590
metrics: metrics,
86-
clients: monitoring.NewUint(reg, "clients"),
91+
vars: metricsObserverVars{
92+
clients: monitoring.NewUint(reg, "clients"),
8793

88-
events: monitoring.NewUint(reg, "events.total"),
89-
filtered: monitoring.NewUint(reg, "events.filtered"),
90-
published: monitoring.NewUint(reg, "events.published"),
91-
failed: monitoring.NewUint(reg, "events.failed"),
92-
dropped: monitoring.NewUint(reg, "events.dropped"),
93-
retry: monitoring.NewUint(reg, "events.retry"),
94+
events: monitoring.NewUint(reg, "events.total"),
95+
filtered: monitoring.NewUint(reg, "events.filtered"),
96+
published: monitoring.NewUint(reg, "events.published"),
97+
failed: monitoring.NewUint(reg, "events.failed"),
98+
dropped: monitoring.NewUint(reg, "events.dropped"),
99+
retry: monitoring.NewUint(reg, "events.retry"),
94100

95-
ackedQueue: monitoring.NewUint(reg, "queue.acked"),
101+
queueACKed: monitoring.NewUint(reg, "queue.acked"),
102+
queueMaxEvents: monitoring.NewUint(reg, "queue.max_events"),
96103

97-
activeEvents: monitoring.NewUint(reg, "events.active"),
104+
activeEvents: monitoring.NewUint(reg, "events.active"),
105+
},
98106
}
99107
}
100108

@@ -109,39 +117,39 @@ func (o *metricsObserver) cleanup() {
109117
//
110118

111119
// (pipeline) pipeline did finish creating a new client instance
112-
func (o *metricsObserver) clientConnected() { o.clients.Inc() }
120+
func (o *metricsObserver) clientConnected() { o.vars.clients.Inc() }
113121

114122
// (client) close being called on client
115123
func (o *metricsObserver) clientClosing() {}
116124

117125
// (client) client finished processing close
118-
func (o *metricsObserver) clientClosed() { o.clients.Dec() }
126+
func (o *metricsObserver) clientClosed() { o.vars.clients.Dec() }
119127

120128
//
121129
// client publish events
122130
//
123131

124132
// (client) client is trying to publish a new event
125133
func (o *metricsObserver) newEvent() {
126-
o.events.Inc()
127-
o.activeEvents.Inc()
134+
o.vars.events.Inc()
135+
o.vars.activeEvents.Inc()
128136
}
129137

130138
// (client) event is filtered out (on purpose or failed)
131139
func (o *metricsObserver) filteredEvent() {
132-
o.filtered.Inc()
133-
o.activeEvents.Dec()
140+
o.vars.filtered.Inc()
141+
o.vars.activeEvents.Dec()
134142
}
135143

136144
// (client) managed to push an event into the publisher pipeline
137145
func (o *metricsObserver) publishedEvent() {
138-
o.published.Inc()
146+
o.vars.published.Inc()
139147
}
140148

141149
// (client) client closing down or DropIfFull is set
142150
func (o *metricsObserver) failedPublishEvent() {
143-
o.failed.Inc()
144-
o.activeEvents.Dec()
151+
o.vars.failed.Inc()
152+
o.vars.activeEvents.Dec()
145153
}
146154

147155
//
@@ -150,8 +158,13 @@ func (o *metricsObserver) failedPublishEvent() {
150158

151159
// (queue) number of events ACKed by the queue/broker in use
152160
func (o *metricsObserver) queueACKed(n int) {
153-
o.ackedQueue.Add(uint64(n))
154-
o.activeEvents.Sub(uint64(n))
161+
o.vars.queueACKed.Add(uint64(n))
162+
o.vars.activeEvents.Sub(uint64(n))
163+
}
164+
165+
// (queue) maximum queue event capacity
166+
func (o *metricsObserver) queueMaxEvents(n int) {
167+
o.vars.queueMaxEvents.Set(uint64(n))
155168
}
156169

157170
//
@@ -166,12 +179,12 @@ func (o *metricsObserver) eventsFailed(int) {}
166179

167180
// (retryer) number of events dropped by retryer
168181
func (o *metricsObserver) eventsDropped(n int) {
169-
o.dropped.Add(uint64(n))
182+
o.vars.dropped.Add(uint64(n))
170183
}
171184

172185
// (retryer) number of events pushed to the output worker queue
173186
func (o *metricsObserver) eventsRetry(n int) {
174-
o.retry.Add(uint64(n))
187+
o.vars.retry.Add(uint64(n))
175188
}
176189

177190
// (output) number of events to be forwarded to the output client
@@ -193,6 +206,7 @@ func (*emptyObserver) filteredEvent() {}
193206
func (*emptyObserver) publishedEvent() {}
194207
func (*emptyObserver) failedPublishEvent() {}
195208
func (*emptyObserver) queueACKed(n int) {}
209+
func (*emptyObserver) queueMaxEvents(int) {}
196210
func (*emptyObserver) updateOutputGroup() {}
197211
func (*emptyObserver) eventsFailed(int) {}
198212
func (*emptyObserver) eventsDropped(int) {}

libbeat/publisher/pipeline/pipeline.go

Lines changed: 1 addition & 0 deletions
Original file line number | Diff line number | Diff line change
@@ -184,6 +184,7 @@ func New(
184184
// Only active if pipeline can drop events.
185185
maxEvents = 64000
186186
}
187+
p.observer.queueMaxEvents(maxEvents)
187188
p.eventSema = newSema(maxEvents)
188189

189190
p.output = newOutputController(beat, monitors, p.observer, p.queue)

0 commit comments

Comments (0)