@@ -43,6 +43,7 @@ type clientObserver interface {
4343
4444type queueObserver interface {
4545 queueACKed (n int )
46+ queueMaxEvents (n int )
4647}
4748
4849type outputObserver interface {
@@ -62,7 +63,10 @@ type outputObserver interface {
6263// event-handlers only (e.g. the client centric events callbacks)
6364type metricsObserver struct {
6465 metrics * monitoring.Registry
66+ vars metricsObserverVars
67+ }
6568
69+ type metricsObserverVars struct {
6670 // clients metrics
6771 clients * monitoring.Uint
6872
@@ -72,7 +76,8 @@ type metricsObserver struct {
7276 activeEvents * monitoring.Uint
7377
7478 // queue metrics
75- ackedQueue * monitoring.Uint
79+ queueACKed * monitoring.Uint
80+ queueMaxEvents * monitoring.Uint
7681}
7782
7883func newMetricsObserver (metrics * monitoring.Registry ) * metricsObserver {
@@ -83,18 +88,21 @@ func newMetricsObserver(metrics *monitoring.Registry) *metricsObserver {
8388
8489 return & metricsObserver {
8590 metrics : metrics ,
86- clients : monitoring .NewUint (reg , "clients" ),
91+ vars : metricsObserverVars {
92+ clients : monitoring .NewUint (reg , "clients" ),
8793
88- events : monitoring .NewUint (reg , "events.total" ),
89- filtered : monitoring .NewUint (reg , "events.filtered" ),
90- published : monitoring .NewUint (reg , "events.published" ),
91- failed : monitoring .NewUint (reg , "events.failed" ),
92- dropped : monitoring .NewUint (reg , "events.dropped" ),
93- retry : monitoring .NewUint (reg , "events.retry" ),
94+ events : monitoring .NewUint (reg , "events.total" ),
95+ filtered : monitoring .NewUint (reg , "events.filtered" ),
96+ published : monitoring .NewUint (reg , "events.published" ),
97+ failed : monitoring .NewUint (reg , "events.failed" ),
98+ dropped : monitoring .NewUint (reg , "events.dropped" ),
99+ retry : monitoring .NewUint (reg , "events.retry" ),
94100
95- ackedQueue : monitoring .NewUint (reg , "queue.acked" ),
101+ queueACKed : monitoring .NewUint (reg , "queue.acked" ),
102+ queueMaxEvents : monitoring .NewUint (reg , "queue.max_events" ),
96103
97- activeEvents : monitoring .NewUint (reg , "events.active" ),
104+ activeEvents : monitoring .NewUint (reg , "events.active" ),
105+ },
98106 }
99107}
100108
@@ -109,39 +117,39 @@ func (o *metricsObserver) cleanup() {
109117//
110118
111119// (pipeline) pipeline did finish creating a new client instance
112- func (o * metricsObserver ) clientConnected () { o .clients .Inc () }
120+ func (o * metricsObserver ) clientConnected () { o .vars . clients .Inc () }
113121
114122// (client) close being called on client
115123func (o * metricsObserver ) clientClosing () {}
116124
117125// (client) client finished processing close
118- func (o * metricsObserver ) clientClosed () { o .clients .Dec () }
126+ func (o * metricsObserver ) clientClosed () { o .vars . clients .Dec () }
119127
120128//
121129// client publish events
122130//
123131
124132// (client) client is trying to publish a new event
125133func (o * metricsObserver ) newEvent () {
126- o .events .Inc ()
127- o .activeEvents .Inc ()
134+ o .vars . events .Inc ()
135+ o .vars . activeEvents .Inc ()
128136}
129137
130138// (client) event is filtered out (on purpose or failed)
131139func (o * metricsObserver ) filteredEvent () {
132- o .filtered .Inc ()
133- o .activeEvents .Dec ()
140+ o .vars . filtered .Inc ()
141+ o .vars . activeEvents .Dec ()
134142}
135143
136144// (client) managed to push an event into the publisher pipeline
137145func (o * metricsObserver ) publishedEvent () {
138- o .published .Inc ()
146+ o .vars . published .Inc ()
139147}
140148
141149// (client) client closing down or DropIfFull is set
142150func (o * metricsObserver ) failedPublishEvent () {
143- o .failed .Inc ()
144- o .activeEvents .Dec ()
151+ o .vars . failed .Inc ()
152+ o .vars . activeEvents .Dec ()
145153}
146154
147155//
@@ -150,8 +158,13 @@ func (o *metricsObserver) failedPublishEvent() {
150158
151159// (queue) number of events ACKed by the queue/broker in use
152160func (o * metricsObserver ) queueACKed (n int ) {
153- o .ackedQueue .Add (uint64 (n ))
154- o .activeEvents .Sub (uint64 (n ))
161+ o .vars .queueACKed .Add (uint64 (n ))
162+ o .vars .activeEvents .Sub (uint64 (n ))
163+ }
164+
165+ // (queue) maximum queue event capacity
166+ func (o * metricsObserver ) queueMaxEvents (n int ) {
167+ o .vars .queueMaxEvents .Set (uint64 (n ))
155168}
156169
157170//
@@ -166,12 +179,12 @@ func (o *metricsObserver) eventsFailed(int) {}
166179
167180// (retryer) number of events dropped by retryer
168181func (o * metricsObserver ) eventsDropped (n int ) {
169- o .dropped .Add (uint64 (n ))
182+ o .vars . dropped .Add (uint64 (n ))
170183}
171184
172185// (retryer) number of events pushed to the output worker queue
173186func (o * metricsObserver ) eventsRetry (n int ) {
174- o .retry .Add (uint64 (n ))
187+ o .vars . retry .Add (uint64 (n ))
175188}
176189
177190// (output) number of events to be forwarded to the output client
@@ -193,6 +206,7 @@ func (*emptyObserver) filteredEvent() {}
193206func (* emptyObserver ) publishedEvent () {}
194207func (* emptyObserver ) failedPublishEvent () {}
195208func (* emptyObserver ) queueACKed (n int ) {}
209+ func (* emptyObserver ) queueMaxEvents (int ) {}
196210func (* emptyObserver ) updateOutputGroup () {}
197211func (* emptyObserver ) eventsFailed (int ) {}
198212func (* emptyObserver ) eventsDropped (int ) {}
0 commit comments