@@ -7,12 +7,14 @@ import (
77 "fmt"
88 "io"
99 "maps"
10+ "math"
1011 "net/http"
1112 "os"
1213 "runtime"
1314 "strconv"
1415 "strings"
1516 "sync"
17+ "sync/atomic"
1618 "syscall"
1719 "time"
1820
@@ -153,11 +155,62 @@ func MessagesConsumedOutOfOrderMetric(priority int) *vmetrics.Counter {
153155 return vmetrics .GetOrCreateCounter (`omq_messages_consumed_out_of_order` + labelsToString (labels ))
154156}
155157
158+ type latencyTracker struct {
159+ min atomic.Int64
160+ max atomic.Int64
161+ }
162+
163+ func newLatencyTracker () * latencyTracker {
164+ t := & latencyTracker {}
165+ t .min .Store (math .MaxInt64 )
166+ return t
167+ }
168+
169+ func (t * latencyTracker ) record (latency time.Duration ) {
170+ ns := latency .Nanoseconds ()
171+ for {
172+ old := t .min .Load ()
173+ if ns >= old || t .min .CompareAndSwap (old , ns ) {
174+ break
175+ }
176+ }
177+ for {
178+ old := t .max .Load ()
179+ if ns <= old || t .max .CompareAndSwap (old , ns ) {
180+ break
181+ }
182+ }
183+ }
184+
185+ func (t * latencyTracker ) reset () (min , max time.Duration , ok bool ) {
186+ minNs := t .min .Swap (math .MaxInt64 )
187+ maxNs := t .max .Swap (0 )
188+ if minNs == math .MaxInt64 {
189+ return 0 , 0 , false
190+ }
191+ return time .Duration (minNs ), time .Duration (maxNs ), true
192+ }
193+
156194var (
157195 previouslyPublished uint64
158196 previouslyConsumed uint64
197+ pubLatencyTracker = newLatencyTracker ()
198+ e2eLatencyTracker = newLatencyTracker ()
159199)
160200
201+ func RecordPublishingLatency (latency time.Duration ) {
202+ PublishingLatency .Update (latency .Seconds ())
203+ pubLatencyTracker .record (latency )
204+ }
205+
206+ func RecordEndToEndLatency (latency time.Duration ) {
207+ if latency <= 0 {
208+ return
209+ }
210+ EndToEndLatency .Update (latency .Seconds ())
211+ e2eLatencyTracker .record (latency )
212+ }
213+
161214func (m * MetricsServer ) printMessageRates (ctx context.Context ) {
162215 go func () {
163216 zeroRateSeconds := 0
@@ -183,16 +236,16 @@ func (m *MetricsServer) printMessageRates(ctx context.Context) {
183236 previouslyPublished = published
184237 previouslyConsumed = consumed
185238
239+ fields := buildRateFields (publishedRate , consumedRate )
240+
186241 if publishedRate == 0 && consumedRate == 0 {
187242 if ! paused {
188243 zeroRateSeconds ++
189244 if zeroRateSeconds >= 5 {
190245 log .Info ("metric printing paused (no activity), will resume when there's something new to print" )
191246 paused = true
192247 } else {
193- log .Print ("" ,
194- "published" , fmt .Sprintf ("%v/s" , publishedRate ),
195- "consumed" , fmt .Sprintf ("%v/s" , consumedRate ))
248+ log .Print ("" , fields ... )
196249 }
197250 }
198251 } else {
@@ -202,15 +255,49 @@ func (m *MetricsServer) printMessageRates(ctx context.Context) {
202255 paused = false
203256 }
204257 zeroRateSeconds = 0
205- log .Print ("" ,
206- "published" , fmt .Sprintf ("%v/s" , publishedRate ),
207- "consumed" , fmt .Sprintf ("%v/s" , consumedRate ))
258+ log .Print ("" , fields ... )
208259 }
209260 }
210261 }
211262 }()
212263}
213264
265+ func buildRateFields (publishedRate , consumedRate uint64 ) []any {
266+ var fields []any
267+ fields = append (fields , "published" , fmt .Sprintf ("%v/s" , publishedRate ))
268+ fields = append (fields , "consumed" , fmt .Sprintf ("%v/s" , consumedRate ))
269+ if pubMin , pubMax , ok := pubLatencyTracker .reset (); ok {
270+ fields = append (fields , "pub_min" , formatLatency (pubMin ), "pub_max" , formatLatency (pubMax ))
271+ }
272+ if e2eMin , e2eMax , ok := e2eLatencyTracker .reset (); ok {
273+ fields = append (fields , "e2e_min" , formatLatency (e2eMin ), "e2e_max" , formatLatency (e2eMax ))
274+ }
275+ return fields
276+ }
277+
278+ func formatLatency (d time.Duration ) string {
279+ switch {
280+ case d < time .Millisecond :
281+ us := float64 (d .Microseconds ())
282+ if us == float64 (int64 (us )) {
283+ return fmt .Sprintf ("%dµs" , int64 (us ))
284+ }
285+ return fmt .Sprintf ("%.1f µs" , us )
286+ case d < time .Second :
287+ ms := float64 (d .Nanoseconds ()) / float64 (time .Millisecond )
288+ if ms == float64 (int64 (ms )) {
289+ return fmt .Sprintf ("%dms" , int64 (ms ))
290+ }
291+ return fmt .Sprintf ("%.1fms" , ms )
292+ default :
293+ s := d .Seconds ()
294+ if s == float64 (int64 (s )) {
295+ return fmt .Sprintf ("%ds" , int64 (s ))
296+ }
297+ return fmt .Sprintf ("%.2fs" , s )
298+ }
299+ }
300+
214301func (m * MetricsServer ) StartTime (t time.Time ) {
215302 m .started = t
216303}
0 commit comments