Skip to content

Commit 4dcc877

Browse files
feat: report user query bytes (#27188) (#27198)
In SHOW STATS, include per-user query response bytes. The httpd service, returns a new statistics object. userquerybytes. Off by default, it can be turned on with the environment variable INFLUXDB_HTTP_USER_QUERY_BYTES_ENABLED or the configuration parameter UserQueryBytesEnabled in the HTTP section. This is available through SHOW STATS either when all statistics are reported, or alone with `FOR 'userquerybytes'` It is returned by /debug/vars, and stored in the _internal database when that is enabled. (cherry picked from commit 858a2c6) Fixes #27195
1 parent 2b71644 commit 4dcc877

File tree

6 files changed

+706
-49
lines changed

6 files changed

+706
-49
lines changed

pkg/data/gensyncmap/gensyncmap.go

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -45,3 +45,12 @@ func (m *Map[K, V]) Len() int {
4545
})
4646
return n
4747
}
48+
49+
func (m *Map[K, V]) IsEmpty() bool {
50+
isEmpty := true
51+
m.m.Range(func(_, _ any) bool {
52+
isEmpty = false
53+
return false
54+
})
55+
return isEmpty
56+
}

services/httpd/config.go

Lines changed: 35 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -32,39 +32,41 @@ const (
3232

3333
// Config represents a configuration for a HTTP service.
3434
type Config struct {
35-
Enabled bool `toml:"enabled"`
36-
BindAddress string `toml:"bind-address"`
37-
AuthEnabled bool `toml:"auth-enabled"`
38-
LogEnabled bool `toml:"log-enabled"`
39-
SuppressWriteLog bool `toml:"suppress-write-log"`
40-
WriteTracing bool `toml:"write-tracing"`
41-
FluxEnabled bool `toml:"flux-enabled"`
42-
FluxLogEnabled bool `toml:"flux-log-enabled"`
43-
FluxTesting bool `toml:"flux-testing"`
44-
PprofEnabled bool `toml:"pprof-enabled"`
45-
PprofAuthEnabled bool `toml:"pprof-auth-enabled"`
46-
DebugPprofEnabled bool `toml:"debug-pprof-enabled"`
47-
PingAuthEnabled bool `toml:"ping-auth-enabled"`
48-
PromReadAuthEnabled bool `toml:"prom-read-auth-enabled"`
49-
HTTPHeaders map[string]string `toml:"headers"`
50-
HTTPSEnabled bool `toml:"https-enabled"`
51-
HTTPSCertificate string `toml:"https-certificate"`
52-
HTTPSPrivateKey string `toml:"https-private-key"`
53-
MaxRowLimit int `toml:"max-row-limit"`
54-
MaxConnectionLimit int `toml:"max-connection-limit"`
55-
SharedSecret string `toml:"shared-secret"`
56-
Realm string `toml:"realm"`
57-
UnixSocketEnabled bool `toml:"unix-socket-enabled"`
58-
UnixSocketGroup *toml.Group `toml:"unix-socket-group"`
59-
UnixSocketPermissions toml.FileMode `toml:"unix-socket-permissions"`
60-
BindSocket string `toml:"bind-socket"`
61-
MaxBodySize int `toml:"max-body-size"`
62-
AccessLogPath string `toml:"access-log-path"`
63-
AccessLogStatusFilters []StatusFilter `toml:"access-log-status-filters"`
64-
MaxConcurrentWriteLimit int `toml:"max-concurrent-write-limit"`
65-
MaxEnqueuedWriteLimit int `toml:"max-enqueued-write-limit"`
66-
EnqueuedWriteTimeout time.Duration `toml:"enqueued-write-timeout"`
67-
TLS *tls.Config `toml:"-"`
35+
Enabled bool `toml:"enabled"`
36+
BindAddress string `toml:"bind-address"`
37+
AuthEnabled bool `toml:"auth-enabled"`
38+
LogEnabled bool `toml:"log-enabled"`
39+
SuppressWriteLog bool `toml:"suppress-write-log"`
40+
WriteTracing bool `toml:"write-tracing"`
41+
FluxEnabled bool `toml:"flux-enabled"`
42+
FluxLogEnabled bool `toml:"flux-log-enabled"`
43+
FluxTesting bool `toml:"flux-testing"`
44+
PprofEnabled bool `toml:"pprof-enabled"`
45+
PprofAuthEnabled bool `toml:"pprof-auth-enabled"`
46+
DebugPprofEnabled bool `toml:"debug-pprof-enabled"`
47+
PingAuthEnabled bool `toml:"ping-auth-enabled"`
48+
PromReadAuthEnabled bool `toml:"prom-read-auth-enabled"`
49+
HTTPHeaders map[string]string `toml:"headers"`
50+
HTTPSEnabled bool `toml:"https-enabled"`
51+
HTTPSCertificate string `toml:"https-certificate"`
52+
HTTPSPrivateKey string `toml:"https-private-key"`
53+
HTTPSInsecureCertificate bool `toml:"https-insecure-certificate"`
54+
MaxRowLimit int `toml:"max-row-limit"`
55+
MaxConnectionLimit int `toml:"max-connection-limit"`
56+
SharedSecret string `toml:"shared-secret"`
57+
Realm string `toml:"realm"`
58+
UnixSocketEnabled bool `toml:"unix-socket-enabled"`
59+
UnixSocketGroup *toml.Group `toml:"unix-socket-group"`
60+
UnixSocketPermissions toml.FileMode `toml:"unix-socket-permissions"`
61+
BindSocket string `toml:"bind-socket"`
62+
MaxBodySize int `toml:"max-body-size"`
63+
AccessLogPath string `toml:"access-log-path"`
64+
AccessLogStatusFilters []StatusFilter `toml:"access-log-status-filters"`
65+
MaxConcurrentWriteLimit int `toml:"max-concurrent-write-limit"`
66+
MaxEnqueuedWriteLimit int `toml:"max-enqueued-write-limit"`
67+
EnqueuedWriteTimeout time.Duration `toml:"enqueued-write-timeout"`
68+
UserQueryBytesEnabled bool `toml:"user-query-bytes-enabled"`
69+
TLS *tls.Config `toml:"-"`
6870
}
6971

7072
// NewConfig returns a new Config with default settings.

services/httpd/handler.go

Lines changed: 63 additions & 15 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@ import (
3030
"github.com/influxdata/influxdb/models"
3131
"github.com/influxdata/influxdb/monitor"
3232
"github.com/influxdata/influxdb/monitor/diagnostics"
33+
"github.com/influxdata/influxdb/pkg/data/gensyncmap"
3334
"github.com/influxdata/influxdb/prometheus"
3435
"github.com/influxdata/influxdb/query"
3536
"github.com/influxdata/influxdb/services/meta"
@@ -143,12 +144,13 @@ type Handler struct {
143144
Controller Controller
144145
CompilerMappings flux.CompilerMappings
145146

146-
Config *Config
147-
Logger *zap.Logger
148-
CLFLogger *log.Logger
149-
accessLog *os.File
150-
accessLogFilters StatusFilters
151-
stats *Statistics
147+
Config *Config
148+
Logger *zap.Logger
149+
CLFLogger *log.Logger
150+
accessLog *os.File
151+
accessLogFilters StatusFilters
152+
stats *Statistics
153+
queryBytesPerUser gensyncmap.Map[string, *atomic.Int64]
152154

153155
requestTracker *RequestTracker
154156
writeThrottler *Throttler
@@ -322,7 +324,12 @@ func NewHandler(c Config) *Handler {
322324
return func(w http.ResponseWriter, r *http.Request, user meta.User) {
323325
// TODO: This is the only place we use AuthorizeUnrestricted. It would be better to use an explicit permission
324326
if user == nil || !user.AuthorizeUnrestricted() {
325-
h.Logger.Info("Unauthorized request", zap.String("user", user.ID()), zap.String("path", r.URL.Path))
327+
// Don't panic
328+
id := ""
329+
if user != nil {
330+
id = user.ID()
331+
}
332+
h.Logger.Info("Unauthorized request", zap.String("user", id), zap.String("path", r.URL.Path))
326333
h.httpError(w, "error authorizing admin access", http.StatusForbidden)
327334
return
328335
}
@@ -442,7 +449,7 @@ type Statistics struct {
442449

443450
// Statistics returns statistics for periodic monitoring.
444451
func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
445-
return []models.Statistic{{
452+
stats := []models.Statistic{{
446453
Name: "httpd",
447454
Tags: tags,
448455
Values: map[string]interface{}{
@@ -472,6 +479,35 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
472479
statFluxQueryRequestBytesTransmitted: atomic.LoadInt64(&h.stats.FluxQueryRequestBytesTransmitted),
473480
},
474481
}}
482+
483+
// Add per-user query bytes as separate statistics (one per user) if enabled
484+
if h.Config.UserQueryBytesEnabled && !h.queryBytesPerUser.IsEmpty() {
485+
h.queryBytesPerUser.Range(func(user string, counter *atomic.Int64) bool {
486+
userTag := user
487+
if user == "" {
488+
userTag = StatAnonymousUser
489+
}
490+
userTags := models.NewTags(tags).Merge(map[string]string{StatUserTagKey: userTag}).Map()
491+
stats = append(stats, models.Statistic{
492+
Name: "userquerybytes",
493+
Tags: userTags,
494+
Values: map[string]interface{}{statUserQueryRespBytes: counter.Load()},
495+
})
496+
return true
497+
})
498+
}
499+
500+
return stats
501+
}
502+
503+
// addQueryBytesForUser atomically adds bytes to the per-user query bytes counter.
504+
// This is a no-op if UserQueryBytesEnabled is false.
505+
func (h *Handler) addQueryBytesForUser(user string, n int64) {
506+
if !h.Config.UserQueryBytesEnabled {
507+
return
508+
}
509+
counter, _ := h.queryBytesPerUser.LoadOrStore(user, &atomic.Int64{})
510+
counter.Add(n)
475511
}
476512

477513
// AddRoutes sets the provided routes on the handler.
@@ -782,6 +818,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
782818
Results: []*query.Result{r},
783819
})
784820
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
821+
h.addQueryBytesForUser(userName, int64(n))
785822
w.(http.Flusher).Flush()
786823
continue
787824
}
@@ -873,6 +910,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
873910
if !chunked {
874911
n, _ := rw.WriteResponse(resp)
875912
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(n))
913+
h.addQueryBytesForUser(userName, int64(n))
876914
}
877915
}
878916

@@ -2031,6 +2069,11 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
20312069
return
20322070
}
20332071

2072+
var userName string
2073+
if user != nil {
2074+
userName = user.ID()
2075+
}
2076+
20342077
respond := func(resp *prompb.ReadResponse) {
20352078
data, err := resp.Marshal()
20362079
if err != nil {
@@ -2048,6 +2091,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
20482091
}
20492092

20502093
atomic.AddInt64(&h.stats.QueryRequestBytesTransmitted, int64(len(compressed)))
2094+
h.addQueryBytesForUser(userName, int64(len(compressed)))
20512095
}
20522096

20532097
ctx := context.Background()
@@ -2134,6 +2178,11 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
21342178
atomic.AddInt64(&h.stats.FluxQueryRequestDuration, time.Since(start).Nanoseconds())
21352179
}(time.Now())
21362180

2181+
var userName string
2182+
if user != nil {
2183+
userName = user.ID()
2184+
}
2185+
21372186
req, err := decodeQueryRequest(r)
21382187
if err != nil {
21392188
h.httpError(w, err.Error(), http.StatusBadRequest)
@@ -2201,14 +2250,13 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
22012250
defer results.Release()
22022251

22032252
n, err = encoder.Encode(w, results)
2204-
if err != nil {
2205-
if n == 0 {
2206-
// If the encoder did not write anything, we can write an error header.
2207-
h.httpError(w, err.Error(), http.StatusInternalServerError)
2208-
} else {
2209-
atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n))
2210-
}
2253+
if err != nil && n == 0 {
2254+
// If the encoder did not write anything, we can write an error header.
2255+
h.httpError(w, err.Error(), http.StatusInternalServerError)
2256+
return
22112257
}
2258+
atomic.AddInt64(&h.stats.FluxQueryRequestBytesTransmitted, int64(n))
2259+
h.addQueryBytesForUser(userName, int64(n))
22122260
}
22132261
}
22142262

0 commit comments

Comments
 (0)