mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
Compare commits
8 Commits
feature/bu
...
query-debu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
879e444058 | ||
|
|
4d501b20fd | ||
|
|
65fa35dfdf | ||
|
|
3768919413 | ||
|
|
c65dc7b15a | ||
|
|
a762889e45 | ||
|
|
3220760480 | ||
|
|
6ae8855e29 |
@@ -397,6 +397,13 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "prometheus/api/v1/config":
|
||||
httpserver.EnableCORS(w, r)
|
||||
if err := prometheus.ConfigHandler(qt, startTime, w, r); err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "prometheus/api/v1/export":
|
||||
exportRequests.Inc()
|
||||
if err := prometheus.ExportHandler(startTime, at, w, r); err != nil {
|
||||
@@ -731,6 +738,13 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
expandWithExprsRequests.Inc()
|
||||
prometheus.ExpandWithExprs(w, r)
|
||||
return true
|
||||
case "prometheus/extract-metric-exprs", "extract-metric-exprs":
|
||||
startTime := time.Now()
|
||||
if err := prometheus.ExtractMetricExprsHandler(startTime, w, r); err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "prometheus/prettify-query", "prettify-query":
|
||||
prettifyQueryRequests.Inc()
|
||||
prometheus.PrettifyQuery(w, r)
|
||||
|
||||
@@ -88,10 +88,18 @@ type Results struct {
|
||||
tbfs []*tmpBlocksFile
|
||||
|
||||
packedTimeseries []packedTimeseries
|
||||
|
||||
// the result is simulated
|
||||
isSimulated bool
|
||||
simulatedSeries []*storage.SimulatedSamples
|
||||
}
|
||||
|
||||
// Len returns the number of results in rss.
|
||||
func (rss *Results) Len() int {
|
||||
if rss.isSimulated {
|
||||
return len(rss.simulatedSeries)
|
||||
}
|
||||
|
||||
return len(rss.packedTimeseries)
|
||||
}
|
||||
|
||||
@@ -247,6 +255,10 @@ var defaultMaxWorkersPerQuery = func() int {
|
||||
//
|
||||
// rss becomes unusable after the call to RunParallel.
|
||||
func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
|
||||
if rss.isSimulated {
|
||||
return rss.runParallelSimulated(qt, f)
|
||||
}
|
||||
|
||||
qt = qt.NewChild("parallel process of fetched data")
|
||||
defer rss.closeTmpBlockFiles()
|
||||
|
||||
@@ -262,6 +274,94 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
||||
return err
|
||||
}
|
||||
|
||||
func (rss *Results) runParallelSimulated(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
|
||||
qt = qt.NewChild("parallel process of fetched data")
|
||||
|
||||
cb := f
|
||||
if rss.shouldConvertTenantToLabels {
|
||||
cb = func(rs *Result, workerID uint) error {
|
||||
metricNameTenantToTags(&rs.MetricName)
|
||||
return f(rs, workerID)
|
||||
}
|
||||
}
|
||||
|
||||
tmpResult := getTmpResult()
|
||||
defer putTmpResult(tmpResult)
|
||||
|
||||
// For simplicity, let's process serially first. Parallelization can be added if needed.
|
||||
// If parallelization is desired, it would mirror the worker pool logic of the original runParallel,
|
||||
// but iterating over rss.simulatedSamples entries.
|
||||
workerID := uint(0)
|
||||
var firstErr error
|
||||
for _, metric := range rss.simulatedSeries {
|
||||
r := &tmpResult.rs
|
||||
r.reset()
|
||||
r.MetricName.CopyFrom(&metric.Name)
|
||||
for i, ts := range metric.Timestamps {
|
||||
if ts >= rss.tr.MinTimestamp && ts <= rss.tr.MaxTimestamp {
|
||||
r.Values = append(r.Values, metric.Value[i])
|
||||
r.Timestamps = append(r.Timestamps, ts)
|
||||
}
|
||||
}
|
||||
|
||||
// Sort timestamps chronologically to match real storage behavior.
|
||||
// Real storage ensures chronological order through:
|
||||
// 1. Block-level sorting by MinTimestamp
|
||||
// 2. Within-block timestamp ordering via encoding.EnsureNonDecreasingSequence()
|
||||
if len(r.Timestamps) > 1 {
|
||||
// Create pairs for sorting
|
||||
type timestampValue struct {
|
||||
timestamp int64
|
||||
value float64
|
||||
}
|
||||
pairs := make([]timestampValue, len(r.Timestamps))
|
||||
for i := range r.Timestamps {
|
||||
pairs[i] = timestampValue{
|
||||
timestamp: r.Timestamps[i],
|
||||
value: r.Values[i],
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by timestamp
|
||||
sort.Slice(pairs, func(i, j int) bool {
|
||||
return pairs[i].timestamp < pairs[j].timestamp
|
||||
})
|
||||
|
||||
// Extract back to separate slices
|
||||
for i := range pairs {
|
||||
r.Timestamps[i] = pairs[i].timestamp
|
||||
r.Values[i] = pairs[i].value
|
||||
}
|
||||
}
|
||||
|
||||
// The input from the client is most likely already deduplicated, since it's emitted by
|
||||
// vmselect. However, the client may modify the input instead of using the returned one.
|
||||
dedupInterval := storage.GetDedupInterval()
|
||||
if dedupInterval > 0 && len(r.Timestamps) > 0 {
|
||||
r.Timestamps, r.Values = storage.DeduplicateSamples(r.Timestamps, r.Values, dedupInterval)
|
||||
}
|
||||
|
||||
rowProcessed := len(r.Timestamps)
|
||||
|
||||
if rowProcessed > 0 {
|
||||
err := cb(r, workerID)
|
||||
if err != nil {
|
||||
firstErr = err
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Count total samples across all series
|
||||
totalSamples := 0
|
||||
for _, metric := range rss.simulatedSeries {
|
||||
totalSamples += len(metric.Timestamps)
|
||||
}
|
||||
qt.Donef("series=%d, samples=%d", len(rss.simulatedSeries), totalSamples)
|
||||
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) {
|
||||
tswsLen := len(rss.packedTimeseries)
|
||||
if tswsLen == 0 {
|
||||
@@ -1798,6 +1898,10 @@ func (e limitExceededErr) Error() string { return e.err.Error() }
|
||||
//
|
||||
// Results.RunParallel or Results.Cancel must be called on the returned Results.
|
||||
func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, bool, error) {
|
||||
if len(sq.SimulatedSeries) > 0 {
|
||||
return processSearchSimulated(qt, sq, deadline)
|
||||
}
|
||||
|
||||
qt = qt.NewChild("fetch matching series: %s", sq)
|
||||
defer qt.Done()
|
||||
if deadline.Exceeded() {
|
||||
@@ -1862,6 +1966,41 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
|
||||
return &rss, isPartial, nil
|
||||
}
|
||||
|
||||
func processSearchSimulated(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, bool, error) {
|
||||
qt = qt.NewChild("fetch matching series (simulated): %s", sq)
|
||||
defer qt.Done()
|
||||
if deadline.Exceeded() {
|
||||
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||
}
|
||||
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
}
|
||||
|
||||
// Process simulated samples.
|
||||
matchedSamples, err := storage.MatchSimulatedSamples(sq.TenantTokens[0].AccountID, sq.TenantTokens[0].ProjectID, sq.SimulatedSeries, sq.TagFilterss)
|
||||
if err != nil {
|
||||
return nil, false, fmt.Errorf("cannot match simulated samples: %w", err)
|
||||
}
|
||||
|
||||
// Create a result set similar to ProcessSearchQuery
|
||||
rss := &Results{
|
||||
tr: tr,
|
||||
deadline: deadline,
|
||||
isSimulated: true,
|
||||
simulatedSeries: matchedSamples,
|
||||
}
|
||||
|
||||
if len(matchedSamples) == 0 {
|
||||
qt.Printf("no matching series found")
|
||||
} else {
|
||||
qt.Printf("found %d series", len(rss.simulatedSeries))
|
||||
}
|
||||
|
||||
return rss, false, nil
|
||||
}
|
||||
|
||||
// ProcessBlocks calls processBlock per each block matching the given sq.
|
||||
func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
|
||||
processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutil.Deadline,
|
||||
|
||||
20
app/vmselect/prometheus/config_response.qtpl
Normal file
20
app/vmselect/prometheus/config_response.qtpl
Normal file
@@ -0,0 +1,20 @@
|
||||
{% import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
) %}
|
||||
|
||||
{% stripspace %}
|
||||
|
||||
ConfigResponse generates response for /api/v1/config .
|
||||
{% func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) %}
|
||||
{
|
||||
"status":"success",
|
||||
"data":{
|
||||
"minStalenessInterval": {%q= config.MinStalenessInterval %},
|
||||
"maxStalenessInterval": {%q= config.MaxStalenessInterval %}
|
||||
}
|
||||
{% code qt.Done() %}
|
||||
{%= dumpQueryTrace(qt) %}
|
||||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
||||
73
app/vmselect/prometheus/config_response.qtpl.go
Normal file
73
app/vmselect/prometheus/config_response.qtpl.go
Normal file
@@ -0,0 +1,73 @@
|
||||
// Code generated by qtc from "config_response.qtpl". DO NOT EDIT.
|
||||
// See https://github.com/valyala/quicktemplate for details.
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:1
|
||||
package prometheus
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:1
|
||||
import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
)
|
||||
|
||||
// ConfigResponse generates response for /api/v1/config .
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
import (
|
||||
qtio422016 "io"
|
||||
|
||||
qt422016 "github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
var (
|
||||
_ = qtio422016.Copy
|
||||
_ = qt422016.AcquireByteBuffer
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
func StreamConfigResponse(qw422016 *qt422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
qw422016.N().S(`{"status":"success","data":{"minStalenessInterval":`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:12
|
||||
qw422016.N().Q(config.MinStalenessInterval)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:12
|
||||
qw422016.N().S(`,"maxStalenessInterval":`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:13
|
||||
qw422016.N().Q(config.MaxStalenessInterval)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:13
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:15
|
||||
qt.Done()
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:16
|
||||
streamdumpQueryTrace(qw422016, qt)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:16
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
func WriteConfigResponse(qq422016 qtio422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
StreamConfigResponse(qw422016, config, qt)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) string {
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
WriteConfigResponse(qb422016, config, qt)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
}
|
||||
18
app/vmselect/prometheus/extract_metric_exprs_response.qtpl
Normal file
18
app/vmselect/prometheus/extract_metric_exprs_response.qtpl
Normal file
@@ -0,0 +1,18 @@
|
||||
{% stripspace %}
|
||||
|
||||
ExtractMetricExprsResponse generates response for /extract-metric-exprs .
|
||||
{% func ExtractMetricExprsResponse(metrics []string) %}
|
||||
{
|
||||
"status":"success",
|
||||
"data":[
|
||||
{% if len(metrics) > 0 %}
|
||||
{%q= metrics[0] %}
|
||||
{% for i := 1; i < len(metrics); i++ %}
|
||||
,{%q= metrics[i] %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
]
|
||||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
||||
@@ -0,0 +1,69 @@
|
||||
// Code generated by qtc from "extract_metric_exprs_response.qtpl". DO NOT EDIT.
|
||||
// See https://github.com/valyala/quicktemplate for details.
|
||||
|
||||
// ExtractMetricExprsResponse generates response for /extract-metric-exprs .
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
package prometheus
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
import (
|
||||
qtio422016 "io"
|
||||
|
||||
qt422016 "github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
var (
|
||||
_ = qtio422016.Copy
|
||||
_ = qt422016.AcquireByteBuffer
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
func StreamExtractMetricExprsResponse(qw422016 *qt422016.Writer, metrics []string) {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
qw422016.N().S(`{"status":"success","data":[`)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:8
|
||||
if len(metrics) > 0 {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:9
|
||||
qw422016.N().Q(metrics[0])
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
|
||||
for i := 1; i < len(metrics); i++ {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:11
|
||||
qw422016.N().Q(metrics[i])
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:12
|
||||
}
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
|
||||
}
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
|
||||
qw422016.N().S(`]}`)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
func WriteExtractMetricExprsResponse(qq422016 qtio422016.Writer, metrics []string) {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
StreamExtractMetricExprsResponse(qw422016, metrics)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
func ExtractMetricExprsResponse(metrics []string) string {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
WriteExtractMetricExprsResponse(qb422016, metrics)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
}
|
||||
@@ -1,8 +1,10 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"runtime"
|
||||
@@ -43,6 +45,9 @@ var (
|
||||
maxLookback = flag.Duration("search.maxLookback", 0, "Synonym to -query.lookback-delta from Prometheus. "+
|
||||
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+
|
||||
"See also '-search.maxStalenessInterval' flag, which has the same meaning due to historical reasons")
|
||||
minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
|
||||
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
|
||||
"See also '-search.maxStalenessInterval'")
|
||||
maxStalenessInterval = flag.Duration("search.maxStalenessInterval", 0, "The maximum interval for staleness calculations. "+
|
||||
"By default, it is automatically calculated from the median interval between samples. This flag could be useful for tuning "+
|
||||
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. "+
|
||||
@@ -118,7 +123,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -723,6 +728,55 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
||||
|
||||
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
|
||||
|
||||
// ConfigData holds the current configuration values for search-related flags
|
||||
type ConfigData struct {
|
||||
MinStalenessInterval string
|
||||
MaxStalenessInterval string
|
||||
}
|
||||
|
||||
// ConfigHandler processes /api/v1/config request.
|
||||
//
|
||||
// It returns the current configuration for search-related flags.
|
||||
func ConfigHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, _ *http.Request) error {
|
||||
config := &ConfigData{
|
||||
MinStalenessInterval: (*minStalenessInterval).String(),
|
||||
MaxStalenessInterval: (*maxStalenessInterval).String(),
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
WriteConfigResponse(bw, config, qt)
|
||||
if err := bw.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot send config response to remote client: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExtractMetricExprsHandler processes /extract-metric-exprs request.
|
||||
//
|
||||
// It extracts metric expressions from a given PromQL query.
|
||||
func ExtractMetricExprsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
|
||||
query := r.FormValue("query")
|
||||
if len(query) == 0 {
|
||||
return fmt.Errorf("missing `query` arg")
|
||||
}
|
||||
|
||||
metrics, err := promql.ExtractMetricsFromQuery(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot extract metrics from query: %w", err)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
WriteExtractMetricExprsResponse(bw, metrics)
|
||||
if err := bw.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot send extract metric exprs response to remote client: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LabelsHandler processes /api/v1/labels request.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
|
||||
@@ -847,7 +901,8 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
|
||||
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
noCache := httputil.GetBool(r, "nocache")
|
||||
isDebug := httputil.GetBool(r, "debug")
|
||||
noCache := httputil.GetBool(r, "nocache") || isDebug
|
||||
query := r.FormValue("query")
|
||||
if len(query) == 0 {
|
||||
return fmt.Errorf("missing `query` arg")
|
||||
@@ -856,7 +911,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -942,29 +997,18 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
|
||||
} else {
|
||||
queryOffset = 0
|
||||
}
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: start,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: *maxUniqueTimeseries,
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
NoCache: noCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
|
||||
DenyPartialResponse: httputil.GetDenyPartialResponse(r),
|
||||
}
|
||||
ec := newEvalConfig(r, start, start, step, deadline, noCache, lookbackDelta, isDebug, etfs)
|
||||
err = populateAuthTokens(qt, ec, at, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot populate auth tokens: %w", err)
|
||||
}
|
||||
if isDebug {
|
||||
if err := populateSimulatedData(r, at, ec); err != nil {
|
||||
_ = r.Body.Close()
|
||||
return fmt.Errorf("cannot read simulated samples: %w", err)
|
||||
}
|
||||
}
|
||||
_ = r.Body.Close()
|
||||
qs := promql.NewQueryStats(query, at, ec)
|
||||
ec.QueryStats = qs
|
||||
|
||||
@@ -1038,8 +1082,9 @@ func QueryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
||||
func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, query string,
|
||||
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
noCache := httputil.GetBool(r, "nocache")
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
isDebug := httputil.GetBool(r, "debug")
|
||||
noCache := httputil.GetBool(r, "nocache") || isDebug
|
||||
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -1058,29 +1103,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
||||
start, end = promql.AdjustStartEnd(start, end, step)
|
||||
}
|
||||
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: *maxUniqueTimeseries,
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
NoCache: noCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
|
||||
DenyPartialResponse: httputil.GetDenyPartialResponse(r),
|
||||
}
|
||||
ec := newEvalConfig(r, start, end, step, deadline, noCache, lookbackDelta, isDebug, etfs)
|
||||
err = populateAuthTokens(qt, ec, at, deadline)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot populate auth tokens: %w", err)
|
||||
}
|
||||
if isDebug {
|
||||
if err := populateSimulatedData(r, at, ec); err != nil {
|
||||
_ = r.Body.Close()
|
||||
return fmt.Errorf("cannot read simulated samples: %w", err)
|
||||
}
|
||||
}
|
||||
_ = r.Body.Close()
|
||||
|
||||
qs := promql.NewQueryStats(query, at, ec)
|
||||
ec.QueryStats = qs
|
||||
|
||||
@@ -1116,6 +1151,102 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
|
||||
return nil
|
||||
}
|
||||
|
||||
func newEvalConfig(r *http.Request, start, end, step int64, deadline searchutil.Deadline, noCache bool, lookbackDelta int64, isDebug bool, etfs [][]storage.TagFilter) *promql.EvalConfig {
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: *maxUniqueTimeseries,
|
||||
MinStalenessInterval: *minStalenessInterval,
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
NoCache: noCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
|
||||
DenyPartialResponse: !isDebug && httputil.GetDenyPartialResponse(r),
|
||||
}
|
||||
|
||||
return ec
|
||||
}
|
||||
|
||||
func populateSimulatedData(r *http.Request, at *auth.Token, evalConfig *promql.EvalConfig) error {
|
||||
type jsonExportBlockInput struct {
|
||||
Metric map[string]string `json:"metric"`
|
||||
Values []float64 `json:"values"`
|
||||
Timestamps []int64 `json:"timestamps"`
|
||||
}
|
||||
|
||||
// --- Read and Parse Input Samples from r.Body ---
|
||||
var simulatedSeries []*storage.SimulatedSamples
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
lineNum := 0
|
||||
accountID := uint32(0)
|
||||
projectID := uint32(0)
|
||||
if at != nil {
|
||||
accountID = at.AccountID
|
||||
projectID = at.ProjectID
|
||||
}
|
||||
for {
|
||||
var jeb jsonExportBlockInput
|
||||
if err := decoder.Decode(&jeb); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("error decoding input JSON on line %d: %w", lineNum, err)
|
||||
}
|
||||
|
||||
// Validate that values and timestamps arrays have the same length
|
||||
if len(jeb.Values) != len(jeb.Timestamps) {
|
||||
return fmt.Errorf("mismatched values and timestamps arrays length in debug data on line %d: values=%d, timestamps=%d", lineNum, len(jeb.Values), len(jeb.Timestamps))
|
||||
}
|
||||
|
||||
mn := storage.GetMetricName()
|
||||
defer storage.PutMetricName(mn)
|
||||
mn.AccountID = accountID
|
||||
mn.ProjectID = projectID
|
||||
for k, v := range jeb.Metric {
|
||||
mn.AddTag(k, v)
|
||||
}
|
||||
|
||||
ss := &storage.SimulatedSamples{
|
||||
Value: jeb.Values,
|
||||
Timestamps: jeb.Timestamps,
|
||||
}
|
||||
ss.Name.CopyFrom(mn)
|
||||
simulatedSeries = append(simulatedSeries, ss)
|
||||
lineNum++
|
||||
}
|
||||
|
||||
// It doesn't make sense to debug with empty samples
|
||||
if len(simulatedSeries) == 0 {
|
||||
return fmt.Errorf("no simulated samples found")
|
||||
}
|
||||
|
||||
minStalenessInterval, err := httputil.GetDurationRaw(r, "min_staleness_interval", evalConfig.MinStalenessInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `min_staleness_interval` arg: %w", err)
|
||||
}
|
||||
|
||||
maxStalenessInterval, err := httputil.GetDurationRaw(r, "max_staleness_interval", *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `max_staleness_interval` arg: %w", err)
|
||||
}
|
||||
|
||||
evalConfig.SimulatedSamples = simulatedSeries
|
||||
evalConfig.MinStalenessInterval = minStalenessInterval
|
||||
evalConfig.LookbackDelta, err = getMaxLookback(r, maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func populateAuthTokens(qt *querytracer.Tracer, ec *promql.EvalConfig, at *auth.Token, deadline searchutil.Deadline) error {
|
||||
if at != nil {
|
||||
ec.AuthTokens = []*auth.Token{at}
|
||||
@@ -1215,7 +1346,7 @@ func adjustLastPoints(tss []netstorage.Result, start, end int64) []netstorage.Re
|
||||
return tss
|
||||
}
|
||||
|
||||
func getMaxLookback(r *http.Request) (int64, error) {
|
||||
func getMaxLookback(r *http.Request, maxStalenessInterval time.Duration) (int64, error) {
|
||||
d := maxLookback.Milliseconds()
|
||||
if d == 0 {
|
||||
d = maxStalenessInterval.Milliseconds()
|
||||
|
||||
@@ -138,6 +138,10 @@ type EvalConfig struct {
|
||||
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
|
||||
LookbackDelta int64
|
||||
|
||||
// MaxStalenessInterval corresponds to -search.maxStalenessInterval,
|
||||
// but customized per query request.
|
||||
MinStalenessInterval time.Duration
|
||||
|
||||
// How many decimal digits after the point to leave in response.
|
||||
RoundDigits int
|
||||
|
||||
@@ -168,6 +172,9 @@ type EvalConfig struct {
|
||||
|
||||
timestamps []int64
|
||||
timestampsOnce sync.Once
|
||||
|
||||
// Simulated samples
|
||||
SimulatedSamples []*storage.SimulatedSamples
|
||||
}
|
||||
|
||||
// copyEvalConfig returns src copy.
|
||||
@@ -190,6 +197,8 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
||||
ec.DenyPartialResponse = src.DenyPartialResponse
|
||||
ec.IsPartialResponse.Store(src.IsPartialResponse.Load())
|
||||
ec.QueryStats = src.QueryStats
|
||||
ec.MinStalenessInterval = src.MinStalenessInterval
|
||||
ec.SimulatedSamples = src.SimulatedSamples
|
||||
|
||||
// do not copy src.timestamps - they must be generated again.
|
||||
return &ec
|
||||
@@ -949,7 +958,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||
}
|
||||
|
||||
ecSQ := copyEvalConfig(ec)
|
||||
ecSQ.Start -= window + step + maxSilenceInterval()
|
||||
ecSQ.Start -= window + step + maxSilenceInterval(ec.MinStalenessInterval)
|
||||
ecSQ.End += step
|
||||
ecSQ.Step = step
|
||||
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
|
||||
@@ -967,7 +976,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||
return nil, nil
|
||||
}
|
||||
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant, ec.MinStalenessInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1720,7 +1729,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
}
|
||||
// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
|
||||
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant, ec.MinStalenessInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1730,7 +1739,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
tfss = searchutil.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
|
||||
minTimestamp := ec.Start
|
||||
if needSilenceIntervalForRollupFunc[funcName] {
|
||||
minTimestamp -= maxSilenceInterval()
|
||||
minTimestamp -= maxSilenceInterval(ec.MinStalenessInterval)
|
||||
}
|
||||
if window > ec.Step {
|
||||
minTimestamp -= window
|
||||
@@ -1749,10 +1758,13 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
} else {
|
||||
sq = storage.NewSearchQuery(ec.AuthTokens[0].AccountID, ec.AuthTokens[0].ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries)
|
||||
}
|
||||
sq.SimulatedSeries = ec.SimulatedSamples
|
||||
|
||||
rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
ec.updateIsPartialResponse(isPartial)
|
||||
qs := ec.QueryStats
|
||||
rssLen := rss.Len()
|
||||
@@ -1835,7 +1847,7 @@ func getRollupMemoryLimiter() *memoryLimiter {
|
||||
return &rollupMemoryLimiter
|
||||
}
|
||||
|
||||
func maxSilenceInterval() int64 {
|
||||
func maxSilenceInterval(minStalenessInterval time.Duration) int64 {
|
||||
d := minStalenessInterval.Milliseconds()
|
||||
if d <= 0 {
|
||||
d = 5 * 60 * 1000
|
||||
|
||||
@@ -67,12 +67,15 @@ func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly boo
|
||||
}
|
||||
}
|
||||
|
||||
var rv []*timeseries
|
||||
|
||||
qid := activeQueriesV.Add(ec, q)
|
||||
rv, err := evalExpr(qt, ec, e)
|
||||
rv, err = evalExpr(qt, ec, e)
|
||||
activeQueriesV.Remove(qid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isFirstPointOnly {
|
||||
// Remove all the points except the first one from every time series.
|
||||
for _, ts := range rv {
|
||||
@@ -331,3 +334,23 @@ func escapeDots(s string) string {
|
||||
}
|
||||
return string(result)
|
||||
}
|
||||
|
||||
// ExtractMetricsFromQuery visits all the expressions in query and returns all the metrics found in the query.
|
||||
func ExtractMetricsFromQuery(query string) ([]string, error) {
|
||||
expr, err := metricsql.Parse(query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing query: %w", err)
|
||||
}
|
||||
|
||||
var metrics []string
|
||||
metricsql.VisitAll(expr, func(e metricsql.Expr) {
|
||||
if me, ok := e.(*metricsql.MetricExpr); ok {
|
||||
metricStr := string(me.AppendString(nil))
|
||||
if metricStr != "" {
|
||||
metrics = append(metrics, metricStr)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
329
app/vmselect/promql/exec_debug_test.go
Normal file
329
app/vmselect/promql/exec_debug_test.go
Normal file
@@ -0,0 +1,329 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"math"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
||||
func TestSimulatedExec(t *testing.T) {
|
||||
accountID := uint32(123)
|
||||
projectID := uint32(567)
|
||||
start := int64(1000e3)
|
||||
end := int64(2000e3)
|
||||
step := int64(200e3)
|
||||
|
||||
// Base EvalConfig that will be copied for each test
|
||||
baseEC := EvalConfig{
|
||||
AuthTokens: []*auth.Token{{
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
}},
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: 1e4,
|
||||
MaxSeries: 1000,
|
||||
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
|
||||
RoundDigits: 100,
|
||||
NoCache: true,
|
||||
}
|
||||
|
||||
t.Run(`simple_metric_exact_match`, func(t *testing.T) {
|
||||
t.Skip()
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
mn := newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"a", "b",
|
||||
)
|
||||
|
||||
ec.SimulatedSamples = []*storage.SimulatedSamples{mn.build()}
|
||||
|
||||
q := `test_metric{a="b"}`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
// Expected result
|
||||
expectedMN := storage.MetricName{
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
MetricGroup: []byte("test_metric"),
|
||||
Tags: []storage.Tag{
|
||||
{
|
||||
Key: []byte("a"),
|
||||
Value: []byte("b"),
|
||||
},
|
||||
},
|
||||
}
|
||||
expectedResult := []netstorage.Result{
|
||||
{
|
||||
MetricName: expectedMN,
|
||||
Values: mn.Value,
|
||||
Timestamps: mn.Timestamps,
|
||||
},
|
||||
}
|
||||
|
||||
testResultsEqual(t, result, expectedResult, false)
|
||||
})
|
||||
|
||||
t.Run(`filtered_by_tag_value`, func(t *testing.T) {
|
||||
t.Skip()
|
||||
|
||||
// Create a copy of base EvalConfig
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
mn := metricBuilders{
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"a", "b",
|
||||
"region", "us-west",
|
||||
),
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"a", "b",
|
||||
"region", "us-east",
|
||||
),
|
||||
}
|
||||
ec.SimulatedSamples = mn.build()
|
||||
|
||||
q := `test_metric{region="us-west"}`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
// Expected result
|
||||
expectedMN := storage.MetricName{
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
MetricGroup: []byte("test_metric"),
|
||||
Tags: []storage.Tag{
|
||||
{
|
||||
Key: []byte("a"),
|
||||
Value: []byte("b"),
|
||||
},
|
||||
{
|
||||
Key: []byte("region"),
|
||||
Value: []byte("us-west"),
|
||||
},
|
||||
},
|
||||
}
|
||||
expectedResult := []netstorage.Result{
|
||||
{
|
||||
MetricName: expectedMN,
|
||||
Values: mn[0].Value,
|
||||
Timestamps: mn[0].Timestamps,
|
||||
},
|
||||
}
|
||||
|
||||
testResultsEqual(t, result, expectedResult, false)
|
||||
})
|
||||
|
||||
t.Run(`regex_match_on_tag`, func(t *testing.T) {
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
mn := metricBuilders{
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"env", "prod",
|
||||
),
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"env", "staging",
|
||||
),
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"env", "dev",
|
||||
),
|
||||
}
|
||||
ec.SimulatedSamples = mn.build()
|
||||
|
||||
q := `test_metric{env=~"prod|staging"}`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
expectedResult := []netstorage.Result{mn[0].toResult(), mn[1].toResult()}
|
||||
testResultsEqual(t, result, expectedResult, false)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSumOverTime(t *testing.T) {
|
||||
accountID := uint32(123)
|
||||
projectID := uint32(567)
|
||||
start := int64(1000e3)
|
||||
end := int64(1300e3)
|
||||
step := int64(30e3)
|
||||
|
||||
baseEC := EvalConfig{
|
||||
AuthTokens: []*auth.Token{{
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
}},
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: 1e4,
|
||||
MaxSeries: 1000,
|
||||
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
|
||||
RoundDigits: 100,
|
||||
NoCache: true,
|
||||
}
|
||||
|
||||
t.Run(`basic_sum_over_time`, func(t *testing.T) {
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
|
||||
metric := newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"app", "api-server",
|
||||
).withValues(1, 2, 3, 4, 5, 6).withUnix(1000, 1015, 1030, 1045, 1060, 1075)
|
||||
ec.SimulatedSamples = []*storage.SimulatedSamples{metric.build()}
|
||||
|
||||
q := `sum_over_time(test_metric[30s])`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
expectedResult := []netstorage.Result{
|
||||
newMetric(accountID, projectID,
|
||||
"app", "api-server",
|
||||
).withValues(1, 5, 9, 6).withUnix(1000, 1030, 1060, 1090).toResult(),
|
||||
}
|
||||
|
||||
testSimulatedResultsEqual(t, result, expectedResult, false)
|
||||
})
|
||||
}
|
||||
|
||||
type metricBuilder storage.SimulatedSamples
|
||||
|
||||
func newMetric(accountID uint32, projectID uint32, pairs ...string) *metricBuilder {
|
||||
mn := storage.MetricName{
|
||||
AccountID: accountID,
|
||||
ProjectID: projectID,
|
||||
}
|
||||
for i := 0; i < len(pairs); i += 2 {
|
||||
mn.AddTag(pairs[i], pairs[i+1])
|
||||
}
|
||||
return &metricBuilder{
|
||||
Name: mn,
|
||||
Value: []float64{10, 20, 30, 40, 50, 60},
|
||||
Timestamps: []int64{1000e3, 1200e3, 1400e3, 1600e3, 1800e3, 2000e3},
|
||||
}
|
||||
}
|
||||
|
||||
func (b *metricBuilder) withUnix(unix ...int64) *metricBuilder {
|
||||
b.Timestamps = make([]int64, len(unix))
|
||||
for i := range unix {
|
||||
b.Timestamps[i] = unix[i] * 1e3
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *metricBuilder) withValues(values ...float64) *metricBuilder {
|
||||
b.Value = values
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *metricBuilder) build() *storage.SimulatedSamples {
|
||||
return (*storage.SimulatedSamples)(b)
|
||||
}
|
||||
|
||||
func (b *metricBuilder) toResult() netstorage.Result {
|
||||
return netstorage.Result{
|
||||
MetricName: b.Name,
|
||||
Values: b.Value,
|
||||
Timestamps: b.Timestamps,
|
||||
}
|
||||
}
|
||||
|
||||
type metricBuilders []*metricBuilder
|
||||
|
||||
func (b metricBuilders) build() []*storage.SimulatedSamples {
|
||||
ss := make([]*storage.SimulatedSamples, len(b))
|
||||
for i := range b {
|
||||
ss[i] = b[i].build()
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
func testSimulatedResultsEqual(t *testing.T, result, resultExpected []netstorage.Result, verifyTenant bool) {
|
||||
t.Helper()
|
||||
result = removeEmptyValuesAndTimeseries(result)
|
||||
|
||||
if len(result) != len(resultExpected) {
|
||||
t.Fatalf(`unexpected timeseries count; got %d; want %d`, len(result), len(resultExpected))
|
||||
}
|
||||
for i := range result {
|
||||
r := &result[i]
|
||||
rExpected := &resultExpected[i]
|
||||
testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName, verifyTenant, i)
|
||||
testRowsEqual(t, r.Values, r.Timestamps, rExpected.Values, rExpected.Timestamps)
|
||||
}
|
||||
}
|
||||
|
||||
func removeEmptyValuesAndTimeseries(tss []netstorage.Result) []netstorage.Result {
|
||||
dst := tss[:0]
|
||||
for i := range tss {
|
||||
ts := &tss[i]
|
||||
hasNaNs := slices.ContainsFunc(ts.Values, math.IsNaN)
|
||||
if !hasNaNs {
|
||||
// Fast path: nothing to remove.
|
||||
if len(ts.Values) > 0 {
|
||||
dst = append(dst, *ts)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Slow path: remove NaNs.
|
||||
srcTimestamps := ts.Timestamps
|
||||
dstValues := ts.Values[:0]
|
||||
// Do not reuse ts.Timestamps for dstTimestamps, since ts.Timestamps
|
||||
// may be shared among multiple time series.
|
||||
dstTimestamps := make([]int64, 0, len(ts.Timestamps))
|
||||
for j, v := range ts.Values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues = append(dstValues, v)
|
||||
dstTimestamps = append(dstTimestamps, srcTimestamps[j])
|
||||
}
|
||||
ts.Values = dstValues
|
||||
ts.Timestamps = dstTimestamps
|
||||
if len(ts.Values) > 0 {
|
||||
dst = append(dst, *ts)
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func TestExtractMetricsFromQuery(t *testing.T) {
|
||||
query := `(vm_free_disk_space_bytes{job=~"$job", instance=~"$instance"}-vm_free_disk_space_limit_bytes{job=~"$job", instance=~"$instance"})
|
||||
/
|
||||
ignoring(path) (
|
||||
(rate(vm_rows_added_to_storage_total{job=~"$job", instance=~"$instance"}[1d]) -
|
||||
sum(rate(vm_deduplicated_samples_total{job=~"$job", instance=~"$instance"}[1d])) without (type)) *
|
||||
(
|
||||
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type) /
|
||||
sum(vm_rows{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type)
|
||||
)
|
||||
+
|
||||
rate(vm_new_timeseries_created_total{job=~"$job", instance=~"$instance"}[1d]) *
|
||||
(
|
||||
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type="indexdb/file"}) /
|
||||
sum(vm_rows{job=~"$job", instance=~"$instance", type="indexdb/file"})
|
||||
)
|
||||
)`
|
||||
metrics, err := ExtractMetricsFromQuery(query)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when extracting metrics from query: %s`, err)
|
||||
}
|
||||
t.Logf(`metrics: %v`, metrics)
|
||||
}
|
||||
@@ -1,12 +1,12 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
@@ -17,10 +17,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
||||
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
|
||||
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
|
||||
"See also '-search.maxStalenessInterval'")
|
||||
|
||||
var rollupFuncs = map[string]newRollupFunc{
|
||||
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
|
||||
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
|
||||
@@ -372,7 +368,7 @@ func getRollupTag(expr metricsql.Expr) (string, error) {
|
||||
}
|
||||
|
||||
func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int,
|
||||
window, lookbackDelta int64, sharedTimestamps []int64, isMultiTenant bool) (
|
||||
window, lookbackDelta int64, sharedTimestamps []int64, isMultiTenant bool, minStalenessInterval time.Duration) (
|
||||
func(values []float64, timestamps []int64), []*rollupConfig, error) {
|
||||
preFunc := func(_ []float64, _ []int64) {}
|
||||
funcName = strings.ToLower(funcName)
|
||||
@@ -409,6 +405,7 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
|
||||
isDefaultRollup: funcName == "default_rollup",
|
||||
samplesScannedPerCall: samplesScannedPerCall,
|
||||
isMultiTenant: isMultiTenant,
|
||||
minStalenessInterval: minStalenessInterval,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -605,6 +602,9 @@ type rollupConfig struct {
|
||||
// Whether the rollup is used in multi-tenant mode.
|
||||
// This is used in order to populate labels with tenancy information.
|
||||
isMultiTenant bool
|
||||
|
||||
// The minimum interval for staleness calculations.
|
||||
minStalenessInterval time.Duration
|
||||
}
|
||||
|
||||
func (rc *rollupConfig) getTimestamps() []int64 {
|
||||
@@ -728,8 +728,8 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
||||
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
||||
maxPrevInterval = rc.LookbackDelta
|
||||
}
|
||||
if *minStalenessInterval > 0 {
|
||||
if msi := minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
|
||||
if rc.minStalenessInterval > 0 {
|
||||
if msi := rc.minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
|
||||
maxPrevInterval = msi
|
||||
}
|
||||
}
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
)
|
||||
@@ -34,4 +35,31 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
|
||||
return msecs, nil
|
||||
}
|
||||
|
||||
// GetDurationRaw returns time.Duration from the given argKey query arg.
|
||||
func GetDurationRaw(r *http.Request, argKey string, defaultValue time.Duration) (time.Duration, error) {
|
||||
argValue := r.FormValue(argKey)
|
||||
if len(argValue) == 0 {
|
||||
return defaultValue, nil
|
||||
}
|
||||
if argValue == "undefined" {
|
||||
// This hack is needed for Grafana, which may send undefined value
|
||||
return defaultValue, nil
|
||||
}
|
||||
secs, err := strconv.ParseFloat(argValue, 64)
|
||||
if err != nil {
|
||||
// Try parsing string format
|
||||
d, err := timeutil.ParseDuration(argValue)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
d := time.Duration(secs * float64(time.Second))
|
||||
msecs := d.Milliseconds()
|
||||
if msecs <= 0 || msecs > maxDurationMsecs {
|
||||
return 0, fmt.Errorf("%s=%s is out of allowed range [%s ... %s]", argKey, d, time.Millisecond, time.Duration(maxDurationMsecs)*time.Millisecond)
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
||||
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
|
||||
|
||||
@@ -2236,6 +2236,43 @@ func hasCompositeTagFilters(tfs []*tagFilter, prefix []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// MatchSimulatedSamples filters the given simulatedSamples against the provided tag filters.
|
||||
// It returns only the simulated samples that match any of the given tag filter sets.
|
||||
// This function is used for debugging and testing purposes to simulate metric queries.
|
||||
func MatchSimulatedSamples(accountID, projectID uint32, simulatedSamples []*SimulatedSamples, tagFilterss [][]TagFilter) ([]*SimulatedSamples, error) {
|
||||
var kb bytesutil.ByteBuffer
|
||||
matchedSamples := make([]*SimulatedSamples, 0, 1)
|
||||
for _, rawTfs := range tagFilterss {
|
||||
tfs := NewTagFilters(accountID, projectID)
|
||||
for _, tf := range rawTfs {
|
||||
err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot add tagFilter %s: %w", tf.String(), err)
|
||||
}
|
||||
}
|
||||
|
||||
for idx, mn := range simulatedSamples {
|
||||
ok, err := matchTagFilters(&mn.Name, toTFPointers(tfs.tfs), &kb)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot match MetricName %s against tagFilters: %w", mn.Name.String(), err)
|
||||
}
|
||||
if ok {
|
||||
matchedSamples = append(matchedSamples, simulatedSamples[idx])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return matchedSamples, nil
|
||||
}
|
||||
|
||||
func toTFPointers(tfs []tagFilter) []*tagFilter {
|
||||
tfps := make([]*tagFilter, len(tfs))
|
||||
for i := range tfs {
|
||||
tfps[i] = &tfs[i]
|
||||
}
|
||||
return tfps
|
||||
}
|
||||
|
||||
func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
|
||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
|
||||
for i, tf := range tfs {
|
||||
|
||||
@@ -2162,14 +2162,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
fs.MustRemoveDir(path)
|
||||
}
|
||||
|
||||
func toTFPointers(tfs []tagFilter) []*tagFilter {
|
||||
tfps := make([]*tagFilter, len(tfs))
|
||||
for i := range tfs {
|
||||
tfps[i] = &tfs[i]
|
||||
}
|
||||
return tfps
|
||||
}
|
||||
|
||||
func newTestStorage() *Storage {
|
||||
s := &Storage{
|
||||
cachePath: "test-storage-cache",
|
||||
|
||||
@@ -362,6 +362,35 @@ type SearchQuery struct {
|
||||
|
||||
// The maximum number of time series the search query can return.
|
||||
MaxMetrics int
|
||||
|
||||
// SimulatedSeries is used for simulating samples returned from storage nodes.
|
||||
SimulatedSeries []*SimulatedSamples
|
||||
}
|
||||
|
||||
// SimulatedSamples represents simulated metric samples for debug and testing purposes.
|
||||
// It contains metric name, timestamps and corresponding values.
|
||||
type SimulatedSamples struct {
|
||||
Name MetricName
|
||||
Timestamps []int64
|
||||
Value []float64
|
||||
}
|
||||
|
||||
// NewSimulatedSeries creates a new SimulatedSamples instance with the given parameters.
|
||||
// It constructs a metric name from the provided accountID, projectID and metric labels.
|
||||
func NewSimulatedSeries(accountID, projectID uint32, metric map[string]string, timestamp []int64, value []float64) *SimulatedSamples {
|
||||
ss := &SimulatedSamples{
|
||||
Timestamps: timestamp,
|
||||
Value: value,
|
||||
}
|
||||
mn := GetMetricName()
|
||||
defer PutMetricName(mn)
|
||||
mn.AccountID = accountID
|
||||
mn.ProjectID = projectID
|
||||
for k, v := range metric {
|
||||
mn.AddTag(k, v)
|
||||
}
|
||||
ss.Name.CopyFrom(mn)
|
||||
return ss
|
||||
}
|
||||
|
||||
// GetTimeRange returns time range for the given sq.
|
||||
|
||||
Reference in New Issue
Block a user