Compare commits

...

8 Commits

Author SHA1 Message Date
func25
879e444058 clean 2025-08-27 13:58:13 +07:00
func25
4d501b20fd clean 2025-08-27 11:33:44 +07:00
func25
65fa35dfdf clean 2025-08-27 10:50:20 +07:00
func25
3768919413 clean 2025-08-27 10:32:45 +07:00
func25
c65dc7b15a clean 2025-08-27 10:10:45 +07:00
func25
a762889e45 remove maxDebugSamples flag and limit checking 2025-08-27 09:40:53 +07:00
func25
3220760480 add debug functionality to QueryHandler 2025-08-27 09:40:53 +07:00
func25
6ae8855e29 update 2025-08-27 09:40:53 +07:00
15 changed files with 980 additions and 66 deletions

View File

@@ -397,6 +397,13 @@ func selectHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseW
return true
}
return true
case "prometheus/api/v1/config":
httpserver.EnableCORS(w, r)
if err := prometheus.ConfigHandler(qt, startTime, w, r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
return true
case "prometheus/api/v1/export":
exportRequests.Inc()
if err := prometheus.ExportHandler(startTime, at, w, r); err != nil {
@@ -731,6 +738,13 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
expandWithExprsRequests.Inc()
prometheus.ExpandWithExprs(w, r)
return true
case "prometheus/extract-metric-exprs", "extract-metric-exprs":
startTime := time.Now()
if err := prometheus.ExtractMetricExprsHandler(startTime, w, r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
return true
case "prometheus/prettify-query", "prettify-query":
prettifyQueryRequests.Inc()
prometheus.PrettifyQuery(w, r)

View File

@@ -88,10 +88,18 @@ type Results struct {
tbfs []*tmpBlocksFile
packedTimeseries []packedTimeseries
// the result is simulated
isSimulated bool
simulatedSeries []*storage.SimulatedSamples
}
// Len returns the number of results in rss.
func (rss *Results) Len() int {
if rss.isSimulated {
return len(rss.simulatedSeries)
}
return len(rss.packedTimeseries)
}
@@ -247,6 +255,10 @@ var defaultMaxWorkersPerQuery = func() int {
//
// rss becomes unusable after the call to RunParallel.
func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
if rss.isSimulated {
return rss.runParallelSimulated(qt, f)
}
qt = qt.NewChild("parallel process of fetched data")
defer rss.closeTmpBlockFiles()
@@ -262,6 +274,94 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
return err
}
func (rss *Results) runParallelSimulated(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
qt = qt.NewChild("parallel process of fetched data")
cb := f
if rss.shouldConvertTenantToLabels {
cb = func(rs *Result, workerID uint) error {
metricNameTenantToTags(&rs.MetricName)
return f(rs, workerID)
}
}
tmpResult := getTmpResult()
defer putTmpResult(tmpResult)
// For simplicity, let's process serially first. Parallelization can be added if needed.
// If parallelization is desired, it would mirror the worker pool logic of the original runParallel,
// but iterating over rss.simulatedSamples entries.
workerID := uint(0)
var firstErr error
for _, metric := range rss.simulatedSeries {
r := &tmpResult.rs
r.reset()
r.MetricName.CopyFrom(&metric.Name)
for i, ts := range metric.Timestamps {
if ts >= rss.tr.MinTimestamp && ts <= rss.tr.MaxTimestamp {
r.Values = append(r.Values, metric.Value[i])
r.Timestamps = append(r.Timestamps, ts)
}
}
// Sort timestamps chronologically to match real storage behavior.
// Real storage ensures chronological order through:
// 1. Block-level sorting by MinTimestamp
// 2. Within-block timestamp ordering via encoding.EnsureNonDecreasingSequence()
if len(r.Timestamps) > 1 {
// Create pairs for sorting
type timestampValue struct {
timestamp int64
value float64
}
pairs := make([]timestampValue, len(r.Timestamps))
for i := range r.Timestamps {
pairs[i] = timestampValue{
timestamp: r.Timestamps[i],
value: r.Values[i],
}
}
// Sort by timestamp
sort.Slice(pairs, func(i, j int) bool {
return pairs[i].timestamp < pairs[j].timestamp
})
// Extract back to separate slices
for i := range pairs {
r.Timestamps[i] = pairs[i].timestamp
r.Values[i] = pairs[i].value
}
}
// The input from the client is most likely already deduplicated, since it's emitted by
// vmselect. However, the client may modify the input instead of using the returned one.
dedupInterval := storage.GetDedupInterval()
if dedupInterval > 0 && len(r.Timestamps) > 0 {
r.Timestamps, r.Values = storage.DeduplicateSamples(r.Timestamps, r.Values, dedupInterval)
}
rowProcessed := len(r.Timestamps)
if rowProcessed > 0 {
err := cb(r, workerID)
if err != nil {
firstErr = err
break
}
}
}
// Count total samples across all series
totalSamples := 0
for _, metric := range rss.simulatedSeries {
totalSamples += len(metric.Timestamps)
}
qt.Donef("series=%d, samples=%d", len(rss.simulatedSeries), totalSamples)
return firstErr
}
func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) {
tswsLen := len(rss.packedTimeseries)
if tswsLen == 0 {
@@ -1798,6 +1898,10 @@ func (e limitExceededErr) Error() string { return e.err.Error() }
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, bool, error) {
if len(sq.SimulatedSeries) > 0 {
return processSearchSimulated(qt, sq, deadline)
}
qt = qt.NewChild("fetch matching series: %s", sq)
defer qt.Done()
if deadline.Exceeded() {
@@ -1862,6 +1966,41 @@ func ProcessSearchQuery(qt *querytracer.Tracer, denyPartialResponse bool, sq *st
return &rss, isPartial, nil
}
func processSearchSimulated(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, bool, error) {
qt = qt.NewChild("fetch matching series (simulated): %s", sq)
defer qt.Done()
if deadline.Exceeded() {
return nil, false, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
tr := storage.TimeRange{
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
// Process simulated samples.
matchedSamples, err := storage.MatchSimulatedSamples(sq.TenantTokens[0].AccountID, sq.TenantTokens[0].ProjectID, sq.SimulatedSeries, sq.TagFilterss)
if err != nil {
return nil, false, fmt.Errorf("cannot match simulated samples: %w", err)
}
// Create a result set similar to ProcessSearchQuery
rss := &Results{
tr: tr,
deadline: deadline,
isSimulated: true,
simulatedSeries: matchedSamples,
}
if len(matchedSamples) == 0 {
qt.Printf("no matching series found")
} else {
qt.Printf("found %d series", len(rss.simulatedSeries))
}
return rss, false, nil
}
// ProcessBlocks calls processBlock per each block matching the given sq.
func ProcessBlocks(qt *querytracer.Tracer, denyPartialResponse bool, sq *storage.SearchQuery,
processBlock func(mb *storage.MetricBlock, workerID uint) error, deadline searchutil.Deadline,

View File

@@ -0,0 +1,20 @@
{% import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
) %}
{% stripspace %}
ConfigResponse generates response for /api/v1/config .
{% func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) %}
{
"status":"success",
"data":{
"minStalenessInterval": {%q= config.MinStalenessInterval %},
"maxStalenessInterval": {%q= config.MaxStalenessInterval %}
}
{% code qt.Done() %}
{%= dumpQueryTrace(qt) %}
}
{% endfunc %}
{% endstripspace %}

View File

@@ -0,0 +1,73 @@
// Code generated by qtc from "config_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/prometheus/config_response.qtpl:1
package prometheus
//line app/vmselect/prometheus/config_response.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
)
// ConfigResponse generates response for /api/v1/config .
//line app/vmselect/prometheus/config_response.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/config_response.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/config_response.qtpl:8
func StreamConfigResponse(qw422016 *qt422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/config_response.qtpl:8
qw422016.N().S(`{"status":"success","data":{"minStalenessInterval":`)
//line app/vmselect/prometheus/config_response.qtpl:12
qw422016.N().Q(config.MinStalenessInterval)
//line app/vmselect/prometheus/config_response.qtpl:12
qw422016.N().S(`,"maxStalenessInterval":`)
//line app/vmselect/prometheus/config_response.qtpl:13
qw422016.N().Q(config.MaxStalenessInterval)
//line app/vmselect/prometheus/config_response.qtpl:13
qw422016.N().S(`}`)
//line app/vmselect/prometheus/config_response.qtpl:15
qt.Done()
//line app/vmselect/prometheus/config_response.qtpl:16
streamdumpQueryTrace(qw422016, qt)
//line app/vmselect/prometheus/config_response.qtpl:16
qw422016.N().S(`}`)
//line app/vmselect/prometheus/config_response.qtpl:18
}
//line app/vmselect/prometheus/config_response.qtpl:18
func WriteConfigResponse(qq422016 qtio422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/config_response.qtpl:18
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/config_response.qtpl:18
StreamConfigResponse(qw422016, config, qt)
//line app/vmselect/prometheus/config_response.qtpl:18
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/config_response.qtpl:18
}
//line app/vmselect/prometheus/config_response.qtpl:18
func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) string {
//line app/vmselect/prometheus/config_response.qtpl:18
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/config_response.qtpl:18
WriteConfigResponse(qb422016, config, qt)
//line app/vmselect/prometheus/config_response.qtpl:18
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/config_response.qtpl:18
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/config_response.qtpl:18
return qs422016
//line app/vmselect/prometheus/config_response.qtpl:18
}

View File

@@ -0,0 +1,18 @@
{% stripspace %}
ExtractMetricExprsResponse generates response for /extract-metric-exprs .
{% func ExtractMetricExprsResponse(metrics []string) %}
{
"status":"success",
"data":[
{% if len(metrics) > 0 %}
{%q= metrics[0] %}
{% for i := 1; i < len(metrics); i++ %}
,{%q= metrics[i] %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View File

@@ -0,0 +1,69 @@
// Code generated by qtc from "extract_metric_exprs_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// ExtractMetricExprsResponse generates response for /extract-metric-exprs .
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
package prometheus
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
func StreamExtractMetricExprsResponse(qw422016 *qt422016.Writer, metrics []string) {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
qw422016.N().S(`{"status":"success","data":[`)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:8
if len(metrics) > 0 {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:9
qw422016.N().Q(metrics[0])
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
for i := 1; i < len(metrics); i++ {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
qw422016.N().S(`,`)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:11
qw422016.N().Q(metrics[i])
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:12
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
func WriteExtractMetricExprsResponse(qq422016 qtio422016.Writer, metrics []string) {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
StreamExtractMetricExprsResponse(qw422016, metrics)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
func ExtractMetricExprsResponse(metrics []string) string {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
WriteExtractMetricExprsResponse(qb422016, metrics)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
return qs422016
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
}

View File

@@ -1,8 +1,10 @@
package prometheus
import (
"encoding/json"
"flag"
"fmt"
"io"
"math"
"net/http"
"runtime"
@@ -43,6 +45,9 @@ var (
maxLookback = flag.Duration("search.maxLookback", 0, "Synonym to -query.lookback-delta from Prometheus. "+
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+
"See also '-search.maxStalenessInterval' flag, which has the same meaning due to historical reasons")
minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
"See also '-search.maxStalenessInterval'")
maxStalenessInterval = flag.Duration("search.maxStalenessInterval", 0, "The maximum interval for staleness calculations. "+
"By default, it is automatically calculated from the median interval between samples. This flag could be useful for tuning "+
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. "+
@@ -118,7 +123,7 @@ func FederateHandler(startTime time.Time, at *auth.Token, w http.ResponseWriter,
if err != nil {
return err
}
lookbackDelta, err := getMaxLookback(r)
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
if err != nil {
return err
}
@@ -723,6 +728,55 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
// ConfigData holds the current configuration values for search-related flags
type ConfigData struct {
MinStalenessInterval string
MaxStalenessInterval string
}
// ConfigHandler processes /api/v1/config request.
//
// It returns the current configuration for search-related flags.
func ConfigHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, _ *http.Request) error {
config := &ConfigData{
MinStalenessInterval: (*minStalenessInterval).String(),
MaxStalenessInterval: (*maxStalenessInterval).String(),
}
w.Header().Set("Content-Type", "application/json")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteConfigResponse(bw, config, qt)
if err := bw.Flush(); err != nil {
return fmt.Errorf("cannot send config response to remote client: %w", err)
}
return nil
}
// ExtractMetricExprsHandler processes /extract-metric-exprs request.
//
// It extracts metric expressions from a given PromQL query.
func ExtractMetricExprsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
query := r.FormValue("query")
if len(query) == 0 {
return fmt.Errorf("missing `query` arg")
}
metrics, err := promql.ExtractMetricsFromQuery(query)
if err != nil {
return fmt.Errorf("cannot extract metrics from query: %w", err)
}
w.Header().Set("Content-Type", "application/json")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteExtractMetricExprsResponse(bw, metrics)
if err := bw.Flush(); err != nil {
return fmt.Errorf("cannot send extract metric exprs response to remote client: %w", err)
}
return nil
}
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
@@ -847,7 +901,8 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
ct := startTime.UnixNano() / 1e6
deadline := searchutil.GetDeadlineForQuery(r, startTime)
noCache := httputil.GetBool(r, "nocache")
isDebug := httputil.GetBool(r, "debug")
noCache := httputil.GetBool(r, "nocache") || isDebug
query := r.FormValue("query")
if len(query) == 0 {
return fmt.Errorf("missing `query` arg")
@@ -856,7 +911,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
if err != nil {
return err
}
lookbackDelta, err := getMaxLookback(r)
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
if err != nil {
return err
}
@@ -942,29 +997,18 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w
} else {
queryOffset = 0
}
ec := &promql.EvalConfig{
Start: start,
End: start,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: *maxUniqueTimeseries,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
NoCache: noCache,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
CacheTagFilters: etfs,
GetRequestURI: func() string {
return httpserver.GetRequestURI(r)
},
DenyPartialResponse: httputil.GetDenyPartialResponse(r),
}
ec := newEvalConfig(r, start, start, step, deadline, noCache, lookbackDelta, isDebug, etfs)
err = populateAuthTokens(qt, ec, at, deadline)
if err != nil {
return fmt.Errorf("cannot populate auth tokens: %w", err)
}
if isDebug {
if err := populateSimulatedData(r, at, ec); err != nil {
_ = r.Body.Close()
return fmt.Errorf("cannot read simulated samples: %w", err)
}
}
_ = r.Body.Close()
qs := promql.NewQueryStats(query, at, ec)
ec.QueryStats = qs
@@ -1038,8 +1082,9 @@ func QueryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Token, w http.ResponseWriter, query string,
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
deadline := searchutil.GetDeadlineForQuery(r, startTime)
noCache := httputil.GetBool(r, "nocache")
lookbackDelta, err := getMaxLookback(r)
isDebug := httputil.GetBool(r, "debug")
noCache := httputil.GetBool(r, "nocache") || isDebug
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
if err != nil {
return err
}
@@ -1058,29 +1103,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
start, end = promql.AdjustStartEnd(start, end, step)
}
ec := &promql.EvalConfig{
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: *maxUniqueTimeseries,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
NoCache: noCache,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
CacheTagFilters: etfs,
GetRequestURI: func() string {
return httpserver.GetRequestURI(r)
},
DenyPartialResponse: httputil.GetDenyPartialResponse(r),
}
ec := newEvalConfig(r, start, end, step, deadline, noCache, lookbackDelta, isDebug, etfs)
err = populateAuthTokens(qt, ec, at, deadline)
if err != nil {
return fmt.Errorf("cannot populate auth tokens: %w", err)
}
if isDebug {
if err := populateSimulatedData(r, at, ec); err != nil {
_ = r.Body.Close()
return fmt.Errorf("cannot read simulated samples: %w", err)
}
}
_ = r.Body.Close()
qs := promql.NewQueryStats(query, at, ec)
ec.QueryStats = qs
@@ -1116,6 +1151,102 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, at *auth.Tok
return nil
}
func newEvalConfig(r *http.Request, start, end, step int64, deadline searchutil.Deadline, noCache bool, lookbackDelta int64, isDebug bool, etfs [][]storage.TagFilter) *promql.EvalConfig {
ec := &promql.EvalConfig{
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: *maxUniqueTimeseries,
MinStalenessInterval: *minStalenessInterval,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
NoCache: noCache,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
GetRequestURI: func() string {
return httpserver.GetRequestURI(r)
},
DenyPartialResponse: !isDebug && httputil.GetDenyPartialResponse(r),
}
return ec
}
func populateSimulatedData(r *http.Request, at *auth.Token, evalConfig *promql.EvalConfig) error {
type jsonExportBlockInput struct {
Metric map[string]string `json:"metric"`
Values []float64 `json:"values"`
Timestamps []int64 `json:"timestamps"`
}
// --- Read and Parse Input Samples from r.Body ---
var simulatedSeries []*storage.SimulatedSamples
decoder := json.NewDecoder(r.Body)
lineNum := 0
accountID := uint32(0)
projectID := uint32(0)
if at != nil {
accountID = at.AccountID
projectID = at.ProjectID
}
for {
var jeb jsonExportBlockInput
if err := decoder.Decode(&jeb); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("error decoding input JSON on line %d: %w", lineNum, err)
}
// Validate that values and timestamps arrays have the same length
if len(jeb.Values) != len(jeb.Timestamps) {
return fmt.Errorf("mismatched values and timestamps arrays length in debug data on line %d: values=%d, timestamps=%d", lineNum, len(jeb.Values), len(jeb.Timestamps))
}
mn := storage.GetMetricName()
defer storage.PutMetricName(mn)
mn.AccountID = accountID
mn.ProjectID = projectID
for k, v := range jeb.Metric {
mn.AddTag(k, v)
}
ss := &storage.SimulatedSamples{
Value: jeb.Values,
Timestamps: jeb.Timestamps,
}
ss.Name.CopyFrom(mn)
simulatedSeries = append(simulatedSeries, ss)
lineNum++
}
// It doesn't make sense to debug with empty samples
if len(simulatedSeries) == 0 {
return fmt.Errorf("no simulated samples found")
}
minStalenessInterval, err := httputil.GetDurationRaw(r, "min_staleness_interval", evalConfig.MinStalenessInterval)
if err != nil {
return fmt.Errorf("cannot parse `min_staleness_interval` arg: %w", err)
}
maxStalenessInterval, err := httputil.GetDurationRaw(r, "max_staleness_interval", *maxStalenessInterval)
if err != nil {
return fmt.Errorf("cannot parse `max_staleness_interval` arg: %w", err)
}
evalConfig.SimulatedSamples = simulatedSeries
evalConfig.MinStalenessInterval = minStalenessInterval
evalConfig.LookbackDelta, err = getMaxLookback(r, maxStalenessInterval)
if err != nil {
return err
}
return nil
}
func populateAuthTokens(qt *querytracer.Tracer, ec *promql.EvalConfig, at *auth.Token, deadline searchutil.Deadline) error {
if at != nil {
ec.AuthTokens = []*auth.Token{at}
@@ -1215,7 +1346,7 @@ func adjustLastPoints(tss []netstorage.Result, start, end int64) []netstorage.Re
return tss
}
func getMaxLookback(r *http.Request) (int64, error) {
func getMaxLookback(r *http.Request, maxStalenessInterval time.Duration) (int64, error) {
d := maxLookback.Milliseconds()
if d == 0 {
d = maxStalenessInterval.Milliseconds()

View File

@@ -138,6 +138,10 @@ type EvalConfig struct {
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
// MaxStalenessInterval corresponds to -search.maxStalenessInterval,
// but customized per query request.
MinStalenessInterval time.Duration
// How many decimal digits after the point to leave in response.
RoundDigits int
@@ -168,6 +172,9 @@ type EvalConfig struct {
timestamps []int64
timestampsOnce sync.Once
// Simulated samples
SimulatedSamples []*storage.SimulatedSamples
}
// copyEvalConfig returns src copy.
@@ -190,6 +197,8 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
ec.DenyPartialResponse = src.DenyPartialResponse
ec.IsPartialResponse.Store(src.IsPartialResponse.Load())
ec.QueryStats = src.QueryStats
ec.MinStalenessInterval = src.MinStalenessInterval
ec.SimulatedSamples = src.SimulatedSamples
// do not copy src.timestamps - they must be generated again.
return &ec
@@ -949,7 +958,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
}
ecSQ := copyEvalConfig(ec)
ecSQ.Start -= window + step + maxSilenceInterval()
ecSQ.Start -= window + step + maxSilenceInterval(ec.MinStalenessInterval)
ecSQ.End += step
ecSQ.Step = step
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
@@ -967,7 +976,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
return nil, nil
}
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant, ec.MinStalenessInterval)
if err != nil {
return nil, err
}
@@ -1720,7 +1729,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
}
// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.IsMultiTenant, ec.MinStalenessInterval)
if err != nil {
return nil, err
}
@@ -1730,7 +1739,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
tfss = searchutil.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
minTimestamp := ec.Start
if needSilenceIntervalForRollupFunc[funcName] {
minTimestamp -= maxSilenceInterval()
minTimestamp -= maxSilenceInterval(ec.MinStalenessInterval)
}
if window > ec.Step {
minTimestamp -= window
@@ -1749,10 +1758,13 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
} else {
sq = storage.NewSearchQuery(ec.AuthTokens[0].AccountID, ec.AuthTokens[0].ProjectID, minTimestamp, ec.End, tfss, ec.MaxSeries)
}
sq.SimulatedSeries = ec.SimulatedSamples
rss, isPartial, err := netstorage.ProcessSearchQuery(qt, ec.DenyPartialResponse, sq, ec.Deadline)
if err != nil {
return nil, err
}
ec.updateIsPartialResponse(isPartial)
qs := ec.QueryStats
rssLen := rss.Len()
@@ -1835,7 +1847,7 @@ func getRollupMemoryLimiter() *memoryLimiter {
return &rollupMemoryLimiter
}
func maxSilenceInterval() int64 {
func maxSilenceInterval(minStalenessInterval time.Duration) int64 {
d := minStalenessInterval.Milliseconds()
if d <= 0 {
d = 5 * 60 * 1000

View File

@@ -67,12 +67,15 @@ func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly boo
}
}
var rv []*timeseries
qid := activeQueriesV.Add(ec, q)
rv, err := evalExpr(qt, ec, e)
rv, err = evalExpr(qt, ec, e)
activeQueriesV.Remove(qid)
if err != nil {
return nil, err
}
if isFirstPointOnly {
// Remove all the points except the first one from every time series.
for _, ts := range rv {
@@ -331,3 +334,23 @@ func escapeDots(s string) string {
}
return string(result)
}
// ExtractMetricsFromQuery visits all the expressions in query and returns all the metrics found in the query.
func ExtractMetricsFromQuery(query string) ([]string, error) {
expr, err := metricsql.Parse(query)
if err != nil {
return nil, fmt.Errorf("error parsing query: %w", err)
}
var metrics []string
metricsql.VisitAll(expr, func(e metricsql.Expr) {
if me, ok := e.(*metricsql.MetricExpr); ok {
metricStr := string(me.AppendString(nil))
if metricStr != "" {
metrics = append(metrics, metricStr)
}
}
})
return metrics, nil
}

View File

@@ -0,0 +1,329 @@
package promql
import (
"math"
"slices"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
func TestSimulatedExec(t *testing.T) {
accountID := uint32(123)
projectID := uint32(567)
start := int64(1000e3)
end := int64(2000e3)
step := int64(200e3)
// Base EvalConfig that will be copied for each test
baseEC := EvalConfig{
AuthTokens: []*auth.Token{{
AccountID: accountID,
ProjectID: projectID,
}},
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: 1e4,
MaxSeries: 1000,
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
RoundDigits: 100,
NoCache: true,
}
t.Run(`simple_metric_exact_match`, func(t *testing.T) {
t.Skip()
ec := copyEvalConfig(&baseEC)
mn := newMetric(accountID, projectID,
"__name__", "test_metric",
"a", "b",
)
ec.SimulatedSamples = []*storage.SimulatedSamples{mn.build()}
q := `test_metric{a="b"}`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
// Expected result
expectedMN := storage.MetricName{
AccountID: accountID,
ProjectID: projectID,
MetricGroup: []byte("test_metric"),
Tags: []storage.Tag{
{
Key: []byte("a"),
Value: []byte("b"),
},
},
}
expectedResult := []netstorage.Result{
{
MetricName: expectedMN,
Values: mn.Value,
Timestamps: mn.Timestamps,
},
}
testResultsEqual(t, result, expectedResult, false)
})
t.Run(`filtered_by_tag_value`, func(t *testing.T) {
t.Skip()
// Create a copy of base EvalConfig
ec := copyEvalConfig(&baseEC)
mn := metricBuilders{
newMetric(accountID, projectID,
"__name__", "test_metric",
"a", "b",
"region", "us-west",
),
newMetric(accountID, projectID,
"__name__", "test_metric",
"a", "b",
"region", "us-east",
),
}
ec.SimulatedSamples = mn.build()
q := `test_metric{region="us-west"}`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
// Expected result
expectedMN := storage.MetricName{
AccountID: accountID,
ProjectID: projectID,
MetricGroup: []byte("test_metric"),
Tags: []storage.Tag{
{
Key: []byte("a"),
Value: []byte("b"),
},
{
Key: []byte("region"),
Value: []byte("us-west"),
},
},
}
expectedResult := []netstorage.Result{
{
MetricName: expectedMN,
Values: mn[0].Value,
Timestamps: mn[0].Timestamps,
},
}
testResultsEqual(t, result, expectedResult, false)
})
t.Run(`regex_match_on_tag`, func(t *testing.T) {
ec := copyEvalConfig(&baseEC)
mn := metricBuilders{
newMetric(accountID, projectID,
"__name__", "test_metric",
"env", "prod",
),
newMetric(accountID, projectID,
"__name__", "test_metric",
"env", "staging",
),
newMetric(accountID, projectID,
"__name__", "test_metric",
"env", "dev",
),
}
ec.SimulatedSamples = mn.build()
q := `test_metric{env=~"prod|staging"}`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
expectedResult := []netstorage.Result{mn[0].toResult(), mn[1].toResult()}
testResultsEqual(t, result, expectedResult, false)
})
}
func TestSumOverTime(t *testing.T) {
accountID := uint32(123)
projectID := uint32(567)
start := int64(1000e3)
end := int64(1300e3)
step := int64(30e3)
baseEC := EvalConfig{
AuthTokens: []*auth.Token{{
AccountID: accountID,
ProjectID: projectID,
}},
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: 1e4,
MaxSeries: 1000,
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
RoundDigits: 100,
NoCache: true,
}
t.Run(`basic_sum_over_time`, func(t *testing.T) {
ec := copyEvalConfig(&baseEC)
metric := newMetric(accountID, projectID,
"__name__", "test_metric",
"app", "api-server",
).withValues(1, 2, 3, 4, 5, 6).withUnix(1000, 1015, 1030, 1045, 1060, 1075)
ec.SimulatedSamples = []*storage.SimulatedSamples{metric.build()}
q := `sum_over_time(test_metric[30s])`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
expectedResult := []netstorage.Result{
newMetric(accountID, projectID,
"app", "api-server",
).withValues(1, 5, 9, 6).withUnix(1000, 1030, 1060, 1090).toResult(),
}
testSimulatedResultsEqual(t, result, expectedResult, false)
})
}
type metricBuilder storage.SimulatedSamples
func newMetric(accountID uint32, projectID uint32, pairs ...string) *metricBuilder {
mn := storage.MetricName{
AccountID: accountID,
ProjectID: projectID,
}
for i := 0; i < len(pairs); i += 2 {
mn.AddTag(pairs[i], pairs[i+1])
}
return &metricBuilder{
Name: mn,
Value: []float64{10, 20, 30, 40, 50, 60},
Timestamps: []int64{1000e3, 1200e3, 1400e3, 1600e3, 1800e3, 2000e3},
}
}
func (b *metricBuilder) withUnix(unix ...int64) *metricBuilder {
b.Timestamps = make([]int64, len(unix))
for i := range unix {
b.Timestamps[i] = unix[i] * 1e3
}
return b
}
func (b *metricBuilder) withValues(values ...float64) *metricBuilder {
b.Value = values
return b
}
func (b *metricBuilder) build() *storage.SimulatedSamples {
return (*storage.SimulatedSamples)(b)
}
func (b *metricBuilder) toResult() netstorage.Result {
return netstorage.Result{
MetricName: b.Name,
Values: b.Value,
Timestamps: b.Timestamps,
}
}
type metricBuilders []*metricBuilder
func (b metricBuilders) build() []*storage.SimulatedSamples {
ss := make([]*storage.SimulatedSamples, len(b))
for i := range b {
ss[i] = b[i].build()
}
return ss
}
func testSimulatedResultsEqual(t *testing.T, result, resultExpected []netstorage.Result, verifyTenant bool) {
t.Helper()
result = removeEmptyValuesAndTimeseries(result)
if len(result) != len(resultExpected) {
t.Fatalf(`unexpected timeseries count; got %d; want %d`, len(result), len(resultExpected))
}
for i := range result {
r := &result[i]
rExpected := &resultExpected[i]
testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName, verifyTenant, i)
testRowsEqual(t, r.Values, r.Timestamps, rExpected.Values, rExpected.Timestamps)
}
}
func removeEmptyValuesAndTimeseries(tss []netstorage.Result) []netstorage.Result {
dst := tss[:0]
for i := range tss {
ts := &tss[i]
hasNaNs := slices.ContainsFunc(ts.Values, math.IsNaN)
if !hasNaNs {
// Fast path: nothing to remove.
if len(ts.Values) > 0 {
dst = append(dst, *ts)
}
continue
}
// Slow path: remove NaNs.
srcTimestamps := ts.Timestamps
dstValues := ts.Values[:0]
// Do not reuse ts.Timestamps for dstTimestamps, since ts.Timestamps
// may be shared among multiple time series.
dstTimestamps := make([]int64, 0, len(ts.Timestamps))
for j, v := range ts.Values {
if math.IsNaN(v) {
continue
}
dstValues = append(dstValues, v)
dstTimestamps = append(dstTimestamps, srcTimestamps[j])
}
ts.Values = dstValues
ts.Timestamps = dstTimestamps
if len(ts.Values) > 0 {
dst = append(dst, *ts)
}
}
return dst
}
func TestExtractMetricsFromQuery(t *testing.T) {
query := `(vm_free_disk_space_bytes{job=~"$job", instance=~"$instance"}-vm_free_disk_space_limit_bytes{job=~"$job", instance=~"$instance"})
/
ignoring(path) (
(rate(vm_rows_added_to_storage_total{job=~"$job", instance=~"$instance"}[1d]) -
sum(rate(vm_deduplicated_samples_total{job=~"$job", instance=~"$instance"}[1d])) without (type)) *
(
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type) /
sum(vm_rows{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type)
)
+
rate(vm_new_timeseries_created_total{job=~"$job", instance=~"$instance"}[1d]) *
(
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type="indexdb/file"}) /
sum(vm_rows{job=~"$job", instance=~"$instance", type="indexdb/file"})
)
)`
metrics, err := ExtractMetricsFromQuery(query)
if err != nil {
t.Fatalf(`unexpected error when extracting metrics from query: %s`, err)
}
t.Logf(`metrics: %v`, metrics)
}

View File

@@ -1,12 +1,12 @@
package promql
import (
"flag"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
@@ -17,10 +17,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
"See also '-search.maxStalenessInterval'")
var rollupFuncs = map[string]newRollupFunc{
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
@@ -372,7 +368,7 @@ func getRollupTag(expr metricsql.Expr) (string, error) {
}
func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int,
window, lookbackDelta int64, sharedTimestamps []int64, isMultiTenant bool) (
window, lookbackDelta int64, sharedTimestamps []int64, isMultiTenant bool, minStalenessInterval time.Duration) (
func(values []float64, timestamps []int64), []*rollupConfig, error) {
preFunc := func(_ []float64, _ []int64) {}
funcName = strings.ToLower(funcName)
@@ -409,6 +405,7 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
isDefaultRollup: funcName == "default_rollup",
samplesScannedPerCall: samplesScannedPerCall,
isMultiTenant: isMultiTenant,
minStalenessInterval: minStalenessInterval,
}
}
@@ -605,6 +602,9 @@ type rollupConfig struct {
// Whether the rollup is used in multi-tenant mode.
// This is used in order to populate labels with tenancy information.
isMultiTenant bool
// The minimum interval for staleness calculations.
minStalenessInterval time.Duration
}
func (rc *rollupConfig) getTimestamps() []int64 {
@@ -728,8 +728,8 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
maxPrevInterval = rc.LookbackDelta
}
if *minStalenessInterval > 0 {
if msi := minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
if rc.minStalenessInterval > 0 {
if msi := rc.minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
maxPrevInterval = msi
}
}

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)
@@ -34,4 +35,31 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
return msecs, nil
}
// GetDurationRaw returns time.Duration from the given argKey query arg.
func GetDurationRaw(r *http.Request, argKey string, defaultValue time.Duration) (time.Duration, error) {
argValue := r.FormValue(argKey)
if len(argValue) == 0 {
return defaultValue, nil
}
if argValue == "undefined" {
// This hack is needed for Grafana, which may send undefined value
return defaultValue, nil
}
secs, err := strconv.ParseFloat(argValue, 64)
if err != nil {
// Try parsing string format
d, err := timeutil.ParseDuration(argValue)
if err != nil {
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
}
return d, nil
}
d := time.Duration(secs * float64(time.Second))
msecs := d.Milliseconds()
if msecs <= 0 || msecs > maxDurationMsecs {
return 0, fmt.Errorf("%s=%s is out of allowed range [%s ... %s]", argKey, d, time.Millisecond, time.Duration(maxDurationMsecs)*time.Millisecond)
}
return d, nil
}
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000

View File

@@ -2236,6 +2236,43 @@ func hasCompositeTagFilters(tfs []*tagFilter, prefix []byte) bool {
return false
}
// MatchSimulatedSamples filters the given simulatedSamples against the provided tag filters.
// It returns only the simulated samples that match any of the given tag filter sets.
// This function is used for debugging and testing purposes to simulate metric queries.
func MatchSimulatedSamples(accountID, projectID uint32, simulatedSamples []*SimulatedSamples, tagFilterss [][]TagFilter) ([]*SimulatedSamples, error) {
var kb bytesutil.ByteBuffer
matchedSamples := make([]*SimulatedSamples, 0, 1)
for _, rawTfs := range tagFilterss {
tfs := NewTagFilters(accountID, projectID)
for _, tf := range rawTfs {
err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp)
if err != nil {
return nil, fmt.Errorf("cannot add tagFilter %s: %w", tf.String(), err)
}
}
for idx, mn := range simulatedSamples {
ok, err := matchTagFilters(&mn.Name, toTFPointers(tfs.tfs), &kb)
if err != nil {
return nil, fmt.Errorf("cannot match MetricName %s against tagFilters: %w", mn.Name.String(), err)
}
if ok {
matchedSamples = append(matchedSamples, simulatedSamples[idx])
}
}
}
return matchedSamples, nil
}
func toTFPointers(tfs []tagFilter) []*tagFilter {
tfps := make([]*tagFilter, len(tfs))
for i := range tfs {
tfps[i] = &tfs[i]
}
return tfps
}
func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs, mn.AccountID, mn.ProjectID)
for i, tf := range tfs {

View File

@@ -2162,14 +2162,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
fs.MustRemoveDir(path)
}
func toTFPointers(tfs []tagFilter) []*tagFilter {
tfps := make([]*tagFilter, len(tfs))
for i := range tfs {
tfps[i] = &tfs[i]
}
return tfps
}
func newTestStorage() *Storage {
s := &Storage{
cachePath: "test-storage-cache",

View File

@@ -362,6 +362,35 @@ type SearchQuery struct {
// The maximum number of time series the search query can return.
MaxMetrics int
// SimulatedSeries is used for simulating samples returned from storage nodes.
SimulatedSeries []*SimulatedSamples
}
// SimulatedSamples represents simulated metric samples for debug and testing purposes.
// It contains metric name, timestamps and corresponding values.
type SimulatedSamples struct {
Name MetricName
Timestamps []int64
Value []float64
}
// NewSimulatedSeries creates a new SimulatedSamples instance with the given parameters.
// It constructs a metric name from the provided accountID, projectID and metric labels.
func NewSimulatedSeries(accountID, projectID uint32, metric map[string]string, timestamp []int64, value []float64) *SimulatedSamples {
ss := &SimulatedSamples{
Timestamps: timestamp,
Value: value,
}
mn := GetMetricName()
defer PutMetricName(mn)
mn.AccountID = accountID
mn.ProjectID = projectID
for k, v := range metric {
mn.AddTag(k, v)
}
ss.Name.CopyFrom(mn)
return ss
}
// GetTimeRange returns time range for the given sq.