Skip to content

Commit f75be56

Browse files
authored
feat: Adds 'cq' diagnostics to /debug/vars (#26874) (#26884)
1 parent 0e656d6 commit f75be56

File tree

5 files changed

+68
-0
lines changed

5 files changed

+68
-0
lines changed

cmd/influxd/run/server.go

Lines changed: 3 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -416,6 +416,9 @@ func (s *Server) appendContinuousQueryService(c continuous_querier.Config) {
416416
srv.QueryExecutor = s.QueryExecutor
417417
srv.Monitor = s.Monitor
418418
s.Services = append(s.Services, srv)
419+
if s.Monitor != nil {
420+
s.Monitor.RegisterDiagnosticsClient("cq", srv)
421+
}
419422
}
420423

421424
// Err returns an error channel that multiplexes all out of band errors received from all services.

monitor/service.go

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -241,6 +241,7 @@ func (m *Monitor) Close() error {
241241
m.DeregisterDiagnosticsClient("system")
242242
m.DeregisterDiagnosticsClient("stats")
243243
m.DeregisterDiagnosticsClient("config")
244+
m.DeregisterDiagnosticsClient("cq")
244245
return nil
245246
}
246247

monitor/service_test.go

Lines changed: 22 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -13,9 +13,11 @@ import (
1313

1414
"github.com/influxdata/influxdb/models"
1515
"github.com/influxdata/influxdb/monitor"
16+
"github.com/influxdata/influxdb/services/continuous_querier"
1617
"github.com/influxdata/influxdb/services/meta"
1718
"github.com/influxdata/influxdb/toml"
1819
"github.com/influxdata/influxdb/tsdb"
20+
"github.com/stretchr/testify/require"
1921
"go.uber.org/zap"
2022
"go.uber.org/zap/zaptest/observer"
2123
)
@@ -414,6 +416,26 @@ func TestMonitor_QuickClose(t *testing.T) {
414416
}
415417
}
416418

419+
func TestMonitor_CQStatistics(t *testing.T) {
420+
s := monitor.New(nil, monitor.Config{}, &tsdb.Config{})
421+
err := s.Open()
422+
require.NoError(t, err, "monitor open")
423+
defer s.Close()
424+
425+
s.RegisterDiagnosticsClient("cq", continuous_querier.NewService(continuous_querier.NewConfig()))
426+
stats, err := s.Statistics(nil)
427+
require.NoError(t, err, "cq statistics")
428+
429+
for _, stat := range stats {
430+
if stat.Name == "cq" {
431+
require.Equal(t, stat.Values, map[string]interface{}{
432+
"queryOk": 0,
433+
"queryFail": 0,
434+
}, "statistics")
435+
}
436+
}
437+
}
438+
417439
func TestStatistic_ValueNames(t *testing.T) {
418440
statistic := monitor.Statistic{
419441
Statistic: models.Statistic{

services/continuous_querier/service.go

Lines changed: 7 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111

1212
"github.com/influxdata/influxdb/logger"
1313
"github.com/influxdata/influxdb/models"
14+
"github.com/influxdata/influxdb/monitor/diagnostics"
1415
"github.com/influxdata/influxdb/query"
1516
"github.com/influxdata/influxdb/services/meta"
1617
"github.com/influxdata/influxql"
@@ -169,6 +170,12 @@ func (s *Service) Statistics(tags map[string]string) []models.Statistic {
169170
}}
170171
}
171172

173+
// Required for Monitor interface. Currently does not return any
174+
// diagnostic values.
175+
func (s *Service) Diagnostics() (*diagnostics.Diagnostics, error) {
176+
return &diagnostics.Diagnostics{}, nil
177+
}
178+
172179
// Run runs the specified continuous query, or all CQs if none is specified.
173180
func (s *Service) Run(database, name string, t time.Time) error {
174181
var dbs []meta.DatabaseInfo

services/httpd/handler.go

Lines changed: 35 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -2395,6 +2395,33 @@ func (h *Handler) serveExpvar(w http.ResponseWriter, r *http.Request) {
23952395
uniqueKeys := make(map[string]int)
23962396

23972397
for _, s := range stats {
2398+
if s.Name == "cq" {
2399+
jv, err := parseCQStatistics(&s.Statistic)
2400+
if err != nil {
2401+
h.httpError(w, err.Error(), http.StatusInternalServerError)
2402+
return
2403+
}
2404+
data, err := json.Marshal(jv)
2405+
if err != nil {
2406+
h.httpError(w, err.Error(), http.StatusInternalServerError)
2407+
return
2408+
}
2409+
2410+
if !first {
2411+
_, err := fmt.Fprintln(w, ",")
2412+
if err != nil {
2413+
h.httpError(w, err.Error(), http.StatusInternalServerError)
2414+
return
2415+
}
2416+
}
2417+
first = false
2418+
_, err = fmt.Fprintf(w, "\"cq\": %s", data)
2419+
if err != nil {
2420+
h.httpError(w, err.Error(), http.StatusInternalServerError)
2421+
return
2422+
}
2423+
continue
2424+
}
23982425
val, err := json.Marshal(s)
23992426
if err != nil {
24002427
continue
@@ -2644,6 +2671,14 @@ func parseConfigDiagnostics(d *diagnostics.Diagnostics) (map[string]interface{},
26442671
return m, nil
26452672
}
26462673

2674+
func parseCQStatistics(s *models.Statistic) (map[string]interface{}, error) {
2675+
if len(s.Values) == 0 {
2676+
return nil, fmt.Errorf("no cq statistics data available")
2677+
}
2678+
2679+
return s.Values, nil
2680+
}
2681+
26472682
// httpError writes an error to the client in a standard format.
26482683
func (h *Handler) httpError(w http.ResponseWriter, errmsg string, code int) {
26492684
if code == http.StatusUnauthorized {

0 commit comments

Comments
 (0)