@@ -30,6 +30,7 @@ import (
3030 "github.com/influxdata/influxdb/models"
3131 "github.com/influxdata/influxdb/monitor"
3232 "github.com/influxdata/influxdb/monitor/diagnostics"
33+ "github.com/influxdata/influxdb/pkg/data/gensyncmap"
3334 "github.com/influxdata/influxdb/prometheus"
3435 "github.com/influxdata/influxdb/query"
3536 "github.com/influxdata/influxdb/services/meta"
@@ -143,12 +144,13 @@ type Handler struct {
143144 Controller Controller
144145 CompilerMappings flux.CompilerMappings
145146
146- Config * Config
147- Logger * zap.Logger
148- CLFLogger * log.Logger
149- accessLog * os.File
150- accessLogFilters StatusFilters
151- stats * Statistics
147+ Config * Config
148+ Logger * zap.Logger
149+ CLFLogger * log.Logger
150+ accessLog * os.File
151+ accessLogFilters StatusFilters
152+ stats * Statistics
153+ queryBytesPerUser gensyncmap.Map [string , * atomic.Int64 ]
152154
153155 requestTracker * RequestTracker
154156 writeThrottler * Throttler
@@ -322,7 +324,12 @@ func NewHandler(c Config) *Handler {
322324 return func (w http.ResponseWriter , r * http.Request , user meta.User ) {
323325 // TODO: This is the only place we use AuthorizeUnrestricted. It would be better to use an explicit permission
324326 if user == nil || ! user .AuthorizeUnrestricted () {
325- h .Logger .Info ("Unauthorized request" , zap .String ("user" , user .ID ()), zap .String ("path" , r .URL .Path ))
327+ // Don't panic
328+ id := ""
329+ if user != nil {
330+ id = user .ID ()
331+ }
332+ h .Logger .Info ("Unauthorized request" , zap .String ("user" , id ), zap .String ("path" , r .URL .Path ))
326333 h .httpError (w , "error authorizing admin access" , http .StatusForbidden )
327334 return
328335 }
@@ -442,7 +449,7 @@ type Statistics struct {
442449
443450// Statistics returns statistics for periodic monitoring.
444451func (h * Handler ) Statistics (tags map [string ]string ) []models.Statistic {
445- return []models.Statistic {{
452+ stats := []models.Statistic {{
446453 Name : "httpd" ,
447454 Tags : tags ,
448455 Values : map [string ]interface {}{
@@ -472,6 +479,35 @@ func (h *Handler) Statistics(tags map[string]string) []models.Statistic {
472479 statFluxQueryRequestBytesTransmitted : atomic .LoadInt64 (& h .stats .FluxQueryRequestBytesTransmitted ),
473480 },
474481 }}
482+
483+ // Add per-user query bytes as separate statistics (one per user) if enabled
484+ if h .Config .UserQueryBytesEnabled && ! h .queryBytesPerUser .IsEmpty () {
485+ h .queryBytesPerUser .Range (func (user string , counter * atomic.Int64 ) bool {
486+ userTag := user
487+ if user == "" {
488+ userTag = StatAnonymousUser
489+ }
490+ userTags := models .NewTags (tags ).Merge (map [string ]string {StatUserTagKey : userTag }).Map ()
491+ stats = append (stats , models.Statistic {
492+ Name : "userquerybytes" ,
493+ Tags : userTags ,
494+ Values : map [string ]interface {}{statUserQueryRespBytes : counter .Load ()},
495+ })
496+ return true
497+ })
498+ }
499+
500+ return stats
501+ }
502+
503+ // addQueryBytesForUser atomically adds bytes to the per-user query bytes counter.
504+ // This is a no-op if UserQueryBytesEnabled is false.
505+ func (h * Handler ) addQueryBytesForUser (user string , n int64 ) {
506+ if ! h .Config .UserQueryBytesEnabled {
507+ return
508+ }
509+ counter , _ := h .queryBytesPerUser .LoadOrStore (user , & atomic.Int64 {})
510+ counter .Add (n )
475511}
476512
477513// AddRoutes sets the provided routes on the handler.
@@ -782,6 +818,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
782818 Results : []* query.Result {r },
783819 })
784820 atomic .AddInt64 (& h .stats .QueryRequestBytesTransmitted , int64 (n ))
821+ h .addQueryBytesForUser (userName , int64 (n ))
785822 w .(http.Flusher ).Flush ()
786823 continue
787824 }
@@ -873,6 +910,7 @@ func (h *Handler) serveQuery(w http.ResponseWriter, r *http.Request, user meta.U
873910 if ! chunked {
874911 n , _ := rw .WriteResponse (resp )
875912 atomic .AddInt64 (& h .stats .QueryRequestBytesTransmitted , int64 (n ))
913+ h .addQueryBytesForUser (userName , int64 (n ))
876914 }
877915}
878916
@@ -2031,6 +2069,11 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
20312069 return
20322070 }
20332071
2072+ var userName string
2073+ if user != nil {
2074+ userName = user .ID ()
2075+ }
2076+
20342077 respond := func (resp * prompb.ReadResponse ) {
20352078 data , err := resp .Marshal ()
20362079 if err != nil {
@@ -2048,6 +2091,7 @@ func (h *Handler) servePromRead(w http.ResponseWriter, r *http.Request, user met
20482091 }
20492092
20502093 atomic .AddInt64 (& h .stats .QueryRequestBytesTransmitted , int64 (len (compressed )))
2094+ h .addQueryBytesForUser (userName , int64 (len (compressed )))
20512095 }
20522096
20532097 ctx := context .Background ()
@@ -2134,6 +2178,11 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
21342178 atomic .AddInt64 (& h .stats .FluxQueryRequestDuration , time .Since (start ).Nanoseconds ())
21352179 }(time .Now ())
21362180
2181+ var userName string
2182+ if user != nil {
2183+ userName = user .ID ()
2184+ }
2185+
21372186 req , err := decodeQueryRequest (r )
21382187 if err != nil {
21392188 h .httpError (w , err .Error (), http .StatusBadRequest )
@@ -2201,14 +2250,13 @@ func (h *Handler) serveFluxQuery(w http.ResponseWriter, r *http.Request, user me
22012250 defer results .Release ()
22022251
22032252 n , err = encoder .Encode (w , results )
2204- if err != nil {
2205- if n == 0 {
2206- // If the encoder did not write anything, we can write an error header.
2207- h .httpError (w , err .Error (), http .StatusInternalServerError )
2208- } else {
2209- atomic .AddInt64 (& h .stats .FluxQueryRequestBytesTransmitted , int64 (n ))
2210- }
2253+ if err != nil && n == 0 {
2254+ // If the encoder did not write anything, we can write an error header.
2255+ h .httpError (w , err .Error (), http .StatusInternalServerError )
2256+ return
22112257 }
2258+ atomic .AddInt64 (& h .stats .FluxQueryRequestBytesTransmitted , int64 (n ))
2259+ h .addQueryBytesForUser (userName , int64 (n ))
22122260 }
22132261}
22142262
0 commit comments