Skip to content

Commit fdc363f

Browse files
committed
Print pub_min, pub_max, e2e_min, e2e_max
1 parent e37ebc8 commit fdc363f

11 files changed

Lines changed: 104 additions & 17 deletions

File tree

pkg/amqp091/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -182,7 +182,7 @@ func (c *Amqp091Consumer) Start(consumerReady chan bool) {
182182
payload := msg.Body
183183
priority := int(msg.Priority)
184184
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
185-
metrics.EndToEndLatency.UpdateDuration(timeSent)
185+
metrics.RecordEndToEndLatency(latency)
186186

187187
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
188188
metrics.MessagesConsumedOutOfOrderMetric(priority).Inc()

pkg/amqp091/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -181,7 +181,7 @@ func (p *Amqp091Publisher) handleConfirms() {
181181
if confirm.Ack {
182182
pubTime := p.getPublishTime(confirm.DeliveryTag)
183183
latency := time.Since(pubTime)
184-
metrics.PublishingLatency.Update(latency.Seconds())
184+
metrics.RecordPublishingLatency(latency)
185185
metrics.MessagesConfirmed.Inc()
186186
log.Debug("message confirmed", "id", p.Id, "delivery_tag", confirm.DeliveryTag, "latency", latency)
187187
} else {

pkg/amqp10/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -178,7 +178,7 @@ func (c *Amqp10Consumer) Start(consumerReady chan bool) {
178178
payload := msg.GetData()
179179
priority := int(msg.Header.Priority)
180180
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
181-
metrics.EndToEndLatency.UpdateDuration(timeSent)
181+
metrics.RecordEndToEndLatency(latency)
182182

183183
// Check for delayed message accuracy
184184
if msg.Annotations != nil {

pkg/amqp10/publisher.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -232,7 +232,7 @@ func (p *Amqp10Publisher) publishSettled() string {
232232
}
233233
metrics.MessagesPublished.Inc()
234234
metrics.MessagesConfirmed.Inc()
235-
metrics.PublishingLatency.Update(latency.Seconds())
235+
metrics.RecordPublishingLatency(latency)
236236
}
237237
}
238238
}
@@ -390,7 +390,7 @@ func (p *Amqp10Publisher) handleSettlement(s amqp.Settlement, ptMu *sync.Mutex,
390390
case *amqp.StateAccepted:
391391
metrics.MessagesPublished.Inc()
392392
metrics.MessagesConfirmed.Inc()
393-
metrics.PublishingLatency.Update(latency.Seconds())
393+
metrics.RecordPublishingLatency(latency)
394394
case *amqp.StateModified:
395395
log.Debug("server requires modifications to accept this message", "state", stateType)
396396
case *amqp.StateReceived:

pkg/metrics/metrics.go

Lines changed: 93 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -7,12 +7,14 @@ import (
77
"fmt"
88
"io"
99
"maps"
10+
"math"
1011
"net/http"
1112
"os"
1213
"runtime"
1314
"strconv"
1415
"strings"
1516
"sync"
17+
"sync/atomic"
1618
"syscall"
1719
"time"
1820

@@ -153,11 +155,62 @@ func MessagesConsumedOutOfOrderMetric(priority int) *vmetrics.Counter {
153155
return vmetrics.GetOrCreateCounter(`omq_messages_consumed_out_of_order` + labelsToString(labels))
154156
}
155157

158+
type latencyTracker struct {
159+
min atomic.Int64
160+
max atomic.Int64
161+
}
162+
163+
func newLatencyTracker() *latencyTracker {
164+
t := &latencyTracker{}
165+
t.min.Store(math.MaxInt64)
166+
return t
167+
}
168+
169+
func (t *latencyTracker) record(latency time.Duration) {
170+
ns := latency.Nanoseconds()
171+
for {
172+
old := t.min.Load()
173+
if ns >= old || t.min.CompareAndSwap(old, ns) {
174+
break
175+
}
176+
}
177+
for {
178+
old := t.max.Load()
179+
if ns <= old || t.max.CompareAndSwap(old, ns) {
180+
break
181+
}
182+
}
183+
}
184+
185+
func (t *latencyTracker) reset() (min, max time.Duration, ok bool) {
186+
minNs := t.min.Swap(math.MaxInt64)
187+
maxNs := t.max.Swap(0)
188+
if minNs == math.MaxInt64 {
189+
return 0, 0, false
190+
}
191+
return time.Duration(minNs), time.Duration(maxNs), true
192+
}
193+
156194
var (
157195
previouslyPublished uint64
158196
previouslyConsumed uint64
197+
pubLatencyTracker = newLatencyTracker()
198+
e2eLatencyTracker = newLatencyTracker()
159199
)
160200

201+
func RecordPublishingLatency(latency time.Duration) {
202+
PublishingLatency.Update(latency.Seconds())
203+
pubLatencyTracker.record(latency)
204+
}
205+
206+
func RecordEndToEndLatency(latency time.Duration) {
207+
if latency <= 0 {
208+
return
209+
}
210+
EndToEndLatency.Update(latency.Seconds())
211+
e2eLatencyTracker.record(latency)
212+
}
213+
161214
func (m *MetricsServer) printMessageRates(ctx context.Context) {
162215
go func() {
163216
zeroRateSeconds := 0
@@ -183,16 +236,16 @@ func (m *MetricsServer) printMessageRates(ctx context.Context) {
183236
previouslyPublished = published
184237
previouslyConsumed = consumed
185238

239+
fields := buildRateFields(publishedRate, consumedRate)
240+
186241
if publishedRate == 0 && consumedRate == 0 {
187242
if !paused {
188243
zeroRateSeconds++
189244
if zeroRateSeconds >= 5 {
190245
log.Info("metric printing paused (no activity), will resume when there's something new to print")
191246
paused = true
192247
} else {
193-
log.Print("",
194-
"published", fmt.Sprintf("%v/s", publishedRate),
195-
"consumed", fmt.Sprintf("%v/s", consumedRate))
248+
log.Print("", fields...)
196249
}
197250
}
198251
} else {
@@ -202,15 +255,49 @@ func (m *MetricsServer) printMessageRates(ctx context.Context) {
202255
paused = false
203256
}
204257
zeroRateSeconds = 0
205-
log.Print("",
206-
"published", fmt.Sprintf("%v/s", publishedRate),
207-
"consumed", fmt.Sprintf("%v/s", consumedRate))
258+
log.Print("", fields...)
208259
}
209260
}
210261
}
211262
}()
212263
}
213264

265+
func buildRateFields(publishedRate, consumedRate uint64) []any {
266+
var fields []any
267+
fields = append(fields, "published", fmt.Sprintf("%v/s", publishedRate))
268+
fields = append(fields, "consumed", fmt.Sprintf("%v/s", consumedRate))
269+
if pubMin, pubMax, ok := pubLatencyTracker.reset(); ok {
270+
fields = append(fields, "pub_min", formatLatency(pubMin), "pub_max", formatLatency(pubMax))
271+
}
272+
if e2eMin, e2eMax, ok := e2eLatencyTracker.reset(); ok {
273+
fields = append(fields, "e2e_min", formatLatency(e2eMin), "e2e_max", formatLatency(e2eMax))
274+
}
275+
return fields
276+
}
277+
278+
func formatLatency(d time.Duration) string {
279+
switch {
280+
case d < time.Millisecond:
281+
us := float64(d.Microseconds())
282+
if us == float64(int64(us)) {
283+
return fmt.Sprintf("%dµs", int64(us))
284+
}
285+
return fmt.Sprintf("%.1f µs", us)
286+
case d < time.Second:
287+
ms := float64(d.Nanoseconds()) / float64(time.Millisecond)
288+
if ms == float64(int64(ms)) {
289+
return fmt.Sprintf("%dms", int64(ms))
290+
}
291+
return fmt.Sprintf("%.1fms", ms)
292+
default:
293+
s := d.Seconds()
294+
if s == float64(int64(s)) {
295+
return fmt.Sprintf("%ds", int64(s))
296+
}
297+
return fmt.Sprintf("%.2fs", s)
298+
}
299+
}
300+
214301
func (m *MetricsServer) StartTime(t time.Time) {
215302
m.started = t
216303
}

pkg/mqtt/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -41,7 +41,7 @@ func (c MqttConsumer) Start(cosumerReady chan bool) {
4141
handleMessage(payload)
4242
metrics.MessagesConsumedMetric(0).Inc()
4343
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
44-
metrics.EndToEndLatency.UpdateDuration(timeSent)
44+
metrics.RecordEndToEndLatency(latency)
4545

4646
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
4747
metrics.MessagesConsumedOutOfOrderMetric(0).Inc()

pkg/mqtt/consumer_v5.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -40,7 +40,7 @@ func (c Mqtt5Consumer) Start(consumerReady chan bool) {
4040
metrics.MessagesConsumedMetric(0).Inc()
4141
payload := rcv.Packet.Payload
4242
timeSent, latency := utils.CalculateEndToEndLatency(&payload)
43-
metrics.EndToEndLatency.UpdateDuration(timeSent)
43+
metrics.RecordEndToEndLatency(latency)
4444

4545
if c.Config.LogOutOfOrder && timeSent.Before(previousMessageTimeSent) {
4646
metrics.MessagesConsumedOutOfOrderMetric(0).Inc()

pkg/mqtt/publisher.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -152,7 +152,7 @@ func (p MqttPublisher) Send() {
152152
log.Error("message sending failure", "id", p.Id, "error", token.Error())
153153
} else {
154154
metrics.MessagesPublished.Inc()
155-
metrics.PublishingLatency.Update(latency.Seconds())
155+
metrics.RecordPublishingLatency(latency)
156156
log.Debug("message sent", "id", p.Id, "destination", p.Topic, "latency", latency)
157157
}
158158
}

pkg/mqtt/publisher_v5.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -157,7 +157,7 @@ func (p Mqtt5Publisher) Send() {
157157
}
158158
latency := time.Since(startTime)
159159
metrics.MessagesPublished.Inc()
160-
metrics.PublishingLatency.Update(latency.Seconds())
160+
metrics.RecordPublishingLatency(latency)
161161
log.Debug("message sent", "id", p.Id, "destination", p.Topic, "latency", latency)
162162
}
163163

pkg/stomp/consumer.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -126,7 +126,7 @@ func (c *StompConsumer) Start(consumerReady chan bool) {
126126
}
127127

128128
timeSent, latency := utils.CalculateEndToEndLatency(&msg.Body)
129-
metrics.EndToEndLatency.UpdateDuration(timeSent)
129+
metrics.RecordEndToEndLatency(latency)
130130

131131
priority, _ := strconv.Atoi(msg.Header.Get("priority"))
132132

0 commit comments

Comments
 (0)