Skip to content

Commit 0e656d6

Browse files
authored
feat: Adds statistics measurement for compact-throughput (#26754) (#26883)
1 parent 7f56ad3 commit 0e656d6

File tree

6 files changed

+113
-1
lines changed

6 files changed

+113
-1
lines changed

cmd/influxd/run/server.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -513,6 +513,8 @@ func (s *Server) Open() error {
513513
return fmt.Errorf("open points writer: %s", err)
514514
}
515515

516+
s.Monitor.WithCompactThroughputLimiter(s.TSDBStore.EngineOptions.CompactionThroughputLimiter)
517+
516518
for _, service := range s.Services {
517519
if err := service.Open(); err != nil {
518520
return fmt.Errorf("open service: %s", err)

monitor/service.go

Lines changed: 20 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,6 +16,7 @@ import (
1616
"github.com/influxdata/influxdb/logger"
1717
"github.com/influxdata/influxdb/models"
1818
"github.com/influxdata/influxdb/monitor/diagnostics"
19+
"github.com/influxdata/influxdb/pkg/limiter"
1920
"github.com/influxdata/influxdb/services/meta"
2021
"github.com/influxdata/influxdb/tsdb"
2122
"go.uber.org/zap"
@@ -100,7 +101,8 @@ type Monitor struct {
100101
// TSDB configuration for diagnostics
101102
TSDBConfig *tsdb.Config
102103

103-
Logger *zap.Logger
104+
Logger *zap.Logger
105+
Limiter limiter.Rate
104106
}
105107

106108
// PointsWriter is a simplified interface for writing the points the monitor gathers.
@@ -150,6 +152,15 @@ func (m *Monitor) Open() error {
150152
m.RegisterDiagnosticsClient("runtime", &goRuntime{})
151153
m.RegisterDiagnosticsClient("network", &network{})
152154
m.RegisterDiagnosticsClient("system", &system{})
155+
156+
if m.Limiter != nil {
157+
m.RegisterDiagnosticsClient("stats", &stats{
158+
comp: compactThroughputStats{
159+
limiter: m.Limiter,
160+
},
161+
})
162+
}
163+
153164
if m.TSDBConfig != nil {
154165
m.RegisterDiagnosticsClient("config", m.TSDBConfig)
155166
}
@@ -200,6 +211,12 @@ func (m *Monitor) writePoints(p models.Points) error {
200211
return nil
201212
}
202213

214+
func (m *Monitor) WithCompactThroughputLimiter(limiter limiter.Rate) {
215+
m.mu.Lock()
216+
defer m.mu.Unlock()
217+
m.Limiter = limiter
218+
}
219+
203220
// Close closes the monitor system.
204221
func (m *Monitor) Close() error {
205222
if !m.open() {
@@ -222,6 +239,8 @@ func (m *Monitor) Close() error {
222239
m.DeregisterDiagnosticsClient("runtime")
223240
m.DeregisterDiagnosticsClient("network")
224241
m.DeregisterDiagnosticsClient("system")
242+
m.DeregisterDiagnosticsClient("stats")
243+
m.DeregisterDiagnosticsClient("config")
225244
return nil
226245
}
227246

monitor/stats.go

Lines changed: 34 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,34 @@
1+
package monitor
2+
3+
import (
4+
"math"
5+
6+
"github.com/influxdata/influxdb/monitor/diagnostics"
7+
"github.com/influxdata/influxdb/pkg/limiter"
8+
"golang.org/x/time/rate"
9+
)
10+
11+
// stats captures statistics
12+
type stats struct {
13+
comp compactThroughputStats
14+
}
15+
16+
type compactThroughputStats struct {
17+
limiter limiter.Rate
18+
}
19+
20+
// CompactThroughputUsage calculates the percentage of burst capacity currently consumed by compaction.
21+
func (s *stats) CompactThroughputUsage() float64 {
22+
percentage := 100 * (1 - rate.Limit(s.comp.limiter.Tokens())/s.comp.limiter.Limit())
23+
return float64(percentage)
24+
}
25+
26+
func (s *stats) Diagnostics() (*diagnostics.Diagnostics, error) {
27+
compactThroughputUsage := s.CompactThroughputUsage()
28+
compactThroughputUsageTrunc := math.Round(compactThroughputUsage*100.0) / 100.0
29+
d := map[string]interface{}{
30+
"compact-throughput-usage-percentage": compactThroughputUsageTrunc,
31+
}
32+
33+
return diagnostics.RowFromMap(d), nil
34+
}

monitor/stats_test.go

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,31 @@
1+
package monitor_test
2+
3+
import (
4+
"testing"
5+
6+
"github.com/influxdata/influxdb/monitor"
7+
"github.com/influxdata/influxdb/pkg/limiter"
8+
"github.com/influxdata/influxdb/tsdb"
9+
"github.com/stretchr/testify/require"
10+
)
11+
12+
func TestDiagnostics_Stats(t *testing.T) {
13+
s := monitor.New(nil, monitor.Config{}, &tsdb.Config{})
14+
compactLimiter := limiter.NewRate(100, 100)
15+
16+
s.WithCompactThroughputLimiter(compactLimiter)
17+
18+
require.NoError(t, s.Open(), "opening monitor")
19+
defer func() {
20+
require.NoError(t, s.Close(), "closing monitor")
21+
}()
22+
23+
d, err := s.Diagnostics()
24+
require.NoError(t, err, "getting diagnostics")
25+
26+
diags, ok := d["stats"]
27+
require.True(t, ok, "expected stats diagnostic client to be registered")
28+
29+
got, exp := diags.Columns, []string{"compact-throughput-usage-percentage"}
30+
require.Equal(t, exp, got)
31+
}

pkg/limiter/writer.go

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,8 @@ type Writer struct {
1818
type Rate interface {
1919
WaitN(ctx context.Context, n int) error
2020
Burst() int
21+
Tokens() float64
22+
Limit() rate.Limit
2123
}
2224

2325
func NewRate(bytesPerSec, burstLimit int) Rate {

services/httpd/handler.go

Lines changed: 24 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2327,6 +2327,30 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
23272327
}
23282328
}
23292329

2330+
if val := diags["stats"]; val != nil {
2331+
if len(val.Rows) > 0 && len(val.Columns) > 0 {
2332+
// Create a map of column names to values
2333+
statsMap := make(map[string]interface{})
2334+
for i, col := range val.Columns {
2335+
if i < len(val.Rows[0]) {
2336+
statsMap[col] = val.Rows[0][i]
2337+
}
2338+
}
2339+
2340+
data, err := json.Marshal(statsMap)
2341+
if err != nil {
2342+
h.httpError(w, err.Error(), http.StatusInternalServerError)
2343+
return
2344+
}
2345+
2346+
if !first {
2347+
fmt.Fprintln(w, ",")
2348+
}
2349+
first = false
2350+
fmt.Fprintf(w, "\"stats\": %s", data)
2351+
}
2352+
}
2353+
23302354
// We're going to print some kind of crypto data, we just
23312355
// need to find the proper source for it.
23322356
{

0 commit comments

Comments
 (0)