eval.go
package promql
import (
"flag"
"fmt"
"math"
"regexp"
"sort"
"strings"
"sync"
"sync/atomic"
"time"
"unsafe"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutils"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/stringsutil"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
)
var (
disableCache = flag.Bool("search.disableCache", false, "Whether to disable response caching. This may be useful when ingesting historical data. "+
"See https://docs.victoriametrics.com/#backfilling . See also -search.resetRollupResultCacheOnStartup")
maxPointsSubqueryPerTimeseries = flag.Int("search.maxPointsSubqueryPerTimeseries", 100e3, "The maximum number of points per series, which can be generated by subquery. "+
"See https://valyala.medium.com/prometheus-subqueries-in-victoriametrics-9b1492b720b3")
maxMemoryPerQuery = flagutil.NewBytes("search.maxMemoryPerQuery", 0, "The maximum amount of memory a single query may consume. "+
"Queries requiring more memory are rejected. The total memory limit for concurrently executed queries can be estimated "+
"as -search.maxMemoryPerQuery multiplied by -search.maxConcurrentRequests . "+
"See also -search.logQueryMemoryUsage")
logQueryMemoryUsage = flagutil.NewBytes("search.logQueryMemoryUsage", 0, "Log query and increment vm_memory_intensive_queries_total metric each time "+
"the query requires more memory than specified by this flag. "+
"This may help detecting and optimizing heavy queries. Query logging is disabled by default. "+
"See also -search.logSlowQueryDuration and -search.maxMemoryPerQuery")
noStaleMarkers = flag.Bool("search.noStaleMarkers", false, "Set this flag to true if the database doesn't contain Prometheus stale markers, "+
"so there is no need in spending additional CPU time on its handling. Staleness markers may exist only in data obtained from Prometheus scrape targets")
minWindowForInstantRollupOptimization = flagutil.NewDuration("search.minWindowForInstantRollupOptimization", "3h", "Enable cache-based optimization for repeated queries "+
"to /api/v1/query (aka instant queries), which contain rollup functions with lookbehind window exceeding the given value")
)
// The minimum number of points per timeseries for enabling time rounding.
// This improves cache hit ratio for frequently requested queries over
// big time ranges.
const minTimeseriesPointsForTimeRounding = 50
// ValidateMaxPointsPerSeries validates that the number of points for the given start, end and step does not exceed maxPoints.
func ValidateMaxPointsPerSeries(start, end, step int64, maxPoints int) error {
if step == 0 {
return fmt.Errorf("step can't be equal to zero")
}
points := (end-start)/step + 1
if points > int64(maxPoints) {
return fmt.Errorf("too many points for the given start=%d, end=%d and step=%d: %d; the maximum number of points is %d",
start, end, step, points, maxPoints)
}
return nil
}
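// Illustrative example (values chosen for demonstration, not from the original source): for start=0,
// end=600_000 and step=30_000 the number of points is (600_000-0)/30_000+1 = 21, so a maxPoints
// limit of 20 makes ValidateMaxPointsPerSeries return an error, while a limit of 21 does not.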
// AdjustStartEnd adjusts start and end values, so response caching may be enabled.
//
// See EvalConfig.mayCache() for details.
func AdjustStartEnd(start, end, step int64) (int64, int64) {
if *disableCache {
// Do not adjust start and end values when cache is disabled.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/563
return start, end
}
points := (end-start)/step + 1
if points < minTimeseriesPointsForTimeRounding {
// Too small number of points for rounding.
return start, end
}
// Round start and end to values divisible by step in order
// to enable response caching (see EvalConfig.mayCache).
start, end = alignStartEnd(start, end, step)
// Make sure that the new number of points is the same as the initial number of points.
newPoints := (end-start)/step + 1
for newPoints > points {
end -= step
newPoints--
}
return start, end
}
func alignStartEnd(start, end, step int64) (int64, int64) {
// Round start to the nearest smaller value divisible by step.
start -= start % step
// Round end to the nearest bigger value divisible by step.
adjust := end % step
if adjust > 0 {
end += step - adjust
}
return start, end
}
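// Illustrative example (values chosen for demonstration, not from the original source):
// alignStartEnd(1_000_123, 1_900_456, 300_000) rounds start down to 900_000 and end up to 2_100_000,
// so both become divisible by step. AdjustStartEnd then trims end by whole steps
// if the alignment increased the number of points.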
// EvalConfig is the configuration required for query evaluation via Exec
type EvalConfig struct {
Start int64
End int64
Step int64
// MaxSeries is the maximum number of time series that can be scanned by the query.
// Zero means 'no limit'.
MaxSeries int
// MaxPointsPerSeries is the limit on the number of points that can be generated for each returned time series.
MaxPointsPerSeries int
// QuotedRemoteAddr contains quoted remote address.
QuotedRemoteAddr string
Deadline searchutils.Deadline
// Whether the response can be cached.
MayCache bool
// LookbackDelta is analogous to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
// How many decimal digits after the point to leave in response.
RoundDigits int
// EnforcedTagFilterss may contain additional label filters to use in the query.
EnforcedTagFilterss [][]storage.TagFilter
// The callback, which returns the request URI during logging.
// The request URI isn't stored here because its construction may take non-trivial amounts of CPU.
GetRequestURI func() string
// QueryStats contains various stats for the currently executed query.
//
// The caller must initialize QueryStats, otherwise it isn't collected.
QueryStats *QueryStats
timestamps []int64
timestampsOnce sync.Once
}
// copyEvalConfig returns a copy of src.
func copyEvalConfig(src *EvalConfig) *EvalConfig {
var ec EvalConfig
ec.Start = src.Start
ec.End = src.End
ec.Step = src.Step
ec.MaxSeries = src.MaxSeries
ec.MaxPointsPerSeries = src.MaxPointsPerSeries
ec.Deadline = src.Deadline
ec.MayCache = src.MayCache
ec.LookbackDelta = src.LookbackDelta
ec.RoundDigits = src.RoundDigits
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
ec.GetRequestURI = src.GetRequestURI
ec.QueryStats = src.QueryStats
// do not copy src.timestamps - they must be generated again.
return &ec
}
// QueryStats contains various stats for the query.
type QueryStats struct {
// SeriesFetched contains the number of series fetched from storage during the query evaluation.
SeriesFetched atomic.Int64
// ExecutionTimeMsec contains the number of milliseconds the query took to execute.
ExecutionTimeMsec atomic.Int64
}
func (qs *QueryStats) addSeriesFetched(n int) {
if qs == nil {
return
}
qs.SeriesFetched.Add(int64(n))
}
func (qs *QueryStats) addExecutionTimeMsec(startTime time.Time) {
if qs == nil {
return
}
d := time.Since(startTime).Milliseconds()
qs.ExecutionTimeMsec.Add(d)
}
func (ec *EvalConfig) validate() {
if ec.Start > ec.End {
logger.Panicf("BUG: start cannot exceed end; got %d vs %d", ec.Start, ec.End)
}
if ec.Step <= 0 {
logger.Panicf("BUG: step must be greater than 0; got %d", ec.Step)
}
}
func (ec *EvalConfig) mayCache() bool {
if *disableCache {
return false
}
if !ec.MayCache {
return false
}
if ec.Start == ec.End {
// There is no need to align start and end to step for an instant query
// in order to cache its results.
return true
}
if ec.Start%ec.Step != 0 {
return false
}
if ec.End%ec.Step != 0 {
return false
}
return true
}
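// Illustrative example (values chosen for demonstration, not from the original source): with Step=300_000
// a range query with Start=900_000 and End=1_800_000 may be cached (both are divisible by Step), while
// Start=1_000_123 disables caching; an instant query (Start == End) may always be cached
// when MayCache is set and -search.disableCache isn't.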
func (ec *EvalConfig) timeRangeString() string {
start := storage.TimestampToHumanReadableFormat(ec.Start)
end := storage.TimestampToHumanReadableFormat(ec.End)
return fmt.Sprintf("[%s..%s]", start, end)
}
func (ec *EvalConfig) getSharedTimestamps() []int64 {
ec.timestampsOnce.Do(ec.timestampsInit)
return ec.timestamps
}
func (ec *EvalConfig) timestampsInit() {
ec.timestamps = getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
}
func getTimestamps(start, end, step int64, maxPointsPerSeries int) []int64 {
// Sanity checks.
if step <= 0 {
logger.Panicf("BUG: Step must be bigger than 0; got %d", step)
}
if start > end {
logger.Panicf("BUG: Start cannot exceed End; got %d vs %d", start, end)
}
if err := ValidateMaxPointsPerSeries(start, end, step, maxPointsPerSeries); err != nil {
logger.Panicf("BUG: %s; this must be validated before the call to getTimestamps", err)
}
// Prepare timestamps.
points := 1 + (end-start)/step
timestamps := make([]int64, points)
for i := range timestamps {
timestamps[i] = start
start += step
}
return timestamps
}
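// Illustrative example (values chosen for demonstration, not from the original source):
// getTimestamps(0, 1_200_000, 300_000, maxPointsPerSeries) returns
// [0, 300_000, 600_000, 900_000, 1_200_000] - five timestamps spaced by step, inclusive of end.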
func evalExpr(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
if qt.Enabled() {
query := string(e.AppendString(nil))
query = stringsutil.LimitStringLen(query, 300)
mayCache := ec.mayCache()
qt = qt.NewChild("eval: query=%s, timeRange=%s, step=%d, mayCache=%v", query, ec.timeRangeString(), ec.Step, mayCache)
}
rv, err := evalExprInternal(qt, ec, e)
if err != nil {
return nil, err
}
if qt.Enabled() {
seriesCount := len(rv)
pointsPerSeries := 0
if len(rv) > 0 {
pointsPerSeries = len(rv[0].Timestamps)
}
pointsCount := seriesCount * pointsPerSeries
qt.Donef("series=%d, points=%d, pointsPerSeries=%d", seriesCount, pointsCount, pointsPerSeries)
}
return rv, nil
}
func evalExprInternal(qt *querytracer.Tracer, ec *EvalConfig, e metricsql.Expr) ([]*timeseries, error) {
if me, ok := e.(*metricsql.MetricExpr); ok {
re := &metricsql.RollupExpr{
Expr: me,
}
rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %w`, me.AppendString(nil), err)
}
return rv, nil
}
if re, ok := e.(*metricsql.RollupExpr); ok {
rv, err := evalRollupFunc(qt, ec, "default_rollup", rollupDefault, e, re, nil)
if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %w`, re.AppendString(nil), err)
}
return rv, nil
}
if fe, ok := e.(*metricsql.FuncExpr); ok {
nrf := getRollupFunc(fe.Name)
if nrf == nil {
qtChild := qt.NewChild("transform %s()", fe.Name)
rv, err := evalTransformFunc(qtChild, ec, fe)
qtChild.Donef("series=%d", len(rv))
return rv, err
}
args, re, err := evalRollupFuncArgs(qt, ec, fe)
if err != nil {
return nil, err
}
rf, err := nrf(args)
if err != nil {
return nil, fmt.Errorf("cannot evaluate args for %q: %w", fe.AppendString(nil), err)
}
rv, err := evalRollupFunc(qt, ec, fe.Name, rf, e, re, nil)
if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err)
}
return rv, nil
}
if ae, ok := e.(*metricsql.AggrFuncExpr); ok {
qtChild := qt.NewChild("aggregate %s()", ae.Name)
rv, err := evalAggrFunc(qtChild, ec, ae)
qtChild.Donef("series=%d", len(rv))
return rv, err
}
if be, ok := e.(*metricsql.BinaryOpExpr); ok {
qtChild := qt.NewChild("binary op %q", be.Op)
rv, err := evalBinaryOp(qtChild, ec, be)
qtChild.Donef("series=%d", len(rv))
return rv, err
}
if ne, ok := e.(*metricsql.NumberExpr); ok {
rv := evalNumber(ec, ne.N)
return rv, nil
}
if se, ok := e.(*metricsql.StringExpr); ok {
rv := evalString(ec, se.S)
return rv, nil
}
if de, ok := e.(*metricsql.DurationExpr); ok {
d := de.Duration(ec.Step)
dSec := float64(d) / 1000
rv := evalNumber(ec, dSec)
return rv, nil
}
return nil, fmt.Errorf("unexpected expression %q", e.AppendString(nil))
}
func evalTransformFunc(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]*timeseries, error) {
tf := getTransformFunc(fe.Name)
if tf == nil {
return nil, &UserReadableError{
Err: fmt.Errorf(`unknown func %q`, fe.Name),
}
}
var args [][]*timeseries
var err error
switch fe.Name {
case "", "union":
args, err = evalExprsInParallel(qt, ec, fe.Args)
default:
args, err = evalExprsSequentially(qt, ec, fe.Args)
}
if err != nil {
return nil, err
}
tfa := &transformFuncArg{
ec: ec,
fe: fe,
args: args,
}
rv, err := tf(tfa)
if err != nil {
return nil, &UserReadableError{
Err: fmt.Errorf(`cannot evaluate %q: %w`, fe.AppendString(nil), err),
}
}
return rv, nil
}
func evalAggrFunc(qt *querytracer.Tracer, ec *EvalConfig, ae *metricsql.AggrFuncExpr) ([]*timeseries, error) {
if callbacks := getIncrementalAggrFuncCallbacks(ae.Name); callbacks != nil {
fe, nrf := tryGetArgRollupFuncWithMetricExpr(ae)
if fe != nil {
// There is an optimized path for calculating metricsql.AggrFuncExpr over rollupFunc over metricsql.MetricExpr.
// The optimized path saves RAM for aggregates over a big number of time series.
args, re, err := evalRollupFuncArgs(qt, ec, fe)
if err != nil {
return nil, err
}
rf, err := nrf(args)
if err != nil {
return nil, fmt.Errorf("cannot evaluate args for aggregate func %q: %w", ae.AppendString(nil), err)
}
iafc := newIncrementalAggrFuncContext(ae, callbacks)
return evalRollupFunc(qt, ec, fe.Name, rf, ae, re, iafc)
}
}
args, err := evalExprsInParallel(qt, ec, ae.Args)
if err != nil {
return nil, err
}
af := getAggrFunc(ae.Name)
if af == nil {
return nil, &UserReadableError{
Err: fmt.Errorf(`unknown func %q`, ae.Name),
}
}
afa := &aggrFuncArg{
ae: ae,
args: args,
ec: ec,
}
qtChild := qt.NewChild("eval %s", ae.Name)
rv, err := af(afa)
qtChild.Done()
if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %w`, ae.AppendString(nil), err)
}
return rv, nil
}
func evalBinaryOp(qt *querytracer.Tracer, ec *EvalConfig, be *metricsql.BinaryOpExpr) ([]*timeseries, error) {
bf := getBinaryOpFunc(be.Op)
if bf == nil {
return nil, fmt.Errorf(`unknown binary op %q`, be.Op)
}
var err error
var tssLeft, tssRight []*timeseries
switch strings.ToLower(be.Op) {
case "and", "if":
// Fetch right-side series first, since it usually contains
// a lower number of time series for the `and` and `if` operators.
// This should produce more specific label filters for the left side of the query.
// This, in turn, should reduce the time to select series for the left side of the query.
tssRight, tssLeft, err = execBinaryOpArgs(qt, ec, be.Right, be.Left, be)
default:
tssLeft, tssRight, err = execBinaryOpArgs(qt, ec, be.Left, be.Right, be)
}
if err != nil {
return nil, fmt.Errorf("cannot execute %q: %w", be.AppendString(nil), err)
}
bfa := &binaryOpFuncArg{
be: be,
left: tssLeft,
right: tssRight,
}
rv, err := bf(bfa)
if err != nil {
return nil, fmt.Errorf(`cannot evaluate %q: %w`, be.AppendString(nil), err)
}
return rv, nil
}
func canPushdownCommonFilters(be *metricsql.BinaryOpExpr) bool {
switch strings.ToLower(be.Op) {
case "or", "default":
return false
}
if isAggrFuncWithoutGrouping(be.Left) || isAggrFuncWithoutGrouping(be.Right) {
return false
}
return true
}
func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
afe, ok := e.(*metricsql.AggrFuncExpr)
if !ok {
return false
}
return len(afe.Modifier.Args) == 0
}
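// Illustrative example (query text is assumed for demonstration): common-filter pushdown is skipped for
// `sum(rate(foo)) / sum(rate(bar))`, since both sides are aggregates without grouping and return
// a single series without labels, while `sum(rate(foo)) by (job) / sum(rate(bar)) by (job)`
// allows pushing down filters on the `job` label.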
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
if !canPushdownCommonFilters(be) {
// Execute exprFirst and exprSecond in parallel, since it is impossible to push down common filters
// from exprFirst to exprSecond.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
qt = qt.NewChild("execute left and right sides of %q in parallel", be.Op)
defer qt.Done()
var wg sync.WaitGroup
var tssFirst []*timeseries
var errFirst error
qtFirst := qt.NewChild("expr1")
wg.Add(1)
go func() {
defer wg.Done()
tssFirst, errFirst = evalExpr(qtFirst, ec, exprFirst)
qtFirst.Done()
}()
var tssSecond []*timeseries
var errSecond error
qtSecond := qt.NewChild("expr2")
wg.Add(1)
go func() {
defer wg.Done()
tssSecond, errSecond = evalExpr(qtSecond, ec, exprSecond)
qtSecond.Done()
}()
wg.Wait()
if errFirst != nil {
return nil, nil, errFirst
}
if errSecond != nil {
return nil, nil, errSecond
}
return tssFirst, tssSecond, nil
}
// Execute binary operation in the following way:
//
// 1) execute the exprFirst
// 2) get common label filters for series returned at step 1
// 3) push down the found common label filters to exprSecond. This filters out unneeded series
// during exprSecond execution instead of spending compute resources on extracting and processing these series
// before they are dropped later when matching time series according to https://prometheus.io/docs/prometheus/latest/querying/operators/#vector-matching
// 4) execute the exprSecond with possible additional filters found at step 3
//
// Typical use cases:
// - Kubernetes-related: show pod creation time with the node name:
//
// kube_pod_created{namespace="prod"} * on (uid) group_left(node) kube_pod_info
//
// Without the optimization `kube_pod_info` would select and spend compute resources
// for more time series than needed. The selected time series would be dropped later
// when matching time series on the right and left sides of the binary operation.
//
// - Generic alerting queries, which rely on `info` metrics.
// See https://grafana.com/blog/2021/08/04/how-to-use-promql-joins-for-more-effective-queries-of-prometheus-metrics-at-scale/
//
// - Queries, which get additional labels from `info` metrics.
// See https://www.robustperception.io/exposing-the-software-version-to-prometheus
tssFirst, err := evalExpr(qt, ec, exprFirst)
if err != nil {
return nil, nil, err
}
if len(tssFirst) == 0 && !strings.EqualFold(be.Op, "or") {
// Fast path: there is no sense in executing the exprSecond when exprFirst returns an empty result,
// since the "exprFirst op exprSecond" would return an empty result in any case.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3349
return nil, nil, nil
}
lfs := getCommonLabelFilters(tssFirst)
lfs = metricsql.TrimFiltersByGroupModifier(lfs, be)
exprSecond = metricsql.PushdownBinaryOpFilters(exprSecond, lfs)
tssSecond, err := evalExpr(qt, ec, exprSecond)
if err != nil {
return nil, nil, err
}
return tssFirst, tssSecond, nil
}
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
if len(tss) == 0 {
return nil
}
type valuesCounter struct {
values map[string]struct{}
count int
}
m := make(map[string]*valuesCounter, len(tss[0].MetricName.Tags))
for _, ts := range tss {
for _, tag := range ts.MetricName.Tags {
vc, ok := m[string(tag.Key)]
if !ok {
k := string(tag.Key)
v := string(tag.Value)
m[k] = &valuesCounter{
values: map[string]struct{}{
v: {},
},
count: 1,
}
continue
}
if len(vc.values) > 100 {
// Too many unique values found for the given tag.
// Do not make a filter on such values, since it may slow down
// search for matching time series.
continue
}
vc.count++
if _, ok := vc.values[string(tag.Value)]; !ok {
vc.values[string(tag.Value)] = struct{}{}
}
}
}
lfs := make([]metricsql.LabelFilter, 0, len(m))
var values []string
for k, vc := range m {
if vc.count != len(tss) {
// Skip the tag, since it doesn't belong to all the time series.
continue
}
values = values[:0]
for s := range vc.values {
values = append(values, s)
}
lf := metricsql.LabelFilter{
Label: k,
}
if len(values) == 1 {
lf.Value = values[0]
} else {
sort.Strings(values)
lf.Value = joinRegexpValues(values)
lf.IsRegexp = true
}
lfs = append(lfs, lf)
}
sort.Slice(lfs, func(i, j int) bool {
return lfs[i].Label < lfs[j].Label
})
return lfs
}
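// Illustrative example (label values are assumed for demonstration): for two series {job="node", instance="a"}
// and {job="node", instance="b"} the common filters are job="node" and instance=~"a|b";
// a label missing from any of the series or having more than 100 unique values is skipped.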
func joinRegexpValues(a []string) string {
var b []byte
for i, s := range a {
sQuoted := regexp.QuoteMeta(s)
b = append(b, sQuoted...)
if i < len(a)-1 {
b = append(b, '|')
}
}
return string(b)
}
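// Illustrative example (values are assumed for demonstration): joinRegexpValues([]string{"api.prod", "web"})
// returns `api\.prod|web` - each value is escaped with regexp.QuoteMeta and the values are joined with '|'.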
func tryGetArgRollupFuncWithMetricExpr(ae *metricsql.AggrFuncExpr) (*metricsql.FuncExpr, newRollupFunc) {
if len(ae.Args) != 1 {
return nil, nil
}
e := ae.Args[0]
// Make sure e contains one of the following:
// - metricExpr
// - metricExpr[d]
// - rollupFunc(metricExpr)
// - rollupFunc(metricExpr[d])
if me, ok := e.(*metricsql.MetricExpr); ok {
// e = metricExpr
if me.IsEmpty() {
return nil, nil
}
fe := &metricsql.FuncExpr{
Name: "default_rollup",
Args: []metricsql.Expr{me},
}
nrf := getRollupFunc(fe.Name)
return fe, nrf
}
if re, ok := e.(*metricsql.RollupExpr); ok {
if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
return nil, nil
}
// e = metricExpr[d]
fe := &metricsql.FuncExpr{
Name: "default_rollup",
Args: []metricsql.Expr{re},
}
nrf := getRollupFunc(fe.Name)
return fe, nrf
}
fe, ok := e.(*metricsql.FuncExpr)
if !ok {
return nil, nil
}
nrf := getRollupFunc(fe.Name)
if nrf == nil {
return nil, nil
}
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
if rollupArgIdx >= len(fe.Args) {
// Incorrect number of args for rollup func.
return nil, nil
}
arg := fe.Args[rollupArgIdx]
if me, ok := arg.(*metricsql.MetricExpr); ok {
if me.IsEmpty() {
return nil, nil
}
// e = rollupFunc(metricExpr)
return fe, nrf
}
if re, ok := arg.(*metricsql.RollupExpr); ok {
if me, ok := re.Expr.(*metricsql.MetricExpr); !ok || me.IsEmpty() || re.ForSubquery() {
return nil, nil
}
// e = rollupFunc(metricExpr[d])
return fe, nrf
}
return nil, nil
}
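// Illustrative examples (query text is assumed for demonstration): `sum(m)`, `sum(m[5m])` and
// `sum(rate(m[5m]))` qualify for the optimized incremental aggregation path above,
// while `sum(rate(m[5m:1m]))` doesn't, since its rollup argument is a subquery.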
func evalExprsSequentially(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
var rvs [][]*timeseries
for _, e := range es {
rv, err := evalExpr(qt, ec, e)
if err != nil {
return nil, err
}
rvs = append(rvs, rv)
}
return rvs, nil
}
func evalExprsInParallel(qt *querytracer.Tracer, ec *EvalConfig, es []metricsql.Expr) ([][]*timeseries, error) {
if len(es) < 2 {
return evalExprsSequentially(qt, ec, es)
}
rvs := make([][]*timeseries, len(es))
errs := make([]error, len(es))
qt.Printf("eval function args in parallel")
var wg sync.WaitGroup
for i, e := range es {
wg.Add(1)
qtChild := qt.NewChild("eval arg %d", i)
go func(e metricsql.Expr, i int) {
defer func() {
qtChild.Done()
wg.Done()
}()
rv, err := evalExpr(qtChild, ec, e)
rvs[i] = rv
errs[i] = err
}(e, i)
}
wg.Wait()
for _, err := range errs {
if err != nil {
return nil, err
}
}
return rvs, nil
}
func evalRollupFuncArgs(qt *querytracer.Tracer, ec *EvalConfig, fe *metricsql.FuncExpr) ([]interface{}, *metricsql.RollupExpr, error) {
var re *metricsql.RollupExpr
rollupArgIdx := metricsql.GetRollupArgIdx(fe)
if len(fe.Args) <= rollupArgIdx {
return nil, nil, fmt.Errorf("expecting at least %d args to %q; got %d args; expr: %q", rollupArgIdx+1, fe.Name, len(fe.Args), fe.AppendString(nil))
}
args := make([]interface{}, len(fe.Args))
for i, arg := range fe.Args {
if i == rollupArgIdx {
re = getRollupExprArg(arg)
args[i] = re
continue
}
ts, err := evalExpr(qt, ec, arg)
if err != nil {
return nil, nil, fmt.Errorf("cannot evaluate arg #%d for %q: %w", i+1, fe.AppendString(nil), err)
}
args[i] = ts
}
return args, re, nil
}
func getRollupExprArg(arg metricsql.Expr) *metricsql.RollupExpr {
re, ok := arg.(*metricsql.RollupExpr)
if !ok {
// Wrap non-rollup arg into metricsql.RollupExpr.
return &metricsql.RollupExpr{
Expr: arg,
}
}
if !re.ForSubquery() {
// Return the standard rollup if it doesn't contain a subquery.
return re
}
me, ok := re.Expr.(*metricsql.MetricExpr)
if !ok {
// arg contains subquery.
return re
}
// Convert me[w:step] -> default_rollup(me)[w:step]
reNew := *re
reNew.Expr = &metricsql.FuncExpr{
Name: "default_rollup",
Args: []metricsql.Expr{
&metricsql.RollupExpr{Expr: me},
},
}
return &reNew
}
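// Illustrative example (query text is assumed for demonstration): the subquery argument `m[1h:5m]`
// is rewritten into `default_rollup(m)[1h:5m]`, while a plain rollup argument such as `m[5m]`
// is returned as-is.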
// expr may contain:
// - rollupFunc(m) if iafc is nil
// - aggrFunc(rollupFunc(m)) if iafc isn't nil
func evalRollupFunc(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr,
re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
if re.At == nil {
return evalRollupFuncWithoutAt(qt, ec, funcName, rf, expr, re, iafc)
}
tssAt, err := evalExpr(qt, ec, re.At)
if err != nil {
return nil, &UserReadableError{
Err: fmt.Errorf("cannot evaluate `@` modifier: %w", err),
}
}
if len(tssAt) != 1 {
return nil, &UserReadableError{
Err: fmt.Errorf("`@` modifier must return a single series; it returns %d series instead", len(tssAt)),
}
}
atTimestamp := int64(tssAt[0].Values[0] * 1000)
ecNew := copyEvalConfig(ec)
ecNew.Start = atTimestamp
ecNew.End = atTimestamp
tss, err := evalRollupFuncWithoutAt(qt, ecNew, funcName, rf, expr, re, iafc)
if err != nil {
return nil, err
}
// expand single-point tss to the original time range.
timestamps := ec.getSharedTimestamps()
for _, ts := range tss {
v := ts.Values[0]
values := make([]float64, len(timestamps))
for i := range timestamps {
values[i] = v
}
ts.Timestamps = timestamps
ts.Values = values
}
return tss, nil
}
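// Illustrative example (query text is assumed for demonstration): for `rate(m[5m]) @ 1609459200`
// the `@` expression is evaluated first, the rollup is then calculated at the single timestamp
// 1609459200 seconds (converted to milliseconds), and the resulting value is repeated across
// all the timestamps of the original time range.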
func evalRollupFuncWithoutAt(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc,
expr metricsql.Expr, re *metricsql.RollupExpr, iafc *incrementalAggrFuncContext) ([]*timeseries, error) {
funcName = strings.ToLower(funcName)
ecNew := ec
var offset int64
if re.Offset != nil {
offset = re.Offset.Duration(ec.Step)
ecNew = copyEvalConfig(ecNew)
ecNew.Start -= offset
ecNew.End -= offset
// There is no need to call AdjustStartEnd() on ecNew if ecNew.MayCache is set to true,
// since the time range alignment has already been performed by the caller,
// so the cache hit rate should be quite good.
// See also https://github.com/VictoriaMetrics/VictoriaMetrics/issues/976
}
if funcName == "rollup_candlestick" {
// Automatically apply `offset -step` to `rollup_candlestick` function
// in order to obtain expected OHLC results.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/309#issuecomment-582113462
step := ecNew.Step
ecNew = copyEvalConfig(ecNew)
ecNew.Start += step
ecNew.End += step
offset -= step
}
var rvs []*timeseries
var err error
if me, ok := re.Expr.(*metricsql.MetricExpr); ok {
rvs, err = evalRollupFuncWithMetricExpr(qt, ecNew, funcName, rf, expr, me, iafc, re.Window)
} else {
if iafc != nil {
logger.Panicf("BUG: iafc must be nil for rollup %q over subquery %q", funcName, re.AppendString(nil))
}
rvs, err = evalRollupFuncWithSubquery(qt, ecNew, funcName, rf, expr, re)
}
if err != nil {
return nil, &UserReadableError{
Err: err,
}
}
if funcName == "absent_over_time" {
rvs = aggregateAbsentOverTime(ecNew, re.Expr, rvs)
}
if offset != 0 && len(rvs) > 0 {
// Make a copy of timestamps, since they may be shared by other time series.
srcTimestamps := rvs[0].Timestamps
dstTimestamps := append([]int64{}, srcTimestamps...)
for i := range dstTimestamps {
dstTimestamps[i] += offset
}
for _, ts := range rvs {
ts.Timestamps = dstTimestamps
}
}
return rvs, nil
}
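// Illustrative example (query text is assumed for demonstration): for `rate(m[5m] offset 10m)`
// the series are selected on a time range shifted 10 minutes back, and the resulting timestamps
// are then shifted forward by the same offset so they match the requested range.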
// aggregateAbsentOverTime collapses tss to a single time series with 1 and nan values.
//
// A value of the returned series is set to nan if at least one series in tss contains nan at that point,
// which means that tss contains a series with a non-empty result at that point.
// This follows Prometheus logic - see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2130
func aggregateAbsentOverTime(ec *EvalConfig, expr metricsql.Expr, tss []*timeseries) []*timeseries {
rvs := getAbsentTimeseries(ec, expr)
if len(tss) == 0 {
return rvs
}
for i := range tss[0].Values {
for _, ts := range tss {
if math.IsNaN(ts.Values[i]) {
rvs[0].Values[i] = nan
break
}
}
}
return rvs
}
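// Illustrative example (values are assumed for demonstration): for two input series with values
// [1, nan] and [1, 1] the collapsed result is [1, nan] - a point becomes nan as soon as
// any input series contains nan there.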
func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName string, rf rollupFunc, expr metricsql.Expr, re *metricsql.RollupExpr) ([]*timeseries, error) {
// TODO: determine whether to use rollupResultCacheV here.
qt = qt.NewChild("subquery")
defer qt.Done()
step, err := re.Step.NonNegativeDuration(ec.Step)
if err != nil {
return nil, fmt.Errorf("cannot parse step in square brackets at %s: %w", expr.AppendString(nil), err)
}
if step == 0 {
step = ec.Step
}
window, err := re.Window.NonNegativeDuration(ec.Step)
if err != nil {
return nil, fmt.Errorf("cannot parse lookbehind window in square brackets at %s: %w", expr.AppendString(nil), err)
}
ecSQ := copyEvalConfig(ec)
ecSQ.Start -= window + step + maxSilenceInterval()
ecSQ.End += step
ecSQ.Step = step
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
if err := ValidateMaxPointsPerSeries(ecSQ.Start, ecSQ.End, ecSQ.Step, ecSQ.MaxPointsPerSeries); err != nil {
return nil, fmt.Errorf("%w; (see -search.maxPointsSubqueryPerTimeseries command-line flag)", err)
}
// Unconditionally align start and end args to step for the subquery, as Prometheus does.
ecSQ.Start, ecSQ.End = alignStartEnd(ecSQ.Start, ecSQ.End, ecSQ.Step)
tssSQ, err := evalExpr(qt, ecSQ, re.Expr)
if err != nil {
return nil, err
}
if len(tssSQ) == 0 {
return nil, nil
}
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
if err != nil {
return nil, err
}
var samplesScannedTotal atomic.Uint64
keepMetricNames := getKeepMetricNames(expr)
tsw := getTimeseriesByWorkerID()
seriesByWorkerID := tsw.byWorkerID
doParallel(tssSQ, func(tsSQ *timeseries, values []float64, timestamps []int64, workerID uint) ([]float64, []int64) {
values, timestamps = removeNanValues(values[:0], timestamps[:0], tsSQ.Values, tsSQ.Timestamps)
preFunc(values, timestamps)
for _, rc := range rcs {
if tsm := newTimeseriesMap(funcName, keepMetricNames, sharedTimestamps, &tsSQ.MetricName); tsm != nil {
samplesScanned := rc.DoTimeseriesMap(tsm, values, timestamps)
samplesScannedTotal.Add(samplesScanned)
seriesByWorkerID[workerID].tss = tsm.AppendTimeseriesTo(seriesByWorkerID[workerID].tss)
continue
}
var ts timeseries
samplesScanned := doRollupForTimeseries(funcName, keepMetricNames, rc, &ts, &tsSQ.MetricName, values, timestamps, sharedTimestamps)
samplesScannedTotal.Add(samplesScanned)
seriesByWorkerID[workerID].tss = append(seriesByWorkerID[workerID].tss, &ts)
}
return values, timestamps
})
tss := make([]*timeseries, 0, len(tssSQ)*len(rcs))
for i := range seriesByWorkerID {
tss = append(tss, seriesByWorkerID[i].tss...)
}
putTimeseriesByWorkerID(tsw)
rowsScannedPerQuery.Update(float64(samplesScannedTotal.Load()))
qt.Printf("rollup %s() over %d series returned by subquery: series=%d, samplesScanned=%d", funcName, len(tssSQ), len(tss), samplesScannedTotal.Load())
return tss, nil
}
var rowsScannedPerQuery = metrics.NewHistogram(`vm_rows_scanned_per_query`)
func getKeepMetricNames(expr metricsql.Expr) bool {
if ae, ok := expr.(*metricsql.AggrFuncExpr); ok {
// Extract rollupFunc(...) from aggrFunc(rollupFunc(...)).
// This case is possible when optimized aggrFunc calculations are used
// such as `sum(rate(...))`
if len(ae.Args) != 1 {
return false
}
expr = ae.Args[0]
}
if fe, ok := expr.(*metricsql.FuncExpr); ok {
return fe.KeepMetricNames
}
return false