Compare commits

..

10 Commits

Author SHA1 Message Date
func25
f9e5881303 clean 2025-08-27 13:58:07 +07:00
func25
ab6fd0afed clean 2025-08-27 11:33:49 +07:00
func25
8f8ead2c50 clean 2025-08-27 10:47:30 +07:00
func25
2f422bad85 clean 2025-08-27 10:46:54 +07:00
func25
c0a41b41ca missing after cherry-pick 2025-08-27 10:27:05 +07:00
func25
68e493cef3 remove maxDebugSamples flag and limit checking 2025-08-27 10:12:17 +07:00
func25
06572772d4 update 2025-08-27 10:11:55 +07:00
func25
d12f6c280f update 2025-08-27 10:08:30 +07:00
Alexander Frolov
e62e0685dc vmctl: inconsistent vm-native logs (#9607)
### Describe Your Changes

Some messages were written to `stdout` using `fmt.Printf` and
`fmt.Println`, while the other messages like import statistics were
written to `stderr` through the `log` package.

This led to ordering problems where the `Import finished!` +
`VictoriaMetrics importer stats` messages, which expected to be the last
messages, appeared before `Continue import process with filter`
messages, creating confusing output for users.

```
2025/08/20 13:07:26 Import finished!
2025/08/20 13:07:26 VictoriaMetrics importer stats:
  time spent while importing: 20h49m10.8497184s;
  total bytes: 277.1 GB;
  bytes/s: 3.7 MB;
  requests: 7978614;
  requests retries: 0;
2025/08/20 13:07:26 Total time: 20h49m10.851006088s
Continue import process with filter
        filter: match[]={__name__!=""}
        start: 2025-08-08T00:00:00Z
        end: 2025-08-15T00:00:00Z:
Continue import process with filter
        filter: match[]={__name__!=""}
        start: 2025-08-15T00:00:00Z
        end: 2025-08-19T16:18:15Z:
```


### Checklist

The following checks are **mandatory**:

- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).
2025-08-26 18:53:59 +03:00
Max Kotliar
df92e617db Revert "app/{vminsert,vmagent}: added flags for periodical relabel and stream aggregation configs check (#9598)"
This reverts commit 07291c1d62 and partly
7c0c8cc702.

The reasons explained in
https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9598#issuecomment-3223766551
2025-08-26 14:42:35 +03:00
27 changed files with 982 additions and 229 deletions

View File

@@ -481,7 +481,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if !httpserver.CheckAuthFlag(w, r, reloadAuthKey) {
return true
}
configReloadRequests.Inc()
promscrapeConfigReloadRequests.Inc()
procutil.SelfSIGHUP()
w.WriteHeader(http.StatusOK)
return true
@@ -747,7 +747,7 @@ var (
promscrapeConfigRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/config"}`)
promscrapeStatusConfigRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/status/config"}`)
configReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`)
promscrapeConfigReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`)
)
func usage() {

View File

@@ -3,11 +3,9 @@ package remotewrite
import (
"flag"
"fmt"
"os"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -32,8 +30,6 @@ var (
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
relabelConfigCheckInterval = flag.Duration("relabel.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
"-remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.")
)
var labelsGlobal []prompb.Label
@@ -71,15 +67,13 @@ func initRelabelConfigs() {
}
}
func reloadRelabelConfigs(logReload bool) {
func reloadRelabelConfigs() {
rcs := allRelabelConfigs.Load()
if !rcs.isSet() {
return
}
relabelConfigReloads.Inc()
if logReload {
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
}
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
rcs, err := loadRelabelConfigs()
if err != nil {
relabelConfigReloadErrors.Inc()
@@ -277,26 +271,3 @@ func fixPromCompatibleNaming(labels []prompb.Label) {
}
}
}
func startRelabelConfigReloader(sighupCh <-chan os.Signal) {
configReloaderWG.Add(1)
go func() {
var tickerCh <-chan time.Time
if *relabelConfigCheckInterval > 0 {
ticker := time.NewTicker(*relabelConfigCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
defer configReloaderWG.Done()
for {
select {
case <-configReloaderStopCh:
return
case <-sighupCh:
reloadRelabelConfigs(true)
case <-tickerCh:
reloadRelabelConfigs(false)
}
}
}()
}

View File

@@ -213,8 +213,20 @@ func Init() {
dropDanglingQueues()
startRelabelConfigReloader(sighupCh)
startStreamAggrConfigReloader(sighupCh)
// Start config reloader.
configReloaderWG.Add(1)
go func() {
defer configReloaderWG.Done()
for {
select {
case <-configReloaderStopCh:
return
case <-sighupCh:
}
reloadRelabelConfigs()
reloadStreamAggrConfigs()
}
}()
}
func dropDanglingQueues() {

View File

@@ -3,9 +3,7 @@ package remotewrite
import (
"flag"
"fmt"
"os"
"strings"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
@@ -67,8 +65,6 @@ var (
streamAggrEnableWindows = flagutil.NewArrayBool("remoteWrite.streamAggr.enableWindows", "Enables aggregation within fixed windows for all remote write's aggregators. "+
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
"-streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.")
)
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
@@ -95,22 +91,20 @@ func CheckStreamAggrConfigs() error {
return nil
}
func reloadStreamAggrConfigs(logReload bool) {
reloadStreamAggrConfigGlobal(logReload)
func reloadStreamAggrConfigs() {
reloadStreamAggrConfigGlobal()
for _, rwctx := range rwctxsGlobal {
rwctx.reloadStreamAggrConfig(logReload)
rwctx.reloadStreamAggrConfig()
}
}
func reloadStreamAggrConfigGlobal(logReload bool) {
func reloadStreamAggrConfigGlobal() {
path := *streamAggrGlobalConfig
if path == "" {
return
}
if logReload {
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
}
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := newStreamAggrConfigGlobal()
@@ -128,9 +122,7 @@ func reloadStreamAggrConfigGlobal(logReload bool) {
logger.Infof("successfully reloaded -streamAggr.config=%q", path)
} else {
sasNew.MustStop()
if logReload {
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
}
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
@@ -179,15 +171,13 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
}
}
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig(logReload bool) {
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
path := streamAggrConfig.GetOptionalArg(rwctx.idx)
if path == "" {
return
}
if logReload {
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
}
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
sasNew, err := rwctx.newStreamAggrConfig()
@@ -205,9 +195,7 @@ func (rwctx *remoteWriteCtx) reloadStreamAggrConfig(logReload bool) {
logger.Infof("successfully reloaded -remoteWrite.streamAggr.config=%q", path)
} else {
sasNew.MustStop()
if logReload {
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
}
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
}
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
@@ -268,26 +256,3 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamag
}
return sas, nil
}
func startStreamAggrConfigReloader(sighupCh <-chan os.Signal) {
configReloaderWG.Add(1)
go func() {
var tickerCh <-chan time.Time
if *streamAggrConfigCheckInterval > 0 {
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
defer configReloaderWG.Done()
for {
select {
case <-configReloaderStopCh:
return
case <-sighupCh:
reloadStreamAggrConfigs(true)
case <-tickerCh:
reloadStreamAggrConfigs(false)
}
}
}()
}

View File

@@ -121,7 +121,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
pr := bar.NewProxyReader(reader)
if pr != nil {
reader = pr
fmt.Printf("Continue import process with filter %s:\n", f.String())
fmt.Fprintf(log.Writer(), "Continue import process with filter %s:\n", f.String())
}
}
@@ -191,7 +191,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
initParams = []any{srcURL, dstURL, p.filter.String(), tenantID}
}
fmt.Println("") // extra line for better output formatting
fmt.Fprintln(log.Writer(), "") // extra line for better output formatting
log.Printf(initMessage, initParams...)
if len(ranges) > 1 {
log.Printf("Selected time range will be split into %d ranges according to %q step", len(ranges), p.filter.Chunk)

View File

@@ -5,7 +5,6 @@ import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
@@ -20,8 +19,6 @@ import (
)
var (
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking stream aggregation configuration. "+
"By default, the checking is disabled.")
streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . "+
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
@@ -115,17 +112,10 @@ func InitStreamAggr() {
// Start config reloader.
saCfgReloaderWG.Add(1)
go func() {
var tickerCh <-chan time.Time
if *streamAggrConfigCheckInterval > 0 {
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
defer saCfgReloaderWG.Done()
for {
select {
case <-sighupCh:
case <-tickerCh:
case <-saCfgReloaderStopCh:
return
}

View File

@@ -124,7 +124,6 @@ func Stop() {
}
protoparserutil.StopUnmarshalWorkers()
common.MustStopStreamAggr()
relabel.Stop()
}
// RequestHandler is a handler for Prometheus remote storage write API

View File

@@ -3,9 +3,7 @@ package relabel
import (
"flag"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
@@ -18,14 +16,11 @@ import (
var (
relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
"The path can point either to local file or to http url. "+
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal or"+
"at the interval specified by -relabelConfigCheckInterval.")
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal")
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
relabelConfigCheckInterval = flag.Duration("relabelConfigCheckInterval", 0, "Interval for checking for changes in '-relabelConfig' file. "+
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")
)
// Init must be called after flag.Parse and before using the relabel package.
@@ -44,8 +39,6 @@ func Init() {
return
}
globalStopChan = make(chan struct{})
relabelWG.Add(1)
configReloads = metrics.NewCounter(`vm_relabel_config_reloads_total`)
configReloadErrors = metrics.NewCounter(`vm_relabel_config_reloads_errors_total`)
configSuccess = metrics.NewGauge(`vm_relabel_config_last_reload_successful`, nil)
@@ -56,46 +49,17 @@ func Init() {
configTimestamp.Set(fasttime.UnixTimestamp())
go func() {
defer relabelWG.Done()
var tickerCh <-chan time.Time
if *relabelConfigCheckInterval > 0 {
ticker := time.NewTicker(*relabelConfigCheckInterval)
tickerCh = ticker.C
defer ticker.Stop()
}
var noChangesLogFn func()
for {
select {
case <-sighupCh:
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
noChangesLogFn = func() {
logger.Infof("nothing changed in %q", *relabelConfig)
}
case <-tickerCh:
// silently skip logging for the unchanged config files
noChangesLogFn = func() {}
case <-globalStopChan:
logger.Infof("stopping relabel config reloader")
return
}
pcsNew, err := loadRelabelConfig()
for range sighupCh {
configReloads.Inc()
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
pcs, err := loadRelabelConfig()
if err != nil {
configReloadErrors.Inc()
configSuccess.Set(0)
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
continue
}
if pcsNew.String() == pcs.String() {
// set success to 1 since previous reload could have been unsuccessful
// do not update configTimestamp as config version remains old.
configSuccess.Set(1)
noChangesLogFn()
continue
}
configReloads.Inc()
pcs = pcsNew
pcsGlobal.Store(pcsNew)
pcsGlobal.Store(pcs)
configSuccess.Set(1)
configTimestamp.Set(fasttime.UnixTimestamp())
logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig)
@@ -103,21 +67,6 @@ func Init() {
}()
}
// Stop stops relabel config reloader watchers
func Stop() {
if len(*relabelConfig) == 0 {
return
}
close(globalStopChan)
relabelWG.Wait()
}
var (
globalStopChan chan struct{}
relabelWG sync.WaitGroup
)
var (
configReloads *metrics.Counter
configReloadErrors *metrics.Counter

View File

@@ -262,6 +262,13 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
return true
}
return true
case "/api/v1/config":
httpserver.EnableCORS(w, r)
if err := prometheus.ConfigHandler(qt, startTime, w, r); err != nil {
httpserver.SendPrometheusError(w, r, err)
return true
}
return true
case "/api/v1/export":
exportRequests.Inc()
if err := prometheus.ExportHandler(startTime, w, r); err != nil {
@@ -538,6 +545,13 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
expandWithExprsRequests.Inc()
prometheus.ExpandWithExprs(w, r)
return true
case "/extract-metric-exprs":
startTime := time.Now()
if err := prometheus.ExtractMetricExprsHandler(startTime, w, r); err != nil {
httpserver.Errorf(w, r, "%s", err)
return true
}
return true
case "/prettify-query":
prettifyQueryRequests.Inc()
prometheus.PrettifyQuery(w, r)

View File

@@ -63,10 +63,18 @@ type Results struct {
packedTimeseries []packedTimeseries
sr *storage.Search
tbf *tmpBlocksFile
// the result is simulated
isSimulated bool
simulatedSeries []*storage.SimulatedSamples
}
// Len returns the number of results in rss.
func (rss *Results) Len() int {
if rss.isSimulated {
return len(rss.simulatedSeries)
}
return len(rss.packedTimeseries)
}
@@ -218,6 +226,10 @@ var defaultMaxWorkersPerQuery = func() int {
//
// rss becomes unusable after the call to RunParallel.
func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
if rss.isSimulated {
return rss.runParallelSimulated(qt, f)
}
qt = qt.NewChild("parallel process of fetched data")
defer rss.mustClose()
@@ -233,6 +245,87 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
return err
}
func (rss *Results) runParallelSimulated(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
qt = qt.NewChild("parallel process of fetched data")
cb := f
tmpResult := getTmpResult()
defer putTmpResult(tmpResult)
// For simplicity, let's process serially first. Parallelization can be added if needed.
// If parallelization is desired, it would mirror the worker pool logic of the original runParallel,
// but iterating over rss.simulatedSamples entries.
workerID := uint(0)
var firstErr error
for _, metric := range rss.simulatedSeries {
r := &tmpResult.rs
r.reset()
r.MetricName.CopyFrom(&metric.Name)
for i, ts := range metric.Timestamps {
if ts >= rss.tr.MinTimestamp && ts <= rss.tr.MaxTimestamp {
r.Values = append(r.Values, metric.Value[i])
r.Timestamps = append(r.Timestamps, ts)
}
}
// Sort timestamps chronologically to match real storage behavior.
// Real storage ensures chronological order through:
// 1. Block-level sorting by MinTimestamp
// 2. Within-block timestamp ordering via encoding.EnsureNonDecreasingSequence()
if len(r.Timestamps) > 1 {
// Create pairs for sorting
type timestampValue struct {
timestamp int64
value float64
}
pairs := make([]timestampValue, len(r.Timestamps))
for i := range r.Timestamps {
pairs[i] = timestampValue{
timestamp: r.Timestamps[i],
value: r.Values[i],
}
}
// Sort by timestamp
sort.Slice(pairs, func(i, j int) bool {
return pairs[i].timestamp < pairs[j].timestamp
})
// Extract back to separate slices
for i := range pairs {
r.Timestamps[i] = pairs[i].timestamp
r.Values[i] = pairs[i].value
}
}
// The input from the client is most likely already deduplicated, since it's emitted by
// vmselect. However, the client may modify the input instead of using the returned one.
dedupInterval := storage.GetDedupInterval()
if dedupInterval > 0 && len(r.Timestamps) > 0 {
r.Timestamps, r.Values = storage.DeduplicateSamples(r.Timestamps, r.Values, dedupInterval)
}
rowProcessed := len(r.Timestamps)
if rowProcessed > 0 {
err := cb(r, workerID)
if err != nil {
firstErr = err
break
}
}
}
// Count total samples across all series
totalSamples := 0
for _, metric := range rss.simulatedSeries {
totalSamples += len(metric.Timestamps)
}
qt.Donef("series=%d, samples=%d", len(rss.simulatedSeries), totalSamples)
return firstErr
}
func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) {
tswsLen := len(rss.packedTimeseries)
if tswsLen == 0 {
@@ -1119,6 +1212,10 @@ func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline
//
// Results.RunParallel or Results.Cancel must be called on the returned Results.
func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, error) {
if len(sq.SimulatedSeries) > 0 {
return processSearchSimulated(qt, sq, deadline)
}
qt = qt.NewChild("fetch matching series: %s", sq)
defer qt.Done()
if deadline.Exceeded() {
@@ -1291,6 +1388,41 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
return &rss, nil
}
func processSearchSimulated(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, error) {
qt = qt.NewChild("fetch matching series (simulated): %s", sq)
defer qt.Done()
if deadline.Exceeded() {
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
}
tr := storage.TimeRange{
MinTimestamp: sq.MinTimestamp,
MaxTimestamp: sq.MaxTimestamp,
}
// Process simulated samples.
matchedSamples, err := storage.MatchSimulatedSamples(sq.SimulatedSeries, sq.TagFilterss)
if err != nil {
return nil, fmt.Errorf("cannot match simulated samples: %w", err)
}
// Create a result set similar to ProcessSearchQuery
rss := &Results{
tr: tr,
deadline: deadline,
isSimulated: true,
simulatedSeries: matchedSamples,
}
if len(matchedSamples) == 0 {
qt.Printf("no matching series found")
} else {
qt.Printf("found %d series", len(rss.simulatedSeries))
}
return rss, nil
}
type blockRef struct {
partRef storage.PartRef
addr tmpBlockAddr

View File

@@ -0,0 +1,20 @@
{% import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
) %}
{% stripspace %}
ConfigResponse generates response for /api/v1/config .
{% func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) %}
{
"status":"success",
"data":{
"minStalenessInterval": {%q= config.MinStalenessInterval %},
"maxStalenessInterval": {%q= config.MaxStalenessInterval %}
}
{% code qt.Done() %}
{%= dumpQueryTrace(qt) %}
}
{% endfunc %}
{% endstripspace %}

View File

@@ -0,0 +1,73 @@
// Code generated by qtc from "config_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
//line app/vmselect/prometheus/config_response.qtpl:1
package prometheus
//line app/vmselect/prometheus/config_response.qtpl:1
import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
)
// ConfigResponse generates response for /api/v1/config .
//line app/vmselect/prometheus/config_response.qtpl:8
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/config_response.qtpl:8
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/config_response.qtpl:8
func StreamConfigResponse(qw422016 *qt422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/config_response.qtpl:8
qw422016.N().S(`{"status":"success","data":{"minStalenessInterval":`)
//line app/vmselect/prometheus/config_response.qtpl:12
qw422016.N().Q(config.MinStalenessInterval)
//line app/vmselect/prometheus/config_response.qtpl:12
qw422016.N().S(`,"maxStalenessInterval":`)
//line app/vmselect/prometheus/config_response.qtpl:13
qw422016.N().Q(config.MaxStalenessInterval)
//line app/vmselect/prometheus/config_response.qtpl:13
qw422016.N().S(`}`)
//line app/vmselect/prometheus/config_response.qtpl:15
qt.Done()
//line app/vmselect/prometheus/config_response.qtpl:16
streamdumpQueryTrace(qw422016, qt)
//line app/vmselect/prometheus/config_response.qtpl:16
qw422016.N().S(`}`)
//line app/vmselect/prometheus/config_response.qtpl:18
}
//line app/vmselect/prometheus/config_response.qtpl:18
func WriteConfigResponse(qq422016 qtio422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
//line app/vmselect/prometheus/config_response.qtpl:18
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/config_response.qtpl:18
StreamConfigResponse(qw422016, config, qt)
//line app/vmselect/prometheus/config_response.qtpl:18
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/config_response.qtpl:18
}
//line app/vmselect/prometheus/config_response.qtpl:18
func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) string {
//line app/vmselect/prometheus/config_response.qtpl:18
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/config_response.qtpl:18
WriteConfigResponse(qb422016, config, qt)
//line app/vmselect/prometheus/config_response.qtpl:18
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/config_response.qtpl:18
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/config_response.qtpl:18
return qs422016
//line app/vmselect/prometheus/config_response.qtpl:18
}

View File

@@ -0,0 +1,18 @@
{% stripspace %}
ExtractMetricExprsResponse generates response for /extract-metric-exprs .
{% func ExtractMetricExprsResponse(metrics []string) %}
{
"status":"success",
"data":[
{% if len(metrics) > 0 %}
{%q= metrics[0] %}
{% for i := 1; i < len(metrics); i++ %}
,{%q= metrics[i] %}
{% endfor %}
{% endif %}
]
}
{% endfunc %}
{% endstripspace %}

View File

@@ -0,0 +1,69 @@
// Code generated by qtc from "extract_metric_exprs_response.qtpl". DO NOT EDIT.
// See https://github.com/valyala/quicktemplate for details.
// ExtractMetricExprsResponse generates response for /extract-metric-exprs .
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
package prometheus
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
import (
qtio422016 "io"
qt422016 "github.com/valyala/quicktemplate"
)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
var (
_ = qtio422016.Copy
_ = qt422016.AcquireByteBuffer
)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
func StreamExtractMetricExprsResponse(qw422016 *qt422016.Writer, metrics []string) {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
qw422016.N().S(`{"status":"success","data":[`)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:8
if len(metrics) > 0 {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:9
qw422016.N().Q(metrics[0])
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
for i := 1; i < len(metrics); i++ {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
qw422016.N().S(`,`)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:11
qw422016.N().Q(metrics[i])
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:12
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
qw422016.N().S(`]}`)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
func WriteExtractMetricExprsResponse(qq422016 qtio422016.Writer, metrics []string) {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qw422016 := qt422016.AcquireWriter(qq422016)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
StreamExtractMetricExprsResponse(qw422016, metrics)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qt422016.ReleaseWriter(qw422016)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
}
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
func ExtractMetricExprsResponse(metrics []string) string {
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qb422016 := qt422016.AcquireByteBuffer()
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
WriteExtractMetricExprsResponse(qb422016, metrics)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qs422016 := string(qb422016.B)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
qt422016.ReleaseByteBuffer(qb422016)
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
return qs422016
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
}

View File

@@ -1,8 +1,10 @@
package prometheus
import (
"encoding/json"
"flag"
"fmt"
"io"
"math"
"net/http"
"runtime"
@@ -20,6 +22,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
@@ -42,6 +45,9 @@ var (
maxLookback = flag.Duration("search.maxLookback", 0, "Synonym to -query.lookback-delta from Prometheus. "+
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+
"See also '-search.maxStalenessInterval' flag, which has the same meaning due to historical reasons")
minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
"See also '-search.maxStalenessInterval'")
maxStalenessInterval = flag.Duration("search.maxStalenessInterval", 0, "The maximum interval for staleness calculations. "+
"By default, it is automatically calculated from the median interval between samples. This flag could be useful for tuning "+
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. "+
@@ -116,7 +122,7 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
if err != nil {
return err
}
lookbackDelta, err := getMaxLookback(r)
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
if err != nil {
return err
}
@@ -611,6 +617,55 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
// ConfigData holds the current configuration values for search-related flags
type ConfigData struct {
MinStalenessInterval string
MaxStalenessInterval string
}
// ConfigHandler processes /api/v1/config request.
//
// It returns the current configuration for search-related flags.
func ConfigHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, _ *http.Request) error {
config := &ConfigData{
MinStalenessInterval: (*minStalenessInterval).String(),
MaxStalenessInterval: (*maxStalenessInterval).String(),
}
w.Header().Set("Content-Type", "application/json")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteConfigResponse(bw, config, qt)
if err := bw.Flush(); err != nil {
return fmt.Errorf("cannot send config response to remote client: %w", err)
}
return nil
}
// ExtractMetricExprsHandler processes /extract-metric-exprs request.
//
// It extracts metric expressions from a given PromQL query.
func ExtractMetricExprsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
query := r.FormValue("query")
if len(query) == 0 {
return fmt.Errorf("missing `query` arg")
}
metrics, err := promql.ExtractMetricsFromQuery(query)
if err != nil {
return fmt.Errorf("cannot extract metrics from query: %w", err)
}
w.Header().Set("Content-Type", "application/json")
bw := bufferedwriter.Get(w)
defer bufferedwriter.Put(bw)
WriteExtractMetricExprsResponse(bw, metrics)
if err := bw.Flush(); err != nil {
return fmt.Errorf("cannot send extract metric exprs response to remote client: %w", err)
}
return nil
}
// LabelsHandler processes /api/v1/labels request.
//
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
@@ -712,7 +767,8 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
ct := startTime.UnixNano() / 1e6
deadline := searchutil.GetDeadlineForQuery(r, startTime)
mayCache := !httputil.GetBool(r, "nocache")
isDebug := httputil.GetBool(r, "debug")
noCache := httputil.GetBool(r, "nocache") || isDebug
query := r.FormValue("query")
if len(query) == 0 {
return fmt.Errorf("missing `query` arg")
@@ -721,7 +777,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
if err != nil {
return err
}
lookbackDelta, err := getMaxLookback(r)
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
if err != nil {
return err
}
@@ -807,23 +863,14 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
} else {
queryOffset = 0
}
ec := &promql.EvalConfig{
Start: start,
End: start,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: GetMaxUniqueTimeSeries(),
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
CacheTagFilters: etfs,
GetRequestURI: func() string {
return httpserver.GetRequestURI(r)
},
ec := newEvalConfig(r, start, start, step, deadline, noCache, lookbackDelta, isDebug, etfs)
if isDebug {
if err := populateSimulatedData(r, nil, ec); err != nil {
_ = r.Body.Close()
return fmt.Errorf("cannot read simulated samples: %w", err)
}
}
_ = r.Body.Close()
qs := promql.NewQueryStats(query, nil, ec)
ec.QueryStats = qs
@@ -897,8 +944,9 @@ func QueryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, query string,
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
deadline := searchutil.GetDeadlineForQuery(r, startTime)
mayCache := !httputil.GetBool(r, "nocache")
lookbackDelta, err := getMaxLookback(r)
isDebug := httputil.GetBool(r, "debug")
noCache := httputil.GetBool(r, "nocache") || isDebug
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
if err != nil {
return err
}
@@ -913,27 +961,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
if err := promql.ValidateMaxPointsPerSeries(start, end, step, *maxPointsPerTimeseries); err != nil {
return fmt.Errorf("%w; (see -search.maxPointsPerTimeseries command-line flag)", err)
}
if mayCache {
if !noCache {
start, end = promql.AdjustStartEnd(start, end, step)
}
ec := &promql.EvalConfig{
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: GetMaxUniqueTimeSeries(),
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
CacheTagFilters: etfs,
GetRequestURI: func() string {
return httpserver.GetRequestURI(r)
},
ec := newEvalConfig(r, start, end, step, deadline, noCache, lookbackDelta, isDebug, etfs)
if isDebug {
if err := populateSimulatedData(r, nil, ec); err != nil {
_ = r.Body.Close()
return fmt.Errorf("cannot read simulated samples: %w", err)
}
}
_ = r.Body.Close()
qs := promql.NewQueryStats(query, nil, ec)
ec.QueryStats = qs
@@ -969,6 +1009,93 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
return nil
}
func newEvalConfig(r *http.Request, start, end, step int64, deadline searchutil.Deadline, noCache bool, lookbackDelta int64, isDebug bool, etfs [][]storage.TagFilter) *promql.EvalConfig {
ec := &promql.EvalConfig{
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: GetMaxUniqueTimeSeries(),
MinStalenessInterval: *minStalenessInterval,
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: !noCache,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
CacheTagFilters: etfs,
GetRequestURI: func() string {
return httpserver.GetRequestURI(r)
},
}
return ec
}
func populateSimulatedData(r *http.Request, at *auth.Token, evalConfig *promql.EvalConfig) error {
type jsonExportBlockInput struct {
Metric map[string]string `json:"metric"`
Values []float64 `json:"values"`
Timestamps []int64 `json:"timestamps"`
}
// --- Read and Parse Input Samples from r.Body ---
var simulatedSeries []*storage.SimulatedSamples
decoder := json.NewDecoder(r.Body)
lineNum := 0
for {
var jeb jsonExportBlockInput
if err := decoder.Decode(&jeb); err == io.EOF {
break
} else if err != nil {
return fmt.Errorf("error decoding input JSON on line %d: %w", lineNum, err)
}
// Validate that values and timestamps arrays have the same length
if len(jeb.Values) != len(jeb.Timestamps) {
return fmt.Errorf("mismatched values and timestamps arrays length in debug data on line %d: values=%d, timestamps=%d", lineNum, len(jeb.Values), len(jeb.Timestamps))
}
var mn = storage.GetMetricName()
defer storage.PutMetricName(mn)
for k, v := range jeb.Metric {
mn.AddTag(k, v)
}
ss := &storage.SimulatedSamples{
Value: jeb.Values,
Timestamps: jeb.Timestamps,
}
ss.Name.CopyFrom(mn)
simulatedSeries = append(simulatedSeries, ss)
lineNum++
}
// It doesn't make sense to debug with empty samples
if len(simulatedSeries) == 0 {
return fmt.Errorf("no simulated samples found")
}
minStalenessInterval, err := httputil.GetDurationRaw(r, "min_staleness_interval", evalConfig.MinStalenessInterval)
if err != nil {
return fmt.Errorf("cannot parse `min_staleness_interval` arg: %w", err)
}
maxStalenessInterval, err := httputil.GetDurationRaw(r, "max_staleness_interval", *maxStalenessInterval)
if err != nil {
return fmt.Errorf("cannot parse `max_staleness_interval` arg: %w", err)
}
evalConfig.SimulatedSamples = simulatedSeries
evalConfig.MinStalenessInterval = minStalenessInterval
evalConfig.LookbackDelta, err = getMaxLookback(r, maxStalenessInterval)
if err != nil {
return err
}
return nil
}
func removeEmptyValuesAndTimeseries(tss []netstorage.Result) []netstorage.Result {
dst := tss[:0]
for i := range tss {
@@ -1044,7 +1171,7 @@ func adjustLastPoints(tss []netstorage.Result, start, end int64) []netstorage.Re
return tss
}
func getMaxLookback(r *http.Request) (int64, error) {
func getMaxLookback(r *http.Request, maxStalenessInterval time.Duration) (int64, error) {
d := maxLookback.Milliseconds()
if d == 0 {
d = maxStalenessInterval.Milliseconds()

View File

@@ -134,6 +134,10 @@ type EvalConfig struct {
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
// MaxStalenessInterval corresponds to -search.maxStalenessInterval,
// but customized per query request.
MinStalenessInterval time.Duration
// How many decimal digits after the point to leave in response.
RoundDigits int
@@ -158,6 +162,9 @@ type EvalConfig struct {
timestamps []int64
timestampsOnce sync.Once
// Simulated samples
SimulatedSamples []*storage.SimulatedSamples
}
// copyEvalConfig returns src copy.
@@ -176,6 +183,8 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
ec.CacheTagFilters = src.CacheTagFilters
ec.GetRequestURI = src.GetRequestURI
ec.QueryStats = src.QueryStats
ec.MinStalenessInterval = src.MinStalenessInterval
ec.SimulatedSamples = src.SimulatedSamples
// do not copy src.timestamps - they must be generated again.
return &ec
@@ -929,7 +938,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
}
ecSQ := copyEvalConfig(ec)
ecSQ.Start -= window + step + maxSilenceInterval()
ecSQ.Start -= window + step + maxSilenceInterval(ec.MinStalenessInterval)
ecSQ.End += step
ecSQ.Step = step
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
@@ -946,7 +955,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
return nil, nil
}
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.MinStalenessInterval)
if err != nil {
return nil, err
}
@@ -1684,7 +1693,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
}
// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.MinStalenessInterval)
if err != nil {
return nil, err
}
@@ -1694,7 +1703,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
tfss = searchutil.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
minTimestamp := ec.Start
if needSilenceIntervalForRollupFunc[funcName] {
minTimestamp -= maxSilenceInterval()
minTimestamp -= maxSilenceInterval(ec.MinStalenessInterval)
}
if window > ec.Step {
minTimestamp -= window
@@ -1702,6 +1711,8 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
minTimestamp -= ec.Step
}
sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
sq.SimulatedSeries = ec.SimulatedSamples
rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
if err != nil {
return nil, err
@@ -1787,7 +1798,7 @@ func getRollupMemoryLimiter() *memoryLimiter {
return &rollupMemoryLimiter
}
func maxSilenceInterval() int64 {
func maxSilenceInterval(minStalenessInterval time.Duration) int64 {
d := minStalenessInterval.Milliseconds()
if d <= 0 {
d = 5 * 60 * 1000

View File

@@ -61,12 +61,15 @@ func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly boo
}
}
var rv []*timeseries
qid := activeQueriesV.Add(ec, q)
rv, err := evalExpr(qt, ec, e)
rv, err = evalExpr(qt, ec, e)
activeQueriesV.Remove(qid)
if err != nil {
return nil, err
}
if isFirstPointOnly {
// Remove all the points except the first one from every time series.
for _, ts := range rv {
@@ -325,3 +328,23 @@ func escapeDots(s string) string {
}
return string(result)
}
// ExtractMetricsFromQuery visits all the expressions in query and returns all the metrics found in the query.
func ExtractMetricsFromQuery(query string) ([]string, error) {
expr, err := metricsql.Parse(query)
if err != nil {
return nil, fmt.Errorf("error parsing query: %w", err)
}
var metrics []string
metricsql.VisitAll(expr, func(e metricsql.Expr) {
if me, ok := e.(*metricsql.MetricExpr); ok {
metricStr := string(me.AppendString(nil))
if metricStr != "" {
metrics = append(metrics, metricStr)
}
}
})
return metrics, nil
}

View File

@@ -0,0 +1,313 @@
package promql
import (
"math"
"slices"
"testing"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
func TestSimulatedExec(t *testing.T) {
accountID := uint32(123)
projectID := uint32(567)
start := int64(1000e3)
end := int64(2000e3)
step := int64(200e3)
// Base EvalConfig that will be copied for each test
baseEC := EvalConfig{
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: 1e4,
MaxSeries: 1000,
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
RoundDigits: 100,
MayCache: false,
}
t.Run(`simple_metric_exact_match`, func(t *testing.T) {
t.Skip()
ec := copyEvalConfig(&baseEC)
mn := newMetric(accountID, projectID,
"__name__", "test_metric",
"a", "b",
)
ec.SimulatedSamples = []*storage.SimulatedSamples{mn.build()}
q := `test_metric{a="b"}`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
// Expected result
expectedMN := storage.MetricName{
MetricGroup: []byte("test_metric"),
Tags: []storage.Tag{
{
Key: []byte("a"),
Value: []byte("b"),
},
},
}
expectedResult := []netstorage.Result{
{
MetricName: expectedMN,
Values: mn.Value,
Timestamps: mn.Timestamps,
},
}
testResultsEqual(t, result, expectedResult)
})
t.Run(`filtered_by_tag_value`, func(t *testing.T) {
t.Skip()
// Create a copy of base EvalConfig
ec := copyEvalConfig(&baseEC)
mn := metricBuilders{
newMetric(accountID, projectID,
"__name__", "test_metric",
"a", "b",
"region", "us-west",
),
newMetric(accountID, projectID,
"__name__", "test_metric",
"a", "b",
"region", "us-east",
),
}
ec.SimulatedSamples = mn.build()
q := `test_metric{region="us-west"}`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
// Expected result
expectedMN := storage.MetricName{
MetricGroup: []byte("test_metric"),
Tags: []storage.Tag{
{
Key: []byte("a"),
Value: []byte("b"),
},
{
Key: []byte("region"),
Value: []byte("us-west"),
},
},
}
expectedResult := []netstorage.Result{
{
MetricName: expectedMN,
Values: mn[0].Value,
Timestamps: mn[0].Timestamps,
},
}
testResultsEqual(t, result, expectedResult)
})
t.Run(`regex_match_on_tag`, func(t *testing.T) {
ec := copyEvalConfig(&baseEC)
mn := metricBuilders{
newMetric(accountID, projectID,
"__name__", "test_metric",
"env", "prod",
),
newMetric(accountID, projectID,
"__name__", "test_metric",
"env", "staging",
),
newMetric(accountID, projectID,
"__name__", "test_metric",
"env", "dev",
),
}
ec.SimulatedSamples = mn.build()
q := `test_metric{env=~"prod|staging"}`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
expectedResult := []netstorage.Result{mn[0].toResult(), mn[1].toResult()}
testResultsEqual(t, result, expectedResult)
})
}
func TestSumOverTime(t *testing.T) {
accountID := uint32(123)
projectID := uint32(567)
start := int64(1000e3)
end := int64(1300e3)
step := int64(30e3)
baseEC := EvalConfig{
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: 1e4,
MaxSeries: 1000,
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
RoundDigits: 100,
MayCache: false,
}
t.Run(`basic_sum_over_time`, func(t *testing.T) {
ec := copyEvalConfig(&baseEC)
metric := newMetric(accountID, projectID,
"__name__", "test_metric",
"app", "api-server",
).withValues(1, 2, 3, 4, 5, 6).withUnix(1000, 1015, 1030, 1045, 1060, 1075)
ec.SimulatedSamples = []*storage.SimulatedSamples{metric.build()}
q := `sum_over_time(test_metric[30s])`
result, err := Exec(nil, ec, q, false)
if err != nil {
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
}
expectedResult := []netstorage.Result{
newMetric(accountID, projectID,
"app", "api-server",
).withValues(1, 5, 9, 6).withUnix(1000, 1030, 1060, 1090).toResult(),
}
testSimulatedResultsEqual(t, result, expectedResult)
})
}
type metricBuilder storage.SimulatedSamples
func newMetric(accountID uint32, projectID uint32, pairs ...string) *metricBuilder {
mn := storage.MetricName{}
for i := 0; i < len(pairs); i += 2 {
mn.AddTag(pairs[i], pairs[i+1])
}
return &metricBuilder{
Name: mn,
Value: []float64{10, 20, 30, 40, 50, 60},
Timestamps: []int64{1000e3, 1200e3, 1400e3, 1600e3, 1800e3, 2000e3},
}
}
func (b *metricBuilder) withUnix(unix ...int64) *metricBuilder {
b.Timestamps = make([]int64, len(unix))
for i := range unix {
b.Timestamps[i] = unix[i] * 1e3
}
return b
}
func (b *metricBuilder) withValues(values ...float64) *metricBuilder {
b.Value = values
return b
}
func (b *metricBuilder) build() *storage.SimulatedSamples {
return (*storage.SimulatedSamples)(b)
}
func (b *metricBuilder) toResult() netstorage.Result {
return netstorage.Result{
MetricName: b.Name,
Values: b.Value,
Timestamps: b.Timestamps,
}
}
type metricBuilders []*metricBuilder
func (b metricBuilders) build() []*storage.SimulatedSamples {
ss := make([]*storage.SimulatedSamples, len(b))
for i := range b {
ss[i] = b[i].build()
}
return ss
}
func testSimulatedResultsEqual(t *testing.T, result, resultExpected []netstorage.Result) {
t.Helper()
result = removeEmptyValuesAndTimeseries(result)
if len(result) != len(resultExpected) {
t.Fatalf(`unexpected timeseries count; got %d; want %d`, len(result), len(resultExpected))
}
for i := range result {
r := &result[i]
rExpected := &resultExpected[i]
testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName, i)
testRowsEqual(t, r.Values, r.Timestamps, rExpected.Values, rExpected.Timestamps)
}
}
func removeEmptyValuesAndTimeseries(tss []netstorage.Result) []netstorage.Result {
dst := tss[:0]
for i := range tss {
ts := &tss[i]
hasNaNs := slices.ContainsFunc(ts.Values, math.IsNaN)
if !hasNaNs {
// Fast path: nothing to remove.
if len(ts.Values) > 0 {
dst = append(dst, *ts)
}
continue
}
// Slow path: remove NaNs.
srcTimestamps := ts.Timestamps
dstValues := ts.Values[:0]
// Do not reuse ts.Timestamps for dstTimestamps, since ts.Timestamps
// may be shared among multiple time series.
dstTimestamps := make([]int64, 0, len(ts.Timestamps))
for j, v := range ts.Values {
if math.IsNaN(v) {
continue
}
dstValues = append(dstValues, v)
dstTimestamps = append(dstTimestamps, srcTimestamps[j])
}
ts.Values = dstValues
ts.Timestamps = dstTimestamps
if len(ts.Values) > 0 {
dst = append(dst, *ts)
}
}
return dst
}
func TestExtractMetricsFromQuery(t *testing.T) {
query := `(vm_free_disk_space_bytes{job=~"$job", instance=~"$instance"}-vm_free_disk_space_limit_bytes{job=~"$job", instance=~"$instance"})
/
ignoring(path) (
(rate(vm_rows_added_to_storage_total{job=~"$job", instance=~"$instance"}[1d]) -
sum(rate(vm_deduplicated_samples_total{job=~"$job", instance=~"$instance"}[1d])) without (type)) *
(
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type) /
sum(vm_rows{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type)
)
+
rate(vm_new_timeseries_created_total{job=~"$job", instance=~"$instance"}[1d]) *
(
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type="indexdb/file"}) /
sum(vm_rows{job=~"$job", instance=~"$instance", type="indexdb/file"})
)
)`
metrics, err := ExtractMetricsFromQuery(query)
if err != nil {
t.Fatalf(`unexpected error when extracting metrics from query: %s`, err)
}
t.Logf(`metrics: %v`, metrics)
}

View File

@@ -1,12 +1,12 @@
package promql
import (
"flag"
"fmt"
"math"
"strconv"
"strings"
"sync"
"time"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/metricsql"
@@ -17,10 +17,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
)
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
"See also '-search.maxStalenessInterval'")
var rollupFuncs = map[string]newRollupFunc{
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
@@ -372,7 +368,7 @@ func getRollupTag(expr metricsql.Expr) (string, error) {
}
func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int,
window, lookbackDelta int64, sharedTimestamps []int64) (
window, lookbackDelta int64, sharedTimestamps []int64, minStalenessInterval time.Duration) (
func(values []float64, timestamps []int64), []*rollupConfig, error) {
preFunc := func(_ []float64, _ []int64) {}
funcName = strings.ToLower(funcName)
@@ -408,6 +404,7 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
Timestamps: sharedTimestamps,
isDefaultRollup: funcName == "default_rollup",
samplesScannedPerCall: samplesScannedPerCall,
minStalenessInterval: minStalenessInterval,
}
}
@@ -600,6 +597,9 @@ type rollupConfig struct {
//
// If zero, then it is considered that Func scans all the samples passed to it.
samplesScannedPerCall int
// The minimum interval for staleness calculations.
minStalenessInterval time.Duration
}
func (rc *rollupConfig) getTimestamps() []int64 {
@@ -723,8 +723,8 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
maxPrevInterval = rc.LookbackDelta
}
if *minStalenessInterval > 0 {
if msi := minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
if rc.minStalenessInterval > 0 {
if msi := rc.minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
maxPrevInterval = msi
}
}

View File

@@ -1149,7 +1149,6 @@ The relabeling can be debugged at `http://victoriametrics:8428/metric-relabel-de
or at our [public demo playground](https://play.victoriametrics.com/select/accounting/1/6a716b0f-38bc-4856-90ce-448fd713e3fe/prometheus/graph/#/relabeling).
See [these docs](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging) for more details.
* `-relabelConfigCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks the `-relabelConfig` file for changes and reloads it automatically.
## Federation
@@ -2777,9 +2776,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-relabelConfig string
Optional path to a file with relabeling rules, which are applied to all the ingested metrics. The path can point either to local file or to http url. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal orat the interval specified by -relabel.configCheckInterval.
-relabelConfigCheckInterval duration
Interval for checking for changes in '-relabelConfig' file. By default the checking is disabled. Send SIGHUP signal in order to force config check for changes
Optional path to a file with relabeling rules, which are applied to all the ingested metrics. The path can point either to local file or to http url. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
@@ -2971,8 +2968,6 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
Path to storage data (default "victoria-metrics-data")
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.configCheckInterval duration
Interval for checking stream aggregation configuration. By default, the checking is disabled.
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication
-streamAggr.dropInput

View File

@@ -26,8 +26,6 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: upgrade Go builder from Go1.24.6 to Go1.25. See [Go1.25 release notes](https://tip.golang.org/doc/go1.25).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add export functionality for Query (Table view) and RawQuery tabs in CSV/JSON format. See [#9332](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9332).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `-relabelConfigCheckInterval` and `-streamAggr.configCheckInterval` flags for periodical stream aggregation and relabel configuration reload. When set, `vmsingle` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-streamAggr.configCheckInterval` and `-relabel.configCheckInterval` for periodic configuration reload. When set, `vmagent` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent remote write ingestion stop on push error for [Google Pub/Sub](https://docs.victoriametrics.com/victoriametrics/vmagent/#writing-metrics-to-pubsub) integration.
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly handle [mTLS authorization and routing](https://docs.victoriametrics.com/victoriametrics/vmauth/#mtls-based-request-routing). Previously it didn't work. See [#29](https://github.com/VictoriaMetrics/VictoriaLogs/issues/29).

View File

@@ -184,8 +184,6 @@ support the following approaches for hot reloading stream aggregation configs fr
```
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload`).
* Use the `-remoteWrite.configCheckInterval` {{% available_from "#" %}} flag to control how often `vmagent` reloads configs specified via `-remoteWrite.streamAggr.config`, `-streamAggr.config`, `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags. Reload is disabled by default.
* Use the `-streamAggr.configCheckInterval` {{% available_from "#" %}} flag to control how often `victoria-metrics` reloads config specified via `-streamAggr.config` flag. Reload is disabled by default.
## Aggregation outputs

View File

@@ -374,11 +374,7 @@ and `-remoteWrite.streamAggr.config`:
* Sending HTTP request to `http://vmagent:8429/-/reload` endpoint. This endpoint can be protected with `-reloadAuthKey` command-line flag.
There is also:
* `-promscrape.configCheckInterval` command-line flag controls how often VictoriaMetrics checks the `-promscrape.config` file for changes and reloads it automatically.
* `-streamAggr.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.streamAggr.config` and `-streamAggr.config` flags.
* `-relabel.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags.
There is also `-promscrape.configCheckInterval` command-line flag, which can be used for automatic reloading configs from updated `-promscrape.config` file.
## SRV urls
@@ -1889,8 +1885,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Optional URL to push metrics exposed at /metrics page. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#push-metrics . By default, metrics exposed at /metrics page aren't pushed to any remote storage
Supports an array of values separated by comma or specified via multiple flags.
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
-relabel.configCheckInterval duration
Interval for checking for changes in configurations defined via -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.
-reloadAuthKey value
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides -httpAuth.*
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
@@ -2134,8 +2128,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
-streamAggr.config string
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
-streamAggr.configCheckInterval duration
Interval for checking for changes in configurations defined via -streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.
-streamAggr.dedupInterval duration
Input samples are de-duplicated with this interval on aggregator before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication
-streamAggr.dropInput

View File

@@ -4,6 +4,7 @@ import (
"fmt"
"net/http"
"strconv"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
)
@@ -34,4 +35,31 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
return msecs, nil
}
// GetDurationRaw returns time.Duration from the given argKey query arg.
func GetDurationRaw(r *http.Request, argKey string, defaultValue time.Duration) (time.Duration, error) {
argValue := r.FormValue(argKey)
if len(argValue) == 0 {
return defaultValue, nil
}
if argValue == "undefined" {
// This hack is needed for Grafana, which may send undefined value
return defaultValue, nil
}
secs, err := strconv.ParseFloat(argValue, 64)
if err != nil {
// Try parsing string format
d, err := timeutil.ParseDuration(argValue)
if err != nil {
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
}
return d, nil
}
d := time.Duration(secs * float64(time.Second))
msecs := d.Milliseconds()
if msecs <= 0 || msecs > maxDurationMsecs {
return 0, fmt.Errorf("%s=%s is out of allowed range [%s ... %s]", argKey, d, time.Millisecond, time.Duration(maxDurationMsecs)*time.Millisecond)
}
return d, nil
}
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000

View File

@@ -2105,6 +2105,43 @@ func hasCompositeTagFilters(tfs []*tagFilter, prefix []byte) bool {
return false
}
// MatchSimulatedSamples filters the given simulatedSamples against the provided tag filters.
// It returns only the simulated samples that match any of the given tag filter sets.
// This function is used for debugging and testing purposes to simulate metric queries.
func MatchSimulatedSamples(simulatedSamples []*SimulatedSamples, tagFilterss [][]TagFilter) ([]*SimulatedSamples, error) {
var kb bytesutil.ByteBuffer
matchedSamples := make([]*SimulatedSamples, 0, 1)
for _, rawTfs := range tagFilterss {
tfs := NewTagFilters()
for _, tf := range rawTfs {
err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp)
if err != nil {
return nil, fmt.Errorf("cannot add tagFilter %s: %w", tf.String(), err)
}
}
for idx, mn := range simulatedSamples {
ok, err := matchTagFilters(&mn.Name, toTFPointers(tfs.tfs), &kb)
if err != nil {
return nil, fmt.Errorf("cannot match MetricName %s against tagFilters: %w", mn.Name.String(), err)
}
if ok {
matchedSamples = append(matchedSamples, simulatedSamples[idx])
}
}
}
return matchedSamples, nil
}
func toTFPointers(tfs []tagFilter) []*tagFilter {
tfps := make([]*tagFilter, len(tfs))
for i := range tfs {
tfps[i] = &tfs[i]
}
return tfps
}
func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
for i, tf := range tfs {

View File

@@ -2045,14 +2045,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
fs.MustRemoveDir(path)
}
func toTFPointers(tfs []tagFilter) []*tagFilter {
tfps := make([]*tagFilter, len(tfs))
for i := range tfs {
tfps[i] = &tfs[i]
}
return tfps
}
func newTestStorage() *Storage {
s := &Storage{
cachePath: "test-storage-cache",

View File

@@ -309,6 +309,33 @@ type SearchQuery struct {
// The maximum number of time series the search query can return.
MaxMetrics int
// SimulatedSeries is used for simulating samples returned from storage nodes.
SimulatedSeries []*SimulatedSamples
}
// SimulatedSamples represents simulated metric samples for debug and testing purposes.
// It contains metric name, timestamps and corresponding values.
type SimulatedSamples struct {
Name MetricName
Timestamps []int64
Value []float64
}
// NewSimulatedSeries creates a new SimulatedSamples instance with the given parameters.
// It constructs a metric name from the provided metric labels.
func NewSimulatedSeries(metric map[string]string, timestamp []int64, value []float64) *SimulatedSamples {
ss := &SimulatedSamples{
Timestamps: timestamp,
Value: value,
}
mn := GetMetricName()
defer PutMetricName(mn)
for k, v := range metric {
mn.AddTag(k, v)
}
ss.Name.CopyFrom(mn)
return ss
}
// GetTimeRange returns time range for the given sq.