mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-08 19:33:35 +03:00
Compare commits
10 Commits
follow-up-
...
query-debu
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f9e5881303 | ||
|
|
ab6fd0afed | ||
|
|
8f8ead2c50 | ||
|
|
2f422bad85 | ||
|
|
c0a41b41ca | ||
|
|
68e493cef3 | ||
|
|
06572772d4 | ||
|
|
d12f6c280f | ||
|
|
e62e0685dc | ||
|
|
df92e617db |
@@ -481,7 +481,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
if !httpserver.CheckAuthFlag(w, r, reloadAuthKey) {
|
||||
return true
|
||||
}
|
||||
configReloadRequests.Inc()
|
||||
promscrapeConfigReloadRequests.Inc()
|
||||
procutil.SelfSIGHUP()
|
||||
w.WriteHeader(http.StatusOK)
|
||||
return true
|
||||
@@ -747,7 +747,7 @@ var (
|
||||
promscrapeConfigRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/config"}`)
|
||||
promscrapeStatusConfigRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/api/v1/status/config"}`)
|
||||
|
||||
configReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`)
|
||||
promscrapeConfigReloadRequests = metrics.NewCounter(`vmagent_http_requests_total{path="/-/reload"}`)
|
||||
)
|
||||
|
||||
func usage() {
|
||||
|
||||
@@ -3,11 +3,9 @@ package remotewrite
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@@ -32,8 +30,6 @@ var (
|
||||
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
|
||||
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
|
||||
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
|
||||
relabelConfigCheckInterval = flag.Duration("relabel.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
|
||||
"-remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.")
|
||||
)
|
||||
|
||||
var labelsGlobal []prompb.Label
|
||||
@@ -71,15 +67,13 @@ func initRelabelConfigs() {
|
||||
}
|
||||
}
|
||||
|
||||
func reloadRelabelConfigs(logReload bool) {
|
||||
func reloadRelabelConfigs() {
|
||||
rcs := allRelabelConfigs.Load()
|
||||
if !rcs.isSet() {
|
||||
return
|
||||
}
|
||||
relabelConfigReloads.Inc()
|
||||
if logReload {
|
||||
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||
}
|
||||
logger.Infof("reloading relabel configs pointed by -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig")
|
||||
rcs, err := loadRelabelConfigs()
|
||||
if err != nil {
|
||||
relabelConfigReloadErrors.Inc()
|
||||
@@ -277,26 +271,3 @@ func fixPromCompatibleNaming(labels []prompb.Label) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func startRelabelConfigReloader(sighupCh <-chan os.Signal) {
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
var tickerCh <-chan time.Time
|
||||
if *relabelConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*relabelConfigCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
case <-sighupCh:
|
||||
reloadRelabelConfigs(true)
|
||||
case <-tickerCh:
|
||||
reloadRelabelConfigs(false)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -213,8 +213,20 @@ func Init() {
|
||||
|
||||
dropDanglingQueues()
|
||||
|
||||
startRelabelConfigReloader(sighupCh)
|
||||
startStreamAggrConfigReloader(sighupCh)
|
||||
// Start config reloader.
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
case <-sighupCh:
|
||||
}
|
||||
reloadRelabelConfigs()
|
||||
reloadStreamAggrConfigs()
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
func dropDanglingQueues() {
|
||||
|
||||
@@ -3,9 +3,7 @@ package remotewrite
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"os"
|
||||
"strings"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
@@ -67,8 +65,6 @@ var (
|
||||
streamAggrEnableWindows = flagutil.NewArrayBool("remoteWrite.streamAggr.enableWindows", "Enables aggregation within fixed windows for all remote write's aggregators. "+
|
||||
"This allows to get more precise results, but impacts resource usage as it requires twice more memory to store two states. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#aggregation-windows.")
|
||||
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking for changes in configurations defined via "+
|
||||
"-streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.")
|
||||
)
|
||||
|
||||
// CheckStreamAggrConfigs checks -remoteWrite.streamAggr.config and -streamAggr.config.
|
||||
@@ -95,22 +91,20 @@ func CheckStreamAggrConfigs() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfigs(logReload bool) {
|
||||
reloadStreamAggrConfigGlobal(logReload)
|
||||
func reloadStreamAggrConfigs() {
|
||||
reloadStreamAggrConfigGlobal()
|
||||
for _, rwctx := range rwctxsGlobal {
|
||||
rwctx.reloadStreamAggrConfig(logReload)
|
||||
rwctx.reloadStreamAggrConfig()
|
||||
}
|
||||
}
|
||||
|
||||
func reloadStreamAggrConfigGlobal(logReload bool) {
|
||||
func reloadStreamAggrConfigGlobal() {
|
||||
path := *streamAggrGlobalConfig
|
||||
if path == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if logReload {
|
||||
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
|
||||
}
|
||||
logger.Infof("reloading stream aggregation configs pointed by -streamAggr.config=%q", path)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
|
||||
|
||||
sasNew, err := newStreamAggrConfigGlobal()
|
||||
@@ -128,9 +122,7 @@ func reloadStreamAggrConfigGlobal(logReload bool) {
|
||||
logger.Infof("successfully reloaded -streamAggr.config=%q", path)
|
||||
} else {
|
||||
sasNew.MustStop()
|
||||
if logReload {
|
||||
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
}
|
||||
logger.Infof("-streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
|
||||
@@ -179,15 +171,13 @@ func (rwctx *remoteWriteCtx) initStreamAggrConfig() {
|
||||
}
|
||||
}
|
||||
|
||||
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig(logReload bool) {
|
||||
func (rwctx *remoteWriteCtx) reloadStreamAggrConfig() {
|
||||
path := streamAggrConfig.GetOptionalArg(rwctx.idx)
|
||||
if path == "" {
|
||||
return
|
||||
}
|
||||
|
||||
if logReload {
|
||||
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
|
||||
}
|
||||
logger.Infof("reloading stream aggregation configs pointed by -remoteWrite.streamAggr.config=%q", path)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reloads_total{path=%q}`, path)).Inc()
|
||||
|
||||
sasNew, err := rwctx.newStreamAggrConfig()
|
||||
@@ -205,9 +195,7 @@ func (rwctx *remoteWriteCtx) reloadStreamAggrConfig(logReload bool) {
|
||||
logger.Infof("successfully reloaded -remoteWrite.streamAggr.config=%q", path)
|
||||
} else {
|
||||
sasNew.MustStop()
|
||||
if logReload {
|
||||
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
}
|
||||
logger.Infof("-remoteWrite.streamAggr.config=%q wasn't changed since the last reload", path)
|
||||
}
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_successful{path=%q}`, path)).Set(1)
|
||||
metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_streamaggr_config_reload_success_timestamp_seconds{path=%q}`, path)).Set(fasttime.UnixTimestamp())
|
||||
@@ -268,26 +256,3 @@ func newStreamAggrConfigPerURL(idx int, pushFunc streamaggr.PushFunc) (*streamag
|
||||
}
|
||||
return sas, nil
|
||||
}
|
||||
|
||||
func startStreamAggrConfigReloader(sighupCh <-chan os.Signal) {
|
||||
configReloaderWG.Add(1)
|
||||
go func() {
|
||||
var tickerCh <-chan time.Time
|
||||
if *streamAggrConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer configReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-configReloaderStopCh:
|
||||
return
|
||||
case <-sighupCh:
|
||||
reloadStreamAggrConfigs(true)
|
||||
case <-tickerCh:
|
||||
reloadStreamAggrConfigs(false)
|
||||
}
|
||||
}
|
||||
}()
|
||||
}
|
||||
|
||||
@@ -121,7 +121,7 @@ func (p *vmNativeProcessor) runSingle(ctx context.Context, f native.Filter, srcU
|
||||
pr := bar.NewProxyReader(reader)
|
||||
if pr != nil {
|
||||
reader = pr
|
||||
fmt.Printf("Continue import process with filter %s:\n", f.String())
|
||||
fmt.Fprintf(log.Writer(), "Continue import process with filter %s:\n", f.String())
|
||||
}
|
||||
}
|
||||
|
||||
@@ -191,7 +191,7 @@ func (p *vmNativeProcessor) runBackfilling(ctx context.Context, tenantID string,
|
||||
initParams = []any{srcURL, dstURL, p.filter.String(), tenantID}
|
||||
}
|
||||
|
||||
fmt.Println("") // extra line for better output formatting
|
||||
fmt.Fprintln(log.Writer(), "") // extra line for better output formatting
|
||||
log.Printf(initMessage, initParams...)
|
||||
if len(ranges) > 1 {
|
||||
log.Printf("Selected time range will be split into %d ranges according to %q step", len(ranges), p.filter.Chunk)
|
||||
|
||||
@@ -5,7 +5,6 @@ import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
@@ -20,8 +19,6 @@ import (
|
||||
)
|
||||
|
||||
var (
|
||||
streamAggrConfigCheckInterval = flag.Duration("streamAggr.configCheckInterval", 0, "Interval for checking stream aggregation configuration. "+
|
||||
"By default, the checking is disabled.")
|
||||
streamAggrConfig = flag.String("streamAggr.config", "", "Optional path to file with stream aggregation config. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . "+
|
||||
"See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval")
|
||||
@@ -115,17 +112,10 @@ func InitStreamAggr() {
|
||||
// Start config reloader.
|
||||
saCfgReloaderWG.Add(1)
|
||||
go func() {
|
||||
var tickerCh <-chan time.Time
|
||||
if *streamAggrConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*streamAggrConfigCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
defer saCfgReloaderWG.Done()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
case <-tickerCh:
|
||||
case <-saCfgReloaderStopCh:
|
||||
return
|
||||
}
|
||||
|
||||
@@ -124,7 +124,6 @@ func Stop() {
|
||||
}
|
||||
protoparserutil.StopUnmarshalWorkers()
|
||||
common.MustStopStreamAggr()
|
||||
relabel.Stop()
|
||||
}
|
||||
|
||||
// RequestHandler is a handler for Prometheus remote storage write API
|
||||
|
||||
@@ -3,9 +3,7 @@ package relabel
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
@@ -18,14 +16,11 @@ import (
|
||||
var (
|
||||
relabelConfig = flag.String("relabelConfig", "", "Optional path to a file with relabeling rules, which are applied to all the ingested metrics. "+
|
||||
"The path can point either to local file or to http url. "+
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal or"+
|
||||
"at the interval specified by -relabelConfigCheckInterval.")
|
||||
"See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal")
|
||||
|
||||
usePromCompatibleNaming = flag.Bool("usePromCompatibleNaming", false, "Whether to replace characters unsupported by Prometheus with underscores "+
|
||||
"in the ingested metric names and label names. For example, foo.bar{a.b='c'} is transformed into foo_bar{a_b='c'} during data ingestion if this flag is set. "+
|
||||
"See https://prometheus.io/docs/concepts/data_model/#metric-names-and-labels")
|
||||
relabelConfigCheckInterval = flag.Duration("relabelConfigCheckInterval", 0, "Interval for checking for changes in '-relabelConfig' file. "+
|
||||
"By default the checking is disabled. Send SIGHUP signal in order to force config check for changes")
|
||||
)
|
||||
|
||||
// Init must be called after flag.Parse and before using the relabel package.
|
||||
@@ -44,8 +39,6 @@ func Init() {
|
||||
return
|
||||
}
|
||||
|
||||
globalStopChan = make(chan struct{})
|
||||
relabelWG.Add(1)
|
||||
configReloads = metrics.NewCounter(`vm_relabel_config_reloads_total`)
|
||||
configReloadErrors = metrics.NewCounter(`vm_relabel_config_reloads_errors_total`)
|
||||
configSuccess = metrics.NewGauge(`vm_relabel_config_last_reload_successful`, nil)
|
||||
@@ -56,46 +49,17 @@ func Init() {
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
|
||||
go func() {
|
||||
defer relabelWG.Done()
|
||||
var tickerCh <-chan time.Time
|
||||
if *relabelConfigCheckInterval > 0 {
|
||||
ticker := time.NewTicker(*relabelConfigCheckInterval)
|
||||
tickerCh = ticker.C
|
||||
defer ticker.Stop()
|
||||
}
|
||||
var noChangesLogFn func()
|
||||
for {
|
||||
select {
|
||||
case <-sighupCh:
|
||||
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
|
||||
noChangesLogFn = func() {
|
||||
logger.Infof("nothing changed in %q", *relabelConfig)
|
||||
}
|
||||
case <-tickerCh:
|
||||
// silently skip logging for the unchanged config files
|
||||
noChangesLogFn = func() {}
|
||||
case <-globalStopChan:
|
||||
logger.Infof("stopping relabel config reloader")
|
||||
return
|
||||
}
|
||||
pcsNew, err := loadRelabelConfig()
|
||||
for range sighupCh {
|
||||
configReloads.Inc()
|
||||
logger.Infof("received SIGHUP; reloading -relabelConfig=%q...", *relabelConfig)
|
||||
pcs, err := loadRelabelConfig()
|
||||
if err != nil {
|
||||
configReloadErrors.Inc()
|
||||
configSuccess.Set(0)
|
||||
logger.Errorf("cannot load the updated relabelConfig: %s; preserving the previous config", err)
|
||||
continue
|
||||
}
|
||||
if pcsNew.String() == pcs.String() {
|
||||
// set success to 1 since previous reload could have been unsuccessful
|
||||
// do not update configTimestamp as config version remains old.
|
||||
configSuccess.Set(1)
|
||||
noChangesLogFn()
|
||||
continue
|
||||
}
|
||||
configReloads.Inc()
|
||||
pcs = pcsNew
|
||||
pcsGlobal.Store(pcsNew)
|
||||
|
||||
pcsGlobal.Store(pcs)
|
||||
configSuccess.Set(1)
|
||||
configTimestamp.Set(fasttime.UnixTimestamp())
|
||||
logger.Infof("successfully reloaded -relabelConfig=%q", *relabelConfig)
|
||||
@@ -103,21 +67,6 @@ func Init() {
|
||||
}()
|
||||
}
|
||||
|
||||
// Stop stops relabel config reloader watchers
|
||||
func Stop() {
|
||||
if len(*relabelConfig) == 0 {
|
||||
return
|
||||
}
|
||||
|
||||
close(globalStopChan)
|
||||
relabelWG.Wait()
|
||||
}
|
||||
|
||||
var (
|
||||
globalStopChan chan struct{}
|
||||
relabelWG sync.WaitGroup
|
||||
)
|
||||
|
||||
var (
|
||||
configReloads *metrics.Counter
|
||||
configReloadErrors *metrics.Counter
|
||||
|
||||
@@ -262,6 +262,13 @@ func RequestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "/api/v1/config":
|
||||
httpserver.EnableCORS(w, r)
|
||||
if err := prometheus.ConfigHandler(qt, startTime, w, r); err != nil {
|
||||
httpserver.SendPrometheusError(w, r, err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "/api/v1/export":
|
||||
exportRequests.Inc()
|
||||
if err := prometheus.ExportHandler(startTime, w, r); err != nil {
|
||||
@@ -538,6 +545,13 @@ func handleStaticAndSimpleRequests(w http.ResponseWriter, r *http.Request, path
|
||||
expandWithExprsRequests.Inc()
|
||||
prometheus.ExpandWithExprs(w, r)
|
||||
return true
|
||||
case "/extract-metric-exprs":
|
||||
startTime := time.Now()
|
||||
if err := prometheus.ExtractMetricExprsHandler(startTime, w, r); err != nil {
|
||||
httpserver.Errorf(w, r, "%s", err)
|
||||
return true
|
||||
}
|
||||
return true
|
||||
case "/prettify-query":
|
||||
prettifyQueryRequests.Inc()
|
||||
prometheus.PrettifyQuery(w, r)
|
||||
|
||||
@@ -63,10 +63,18 @@ type Results struct {
|
||||
packedTimeseries []packedTimeseries
|
||||
sr *storage.Search
|
||||
tbf *tmpBlocksFile
|
||||
|
||||
// the result is simulated
|
||||
isSimulated bool
|
||||
simulatedSeries []*storage.SimulatedSamples
|
||||
}
|
||||
|
||||
// Len returns the number of results in rss.
|
||||
func (rss *Results) Len() int {
|
||||
if rss.isSimulated {
|
||||
return len(rss.simulatedSeries)
|
||||
}
|
||||
|
||||
return len(rss.packedTimeseries)
|
||||
}
|
||||
|
||||
@@ -218,6 +226,10 @@ var defaultMaxWorkersPerQuery = func() int {
|
||||
//
|
||||
// rss becomes unusable after the call to RunParallel.
|
||||
func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
|
||||
if rss.isSimulated {
|
||||
return rss.runParallelSimulated(qt, f)
|
||||
}
|
||||
|
||||
qt = qt.NewChild("parallel process of fetched data")
|
||||
defer rss.mustClose()
|
||||
|
||||
@@ -233,6 +245,87 @@ func (rss *Results) RunParallel(qt *querytracer.Tracer, f func(rs *Result, worke
|
||||
return err
|
||||
}
|
||||
|
||||
func (rss *Results) runParallelSimulated(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) error {
|
||||
qt = qt.NewChild("parallel process of fetched data")
|
||||
|
||||
cb := f
|
||||
tmpResult := getTmpResult()
|
||||
defer putTmpResult(tmpResult)
|
||||
|
||||
// For simplicity, let's process serially first. Parallelization can be added if needed.
|
||||
// If parallelization is desired, it would mirror the worker pool logic of the original runParallel,
|
||||
// but iterating over rss.simulatedSamples entries.
|
||||
workerID := uint(0)
|
||||
var firstErr error
|
||||
for _, metric := range rss.simulatedSeries {
|
||||
r := &tmpResult.rs
|
||||
r.reset()
|
||||
r.MetricName.CopyFrom(&metric.Name)
|
||||
for i, ts := range metric.Timestamps {
|
||||
if ts >= rss.tr.MinTimestamp && ts <= rss.tr.MaxTimestamp {
|
||||
r.Values = append(r.Values, metric.Value[i])
|
||||
r.Timestamps = append(r.Timestamps, ts)
|
||||
}
|
||||
}
|
||||
|
||||
// Sort timestamps chronologically to match real storage behavior.
|
||||
// Real storage ensures chronological order through:
|
||||
// 1. Block-level sorting by MinTimestamp
|
||||
// 2. Within-block timestamp ordering via encoding.EnsureNonDecreasingSequence()
|
||||
if len(r.Timestamps) > 1 {
|
||||
// Create pairs for sorting
|
||||
type timestampValue struct {
|
||||
timestamp int64
|
||||
value float64
|
||||
}
|
||||
pairs := make([]timestampValue, len(r.Timestamps))
|
||||
for i := range r.Timestamps {
|
||||
pairs[i] = timestampValue{
|
||||
timestamp: r.Timestamps[i],
|
||||
value: r.Values[i],
|
||||
}
|
||||
}
|
||||
|
||||
// Sort by timestamp
|
||||
sort.Slice(pairs, func(i, j int) bool {
|
||||
return pairs[i].timestamp < pairs[j].timestamp
|
||||
})
|
||||
|
||||
// Extract back to separate slices
|
||||
for i := range pairs {
|
||||
r.Timestamps[i] = pairs[i].timestamp
|
||||
r.Values[i] = pairs[i].value
|
||||
}
|
||||
}
|
||||
|
||||
// The input from the client is most likely already deduplicated, since it's emitted by
|
||||
// vmselect. However, the client may modify the input instead of using the returned one.
|
||||
dedupInterval := storage.GetDedupInterval()
|
||||
if dedupInterval > 0 && len(r.Timestamps) > 0 {
|
||||
r.Timestamps, r.Values = storage.DeduplicateSamples(r.Timestamps, r.Values, dedupInterval)
|
||||
}
|
||||
|
||||
rowProcessed := len(r.Timestamps)
|
||||
|
||||
if rowProcessed > 0 {
|
||||
err := cb(r, workerID)
|
||||
if err != nil {
|
||||
firstErr = err
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// Count total samples across all series
|
||||
totalSamples := 0
|
||||
for _, metric := range rss.simulatedSeries {
|
||||
totalSamples += len(metric.Timestamps)
|
||||
}
|
||||
qt.Donef("series=%d, samples=%d", len(rss.simulatedSeries), totalSamples)
|
||||
|
||||
return firstErr
|
||||
}
|
||||
|
||||
func (rss *Results) runParallel(qt *querytracer.Tracer, f func(rs *Result, workerID uint) error) (int, error) {
|
||||
tswsLen := len(rss.packedTimeseries)
|
||||
if tswsLen == 0 {
|
||||
@@ -1119,6 +1212,10 @@ func SearchMetricNames(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline
|
||||
//
|
||||
// Results.RunParallel or Results.Cancel must be called on the returned Results.
|
||||
func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, error) {
|
||||
if len(sq.SimulatedSeries) > 0 {
|
||||
return processSearchSimulated(qt, sq, deadline)
|
||||
}
|
||||
|
||||
qt = qt.NewChild("fetch matching series: %s", sq)
|
||||
defer qt.Done()
|
||||
if deadline.Exceeded() {
|
||||
@@ -1291,6 +1388,41 @@ func ProcessSearchQuery(qt *querytracer.Tracer, sq *storage.SearchQuery, deadlin
|
||||
return &rss, nil
|
||||
}
|
||||
|
||||
func processSearchSimulated(qt *querytracer.Tracer, sq *storage.SearchQuery, deadline searchutil.Deadline) (*Results, error) {
|
||||
qt = qt.NewChild("fetch matching series (simulated): %s", sq)
|
||||
defer qt.Done()
|
||||
if deadline.Exceeded() {
|
||||
return nil, fmt.Errorf("timeout exceeded before starting the query processing: %s", deadline.String())
|
||||
}
|
||||
|
||||
tr := storage.TimeRange{
|
||||
MinTimestamp: sq.MinTimestamp,
|
||||
MaxTimestamp: sq.MaxTimestamp,
|
||||
}
|
||||
|
||||
// Process simulated samples.
|
||||
matchedSamples, err := storage.MatchSimulatedSamples(sq.SimulatedSeries, sq.TagFilterss)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot match simulated samples: %w", err)
|
||||
}
|
||||
|
||||
// Create a result set similar to ProcessSearchQuery
|
||||
rss := &Results{
|
||||
tr: tr,
|
||||
deadline: deadline,
|
||||
isSimulated: true,
|
||||
simulatedSeries: matchedSamples,
|
||||
}
|
||||
|
||||
if len(matchedSamples) == 0 {
|
||||
qt.Printf("no matching series found")
|
||||
} else {
|
||||
qt.Printf("found %d series", len(rss.simulatedSeries))
|
||||
}
|
||||
|
||||
return rss, nil
|
||||
}
|
||||
|
||||
type blockRef struct {
|
||||
partRef storage.PartRef
|
||||
addr tmpBlockAddr
|
||||
|
||||
20
app/vmselect/prometheus/config_response.qtpl
Normal file
20
app/vmselect/prometheus/config_response.qtpl
Normal file
@@ -0,0 +1,20 @@
|
||||
{% import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
) %}
|
||||
|
||||
{% stripspace %}
|
||||
|
||||
ConfigResponse generates response for /api/v1/config .
|
||||
{% func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) %}
|
||||
{
|
||||
"status":"success",
|
||||
"data":{
|
||||
"minStalenessInterval": {%q= config.MinStalenessInterval %},
|
||||
"maxStalenessInterval": {%q= config.MaxStalenessInterval %}
|
||||
}
|
||||
{% code qt.Done() %}
|
||||
{%= dumpQueryTrace(qt) %}
|
||||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
||||
73
app/vmselect/prometheus/config_response.qtpl.go
Normal file
73
app/vmselect/prometheus/config_response.qtpl.go
Normal file
@@ -0,0 +1,73 @@
|
||||
// Code generated by qtc from "config_response.qtpl". DO NOT EDIT.
|
||||
// See https://github.com/valyala/quicktemplate for details.
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:1
|
||||
package prometheus
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:1
|
||||
import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/querytracer"
|
||||
)
|
||||
|
||||
// ConfigResponse generates response for /api/v1/config .
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
import (
|
||||
qtio422016 "io"
|
||||
|
||||
qt422016 "github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
var (
|
||||
_ = qtio422016.Copy
|
||||
_ = qt422016.AcquireByteBuffer
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
func StreamConfigResponse(qw422016 *qt422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/config_response.qtpl:8
|
||||
qw422016.N().S(`{"status":"success","data":{"minStalenessInterval":`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:12
|
||||
qw422016.N().Q(config.MinStalenessInterval)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:12
|
||||
qw422016.N().S(`,"maxStalenessInterval":`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:13
|
||||
qw422016.N().Q(config.MaxStalenessInterval)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:13
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:15
|
||||
qt.Done()
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:16
|
||||
streamdumpQueryTrace(qw422016, qt)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:16
|
||||
qw422016.N().S(`}`)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
func WriteConfigResponse(qq422016 qtio422016.Writer, config *ConfigData, qt *querytracer.Tracer) {
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
StreamConfigResponse(qw422016, config, qt)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
func ConfigResponse(config *ConfigData, qt *querytracer.Tracer) string {
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
WriteConfigResponse(qb422016, config, qt)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/config_response.qtpl:18
|
||||
}
|
||||
18
app/vmselect/prometheus/extract_metric_exprs_response.qtpl
Normal file
18
app/vmselect/prometheus/extract_metric_exprs_response.qtpl
Normal file
@@ -0,0 +1,18 @@
|
||||
{% stripspace %}
|
||||
|
||||
ExtractMetricExprsResponse generates response for /extract-metric-exprs .
|
||||
{% func ExtractMetricExprsResponse(metrics []string) %}
|
||||
{
|
||||
"status":"success",
|
||||
"data":[
|
||||
{% if len(metrics) > 0 %}
|
||||
{%q= metrics[0] %}
|
||||
{% for i := 1; i < len(metrics); i++ %}
|
||||
,{%q= metrics[i] %}
|
||||
{% endfor %}
|
||||
{% endif %}
|
||||
]
|
||||
}
|
||||
{% endfunc %}
|
||||
|
||||
{% endstripspace %}
|
||||
@@ -0,0 +1,69 @@
|
||||
// Code generated by qtc from "extract_metric_exprs_response.qtpl". DO NOT EDIT.
|
||||
// See https://github.com/valyala/quicktemplate for details.
|
||||
|
||||
// ExtractMetricExprsResponse generates response for /extract-metric-exprs .
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
package prometheus
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
import (
|
||||
qtio422016 "io"
|
||||
|
||||
qt422016 "github.com/valyala/quicktemplate"
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
var (
|
||||
_ = qtio422016.Copy
|
||||
_ = qt422016.AcquireByteBuffer
|
||||
)
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
func StreamExtractMetricExprsResponse(qw422016 *qt422016.Writer, metrics []string) {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:4
|
||||
qw422016.N().S(`{"status":"success","data":[`)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:8
|
||||
if len(metrics) > 0 {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:9
|
||||
qw422016.N().Q(metrics[0])
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
|
||||
for i := 1; i < len(metrics); i++ {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:10
|
||||
qw422016.N().S(`,`)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:11
|
||||
qw422016.N().Q(metrics[i])
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:12
|
||||
}
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
|
||||
}
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:13
|
||||
qw422016.N().S(`]}`)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
func WriteExtractMetricExprsResponse(qq422016 qtio422016.Writer, metrics []string) {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qw422016 := qt422016.AcquireWriter(qq422016)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
StreamExtractMetricExprsResponse(qw422016, metrics)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qt422016.ReleaseWriter(qw422016)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
}
|
||||
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
func ExtractMetricExprsResponse(metrics []string) string {
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qb422016 := qt422016.AcquireByteBuffer()
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
WriteExtractMetricExprsResponse(qb422016, metrics)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qs422016 := string(qb422016.B)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
qt422016.ReleaseByteBuffer(qb422016)
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
return qs422016
|
||||
//line app/vmselect/prometheus/extract_metric_exprs_response.qtpl:16
|
||||
}
|
||||
@@ -1,8 +1,10 @@
|
||||
package prometheus
|
||||
|
||||
import (
|
||||
"encoding/json"
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math"
|
||||
"net/http"
|
||||
"runtime"
|
||||
@@ -20,6 +22,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/promql"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/querystats"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bufferedwriter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/encoding"
|
||||
@@ -42,6 +45,9 @@ var (
|
||||
maxLookback = flag.Duration("search.maxLookback", 0, "Synonym to -query.lookback-delta from Prometheus. "+
|
||||
"The value is dynamically detected from interval between time series datapoints if not set. It can be overridden on per-query basis via max_lookback arg. "+
|
||||
"See also '-search.maxStalenessInterval' flag, which has the same meaning due to historical reasons")
|
||||
minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
|
||||
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
|
||||
"See also '-search.maxStalenessInterval'")
|
||||
maxStalenessInterval = flag.Duration("search.maxStalenessInterval", 0, "The maximum interval for staleness calculations. "+
|
||||
"By default, it is automatically calculated from the median interval between samples. This flag could be useful for tuning "+
|
||||
"Prometheus data model closer to Influx-style data model. See https://prometheus.io/docs/prometheus/latest/querying/basics/#staleness for details. "+
|
||||
@@ -116,7 +122,7 @@ func FederateHandler(startTime time.Time, w http.ResponseWriter, r *http.Request
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -611,6 +617,55 @@ func TSDBStatusHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
|
||||
var tsdbStatusDuration = metrics.NewSummary(`vm_request_duration_seconds{path="/api/v1/status/tsdb"}`)
|
||||
|
||||
// ConfigData holds the current configuration values for search-related flags
|
||||
type ConfigData struct {
|
||||
MinStalenessInterval string
|
||||
MaxStalenessInterval string
|
||||
}
|
||||
|
||||
// ConfigHandler processes /api/v1/config request.
|
||||
//
|
||||
// It returns the current configuration for search-related flags.
|
||||
func ConfigHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, _ *http.Request) error {
|
||||
config := &ConfigData{
|
||||
MinStalenessInterval: (*minStalenessInterval).String(),
|
||||
MaxStalenessInterval: (*maxStalenessInterval).String(),
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
WriteConfigResponse(bw, config, qt)
|
||||
if err := bw.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot send config response to remote client: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// ExtractMetricExprsHandler processes /extract-metric-exprs request.
|
||||
//
|
||||
// It extracts metric expressions from a given PromQL query.
|
||||
func ExtractMetricExprsHandler(startTime time.Time, w http.ResponseWriter, r *http.Request) error {
|
||||
query := r.FormValue("query")
|
||||
if len(query) == 0 {
|
||||
return fmt.Errorf("missing `query` arg")
|
||||
}
|
||||
|
||||
metrics, err := promql.ExtractMetricsFromQuery(query)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot extract metrics from query: %w", err)
|
||||
}
|
||||
|
||||
w.Header().Set("Content-Type", "application/json")
|
||||
bw := bufferedwriter.Get(w)
|
||||
defer bufferedwriter.Put(bw)
|
||||
WriteExtractMetricExprsResponse(bw, metrics)
|
||||
if err := bw.Flush(); err != nil {
|
||||
return fmt.Errorf("cannot send extract metric exprs response to remote client: %w", err)
|
||||
}
|
||||
return nil
|
||||
}
|
||||
|
||||
// LabelsHandler processes /api/v1/labels request.
|
||||
//
|
||||
// See https://prometheus.io/docs/prometheus/latest/querying/api/#getting-label-names
|
||||
@@ -712,7 +767,8 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
|
||||
|
||||
ct := startTime.UnixNano() / 1e6
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
mayCache := !httputil.GetBool(r, "nocache")
|
||||
isDebug := httputil.GetBool(r, "debug")
|
||||
noCache := httputil.GetBool(r, "nocache") || isDebug
|
||||
query := r.FormValue("query")
|
||||
if len(query) == 0 {
|
||||
return fmt.Errorf("missing `query` arg")
|
||||
@@ -721,7 +777,7 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -807,23 +863,14 @@ func QueryHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWr
|
||||
} else {
|
||||
queryOffset = 0
|
||||
}
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: start,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: GetMaxUniqueTimeSeries(),
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
ec := newEvalConfig(r, start, start, step, deadline, noCache, lookbackDelta, isDebug, etfs)
|
||||
if isDebug {
|
||||
if err := populateSimulatedData(r, nil, ec); err != nil {
|
||||
_ = r.Body.Close()
|
||||
return fmt.Errorf("cannot read simulated samples: %w", err)
|
||||
}
|
||||
}
|
||||
_ = r.Body.Close()
|
||||
qs := promql.NewQueryStats(query, nil, ec)
|
||||
ec.QueryStats = qs
|
||||
|
||||
@@ -897,8 +944,9 @@ func QueryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.ResponseWriter, query string,
|
||||
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
mayCache := !httputil.GetBool(r, "nocache")
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
isDebug := httputil.GetBool(r, "debug")
|
||||
noCache := httputil.GetBool(r, "nocache") || isDebug
|
||||
lookbackDelta, err := getMaxLookback(r, *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -913,27 +961,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
if err := promql.ValidateMaxPointsPerSeries(start, end, step, *maxPointsPerTimeseries); err != nil {
|
||||
return fmt.Errorf("%w; (see -search.maxPointsPerTimeseries command-line flag)", err)
|
||||
}
|
||||
if mayCache {
|
||||
if !noCache {
|
||||
start, end = promql.AdjustStartEnd(start, end, step)
|
||||
}
|
||||
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: GetMaxUniqueTimeSeries(),
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
ec := newEvalConfig(r, start, end, step, deadline, noCache, lookbackDelta, isDebug, etfs)
|
||||
if isDebug {
|
||||
if err := populateSimulatedData(r, nil, ec); err != nil {
|
||||
_ = r.Body.Close()
|
||||
return fmt.Errorf("cannot read simulated samples: %w", err)
|
||||
}
|
||||
}
|
||||
_ = r.Body.Close()
|
||||
|
||||
qs := promql.NewQueryStats(query, nil, ec)
|
||||
ec.QueryStats = qs
|
||||
|
||||
@@ -969,6 +1009,93 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
return nil
|
||||
}
|
||||
|
||||
func newEvalConfig(r *http.Request, start, end, step int64, deadline searchutil.Deadline, noCache bool, lookbackDelta int64, isDebug bool, etfs [][]storage.TagFilter) *promql.EvalConfig {
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: GetMaxUniqueTimeSeries(),
|
||||
MinStalenessInterval: *minStalenessInterval,
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: !noCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
}
|
||||
|
||||
return ec
|
||||
}
|
||||
|
||||
func populateSimulatedData(r *http.Request, at *auth.Token, evalConfig *promql.EvalConfig) error {
|
||||
type jsonExportBlockInput struct {
|
||||
Metric map[string]string `json:"metric"`
|
||||
Values []float64 `json:"values"`
|
||||
Timestamps []int64 `json:"timestamps"`
|
||||
}
|
||||
|
||||
// --- Read and Parse Input Samples from r.Body ---
|
||||
var simulatedSeries []*storage.SimulatedSamples
|
||||
decoder := json.NewDecoder(r.Body)
|
||||
lineNum := 0
|
||||
for {
|
||||
var jeb jsonExportBlockInput
|
||||
if err := decoder.Decode(&jeb); err == io.EOF {
|
||||
break
|
||||
} else if err != nil {
|
||||
return fmt.Errorf("error decoding input JSON on line %d: %w", lineNum, err)
|
||||
}
|
||||
|
||||
// Validate that values and timestamps arrays have the same length
|
||||
if len(jeb.Values) != len(jeb.Timestamps) {
|
||||
return fmt.Errorf("mismatched values and timestamps arrays length in debug data on line %d: values=%d, timestamps=%d", lineNum, len(jeb.Values), len(jeb.Timestamps))
|
||||
}
|
||||
|
||||
var mn = storage.GetMetricName()
|
||||
defer storage.PutMetricName(mn)
|
||||
for k, v := range jeb.Metric {
|
||||
mn.AddTag(k, v)
|
||||
}
|
||||
|
||||
ss := &storage.SimulatedSamples{
|
||||
Value: jeb.Values,
|
||||
Timestamps: jeb.Timestamps,
|
||||
}
|
||||
ss.Name.CopyFrom(mn)
|
||||
simulatedSeries = append(simulatedSeries, ss)
|
||||
lineNum++
|
||||
}
|
||||
|
||||
// It doesn't make sense to debug with empty samples
|
||||
if len(simulatedSeries) == 0 {
|
||||
return fmt.Errorf("no simulated samples found")
|
||||
}
|
||||
|
||||
minStalenessInterval, err := httputil.GetDurationRaw(r, "min_staleness_interval", evalConfig.MinStalenessInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `min_staleness_interval` arg: %w", err)
|
||||
}
|
||||
|
||||
maxStalenessInterval, err := httputil.GetDurationRaw(r, "max_staleness_interval", *maxStalenessInterval)
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `max_staleness_interval` arg: %w", err)
|
||||
}
|
||||
|
||||
evalConfig.SimulatedSamples = simulatedSeries
|
||||
evalConfig.MinStalenessInterval = minStalenessInterval
|
||||
evalConfig.LookbackDelta, err = getMaxLookback(r, maxStalenessInterval)
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
func removeEmptyValuesAndTimeseries(tss []netstorage.Result) []netstorage.Result {
|
||||
dst := tss[:0]
|
||||
for i := range tss {
|
||||
@@ -1044,7 +1171,7 @@ func adjustLastPoints(tss []netstorage.Result, start, end int64) []netstorage.Re
|
||||
return tss
|
||||
}
|
||||
|
||||
func getMaxLookback(r *http.Request) (int64, error) {
|
||||
func getMaxLookback(r *http.Request, maxStalenessInterval time.Duration) (int64, error) {
|
||||
d := maxLookback.Milliseconds()
|
||||
if d == 0 {
|
||||
d = maxStalenessInterval.Milliseconds()
|
||||
|
||||
@@ -134,6 +134,10 @@ type EvalConfig struct {
|
||||
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
|
||||
LookbackDelta int64
|
||||
|
||||
// MaxStalenessInterval corresponds to -search.maxStalenessInterval,
|
||||
// but customized per query request.
|
||||
MinStalenessInterval time.Duration
|
||||
|
||||
// How many decimal digits after the point to leave in response.
|
||||
RoundDigits int
|
||||
|
||||
@@ -158,6 +162,9 @@ type EvalConfig struct {
|
||||
|
||||
timestamps []int64
|
||||
timestampsOnce sync.Once
|
||||
|
||||
// Simulated samples
|
||||
SimulatedSamples []*storage.SimulatedSamples
|
||||
}
|
||||
|
||||
// copyEvalConfig returns src copy.
|
||||
@@ -176,6 +183,8 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
||||
ec.CacheTagFilters = src.CacheTagFilters
|
||||
ec.GetRequestURI = src.GetRequestURI
|
||||
ec.QueryStats = src.QueryStats
|
||||
ec.MinStalenessInterval = src.MinStalenessInterval
|
||||
ec.SimulatedSamples = src.SimulatedSamples
|
||||
|
||||
// do not copy src.timestamps - they must be generated again.
|
||||
return &ec
|
||||
@@ -929,7 +938,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||
}
|
||||
|
||||
ecSQ := copyEvalConfig(ec)
|
||||
ecSQ.Start -= window + step + maxSilenceInterval()
|
||||
ecSQ.Start -= window + step + maxSilenceInterval(ec.MinStalenessInterval)
|
||||
ecSQ.End += step
|
||||
ecSQ.Step = step
|
||||
ecSQ.MaxPointsPerSeries = *maxPointsSubqueryPerTimeseries
|
||||
@@ -946,7 +955,7 @@ func evalRollupFuncWithSubquery(qt *querytracer.Tracer, ec *EvalConfig, funcName
|
||||
return nil, nil
|
||||
}
|
||||
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.MinStalenessInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1684,7 +1693,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
}
|
||||
// Obtain rollup configs before fetching data from db, so type errors could be caught earlier.
|
||||
sharedTimestamps := getTimestamps(ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps)
|
||||
preFunc, rcs, err := getRollupConfigs(funcName, rf, expr, ec.Start, ec.End, ec.Step, ec.MaxPointsPerSeries, window, ec.LookbackDelta, sharedTimestamps, ec.MinStalenessInterval)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
@@ -1694,7 +1703,7 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
tfss = searchutil.JoinTagFilterss(tfss, ec.EnforcedTagFilterss)
|
||||
minTimestamp := ec.Start
|
||||
if needSilenceIntervalForRollupFunc[funcName] {
|
||||
minTimestamp -= maxSilenceInterval()
|
||||
minTimestamp -= maxSilenceInterval(ec.MinStalenessInterval)
|
||||
}
|
||||
if window > ec.Step {
|
||||
minTimestamp -= window
|
||||
@@ -1702,6 +1711,8 @@ func evalRollupFuncNoCache(qt *querytracer.Tracer, ec *EvalConfig, funcName stri
|
||||
minTimestamp -= ec.Step
|
||||
}
|
||||
sq := storage.NewSearchQuery(minTimestamp, ec.End, tfss, ec.MaxSeries)
|
||||
sq.SimulatedSeries = ec.SimulatedSamples
|
||||
|
||||
rss, err := netstorage.ProcessSearchQuery(qt, sq, ec.Deadline)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
@@ -1787,7 +1798,7 @@ func getRollupMemoryLimiter() *memoryLimiter {
|
||||
return &rollupMemoryLimiter
|
||||
}
|
||||
|
||||
func maxSilenceInterval() int64 {
|
||||
func maxSilenceInterval(minStalenessInterval time.Duration) int64 {
|
||||
d := minStalenessInterval.Milliseconds()
|
||||
if d <= 0 {
|
||||
d = 5 * 60 * 1000
|
||||
|
||||
@@ -61,12 +61,15 @@ func Exec(qt *querytracer.Tracer, ec *EvalConfig, q string, isFirstPointOnly boo
|
||||
}
|
||||
}
|
||||
|
||||
var rv []*timeseries
|
||||
|
||||
qid := activeQueriesV.Add(ec, q)
|
||||
rv, err := evalExpr(qt, ec, e)
|
||||
rv, err = evalExpr(qt, ec, e)
|
||||
activeQueriesV.Remove(qid)
|
||||
if err != nil {
|
||||
return nil, err
|
||||
}
|
||||
|
||||
if isFirstPointOnly {
|
||||
// Remove all the points except the first one from every time series.
|
||||
for _, ts := range rv {
|
||||
@@ -325,3 +328,23 @@ func escapeDots(s string) string {
|
||||
}
|
||||
return string(result)
|
||||
}
|
||||
|
||||
// ExtractMetricsFromQuery visits all the expressions in query and returns all the metrics found in the query.
|
||||
func ExtractMetricsFromQuery(query string) ([]string, error) {
|
||||
expr, err := metricsql.Parse(query)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("error parsing query: %w", err)
|
||||
}
|
||||
|
||||
var metrics []string
|
||||
metricsql.VisitAll(expr, func(e metricsql.Expr) {
|
||||
if me, ok := e.(*metricsql.MetricExpr); ok {
|
||||
metricStr := string(me.AppendString(nil))
|
||||
if metricStr != "" {
|
||||
metrics = append(metrics, metricStr)
|
||||
}
|
||||
}
|
||||
})
|
||||
|
||||
return metrics, nil
|
||||
}
|
||||
|
||||
313
app/vmselect/promql/exec_debug_test.go
Normal file
313
app/vmselect/promql/exec_debug_test.go
Normal file
@@ -0,0 +1,313 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"math"
|
||||
"slices"
|
||||
"testing"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/netstorage"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/app/vmselect/searchutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
||||
func TestSimulatedExec(t *testing.T) {
|
||||
accountID := uint32(123)
|
||||
projectID := uint32(567)
|
||||
start := int64(1000e3)
|
||||
end := int64(2000e3)
|
||||
step := int64(200e3)
|
||||
|
||||
// Base EvalConfig that will be copied for each test
|
||||
baseEC := EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: 1e4,
|
||||
MaxSeries: 1000,
|
||||
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
|
||||
RoundDigits: 100,
|
||||
MayCache: false,
|
||||
}
|
||||
|
||||
t.Run(`simple_metric_exact_match`, func(t *testing.T) {
|
||||
t.Skip()
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
mn := newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"a", "b",
|
||||
)
|
||||
|
||||
ec.SimulatedSamples = []*storage.SimulatedSamples{mn.build()}
|
||||
|
||||
q := `test_metric{a="b"}`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
// Expected result
|
||||
expectedMN := storage.MetricName{
|
||||
MetricGroup: []byte("test_metric"),
|
||||
Tags: []storage.Tag{
|
||||
{
|
||||
Key: []byte("a"),
|
||||
Value: []byte("b"),
|
||||
},
|
||||
},
|
||||
}
|
||||
expectedResult := []netstorage.Result{
|
||||
{
|
||||
MetricName: expectedMN,
|
||||
Values: mn.Value,
|
||||
Timestamps: mn.Timestamps,
|
||||
},
|
||||
}
|
||||
|
||||
testResultsEqual(t, result, expectedResult)
|
||||
})
|
||||
|
||||
t.Run(`filtered_by_tag_value`, func(t *testing.T) {
|
||||
t.Skip()
|
||||
|
||||
// Create a copy of base EvalConfig
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
mn := metricBuilders{
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"a", "b",
|
||||
"region", "us-west",
|
||||
),
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"a", "b",
|
||||
"region", "us-east",
|
||||
),
|
||||
}
|
||||
ec.SimulatedSamples = mn.build()
|
||||
|
||||
q := `test_metric{region="us-west"}`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
// Expected result
|
||||
expectedMN := storage.MetricName{
|
||||
MetricGroup: []byte("test_metric"),
|
||||
Tags: []storage.Tag{
|
||||
{
|
||||
Key: []byte("a"),
|
||||
Value: []byte("b"),
|
||||
},
|
||||
{
|
||||
Key: []byte("region"),
|
||||
Value: []byte("us-west"),
|
||||
},
|
||||
},
|
||||
}
|
||||
expectedResult := []netstorage.Result{
|
||||
{
|
||||
MetricName: expectedMN,
|
||||
Values: mn[0].Value,
|
||||
Timestamps: mn[0].Timestamps,
|
||||
},
|
||||
}
|
||||
|
||||
testResultsEqual(t, result, expectedResult)
|
||||
})
|
||||
|
||||
t.Run(`regex_match_on_tag`, func(t *testing.T) {
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
mn := metricBuilders{
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"env", "prod",
|
||||
),
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"env", "staging",
|
||||
),
|
||||
newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"env", "dev",
|
||||
),
|
||||
}
|
||||
ec.SimulatedSamples = mn.build()
|
||||
|
||||
q := `test_metric{env=~"prod|staging"}`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
expectedResult := []netstorage.Result{mn[0].toResult(), mn[1].toResult()}
|
||||
testResultsEqual(t, result, expectedResult)
|
||||
})
|
||||
}
|
||||
|
||||
func TestSumOverTime(t *testing.T) {
|
||||
accountID := uint32(123)
|
||||
projectID := uint32(567)
|
||||
start := int64(1000e3)
|
||||
end := int64(1300e3)
|
||||
step := int64(30e3)
|
||||
|
||||
baseEC := EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: 1e4,
|
||||
MaxSeries: 1000,
|
||||
Deadline: searchutil.NewDeadline(time.Now(), time.Hour, ""),
|
||||
RoundDigits: 100,
|
||||
MayCache: false,
|
||||
}
|
||||
|
||||
t.Run(`basic_sum_over_time`, func(t *testing.T) {
|
||||
ec := copyEvalConfig(&baseEC)
|
||||
|
||||
metric := newMetric(accountID, projectID,
|
||||
"__name__", "test_metric",
|
||||
"app", "api-server",
|
||||
).withValues(1, 2, 3, 4, 5, 6).withUnix(1000, 1015, 1030, 1045, 1060, 1075)
|
||||
ec.SimulatedSamples = []*storage.SimulatedSamples{metric.build()}
|
||||
|
||||
q := `sum_over_time(test_metric[30s])`
|
||||
result, err := Exec(nil, ec, q, false)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when executing %q: %s`, q, err)
|
||||
}
|
||||
|
||||
expectedResult := []netstorage.Result{
|
||||
newMetric(accountID, projectID,
|
||||
"app", "api-server",
|
||||
).withValues(1, 5, 9, 6).withUnix(1000, 1030, 1060, 1090).toResult(),
|
||||
}
|
||||
|
||||
testSimulatedResultsEqual(t, result, expectedResult)
|
||||
})
|
||||
}
|
||||
|
||||
type metricBuilder storage.SimulatedSamples
|
||||
|
||||
func newMetric(accountID uint32, projectID uint32, pairs ...string) *metricBuilder {
|
||||
mn := storage.MetricName{}
|
||||
for i := 0; i < len(pairs); i += 2 {
|
||||
mn.AddTag(pairs[i], pairs[i+1])
|
||||
}
|
||||
return &metricBuilder{
|
||||
Name: mn,
|
||||
Value: []float64{10, 20, 30, 40, 50, 60},
|
||||
Timestamps: []int64{1000e3, 1200e3, 1400e3, 1600e3, 1800e3, 2000e3},
|
||||
}
|
||||
}
|
||||
|
||||
func (b *metricBuilder) withUnix(unix ...int64) *metricBuilder {
|
||||
b.Timestamps = make([]int64, len(unix))
|
||||
for i := range unix {
|
||||
b.Timestamps[i] = unix[i] * 1e3
|
||||
}
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *metricBuilder) withValues(values ...float64) *metricBuilder {
|
||||
b.Value = values
|
||||
return b
|
||||
}
|
||||
|
||||
func (b *metricBuilder) build() *storage.SimulatedSamples {
|
||||
return (*storage.SimulatedSamples)(b)
|
||||
}
|
||||
|
||||
func (b *metricBuilder) toResult() netstorage.Result {
|
||||
return netstorage.Result{
|
||||
MetricName: b.Name,
|
||||
Values: b.Value,
|
||||
Timestamps: b.Timestamps,
|
||||
}
|
||||
}
|
||||
|
||||
type metricBuilders []*metricBuilder
|
||||
|
||||
func (b metricBuilders) build() []*storage.SimulatedSamples {
|
||||
ss := make([]*storage.SimulatedSamples, len(b))
|
||||
for i := range b {
|
||||
ss[i] = b[i].build()
|
||||
}
|
||||
return ss
|
||||
}
|
||||
|
||||
func testSimulatedResultsEqual(t *testing.T, result, resultExpected []netstorage.Result) {
|
||||
t.Helper()
|
||||
result = removeEmptyValuesAndTimeseries(result)
|
||||
|
||||
if len(result) != len(resultExpected) {
|
||||
t.Fatalf(`unexpected timeseries count; got %d; want %d`, len(result), len(resultExpected))
|
||||
}
|
||||
for i := range result {
|
||||
r := &result[i]
|
||||
rExpected := &resultExpected[i]
|
||||
testMetricNamesEqual(t, &r.MetricName, &rExpected.MetricName, i)
|
||||
testRowsEqual(t, r.Values, r.Timestamps, rExpected.Values, rExpected.Timestamps)
|
||||
}
|
||||
}
|
||||
|
||||
func removeEmptyValuesAndTimeseries(tss []netstorage.Result) []netstorage.Result {
|
||||
dst := tss[:0]
|
||||
for i := range tss {
|
||||
ts := &tss[i]
|
||||
hasNaNs := slices.ContainsFunc(ts.Values, math.IsNaN)
|
||||
if !hasNaNs {
|
||||
// Fast path: nothing to remove.
|
||||
if len(ts.Values) > 0 {
|
||||
dst = append(dst, *ts)
|
||||
}
|
||||
continue
|
||||
}
|
||||
|
||||
// Slow path: remove NaNs.
|
||||
srcTimestamps := ts.Timestamps
|
||||
dstValues := ts.Values[:0]
|
||||
// Do not reuse ts.Timestamps for dstTimestamps, since ts.Timestamps
|
||||
// may be shared among multiple time series.
|
||||
dstTimestamps := make([]int64, 0, len(ts.Timestamps))
|
||||
for j, v := range ts.Values {
|
||||
if math.IsNaN(v) {
|
||||
continue
|
||||
}
|
||||
dstValues = append(dstValues, v)
|
||||
dstTimestamps = append(dstTimestamps, srcTimestamps[j])
|
||||
}
|
||||
ts.Values = dstValues
|
||||
ts.Timestamps = dstTimestamps
|
||||
if len(ts.Values) > 0 {
|
||||
dst = append(dst, *ts)
|
||||
}
|
||||
}
|
||||
return dst
|
||||
}
|
||||
|
||||
func TestExtractMetricsFromQuery(t *testing.T) {
|
||||
query := `(vm_free_disk_space_bytes{job=~"$job", instance=~"$instance"}-vm_free_disk_space_limit_bytes{job=~"$job", instance=~"$instance"})
|
||||
/
|
||||
ignoring(path) (
|
||||
(rate(vm_rows_added_to_storage_total{job=~"$job", instance=~"$instance"}[1d]) -
|
||||
sum(rate(vm_deduplicated_samples_total{job=~"$job", instance=~"$instance"}[1d])) without (type)) *
|
||||
(
|
||||
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type) /
|
||||
sum(vm_rows{job=~"$job", instance=~"$instance", type!~"indexdb.*"}) without(type)
|
||||
)
|
||||
+
|
||||
rate(vm_new_timeseries_created_total{job=~"$job", instance=~"$instance"}[1d]) *
|
||||
(
|
||||
sum(vm_data_size_bytes{job=~"$job", instance=~"$instance", type="indexdb/file"}) /
|
||||
sum(vm_rows{job=~"$job", instance=~"$instance", type="indexdb/file"})
|
||||
)
|
||||
)`
|
||||
metrics, err := ExtractMetricsFromQuery(query)
|
||||
if err != nil {
|
||||
t.Fatalf(`unexpected error when extracting metrics from query: %s`, err)
|
||||
}
|
||||
t.Logf(`metrics: %v`, metrics)
|
||||
}
|
||||
@@ -1,12 +1,12 @@
|
||||
package promql
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"math"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/VictoriaMetrics/metricsql"
|
||||
@@ -17,10 +17,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
|
||||
)
|
||||
|
||||
var minStalenessInterval = flag.Duration("search.minStalenessInterval", 0, "The minimum interval for staleness calculations. "+
|
||||
"This flag could be useful for removing gaps on graphs generated from time series with irregular intervals between samples. "+
|
||||
"See also '-search.maxStalenessInterval'")
|
||||
|
||||
var rollupFuncs = map[string]newRollupFunc{
|
||||
"absent_over_time": newRollupFuncOneArg(rollupAbsent),
|
||||
"aggr_over_time": newRollupFuncTwoArgs(rollupFake),
|
||||
@@ -372,7 +368,7 @@ func getRollupTag(expr metricsql.Expr) (string, error) {
|
||||
}
|
||||
|
||||
func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start, end, step int64, maxPointsPerSeries int,
|
||||
window, lookbackDelta int64, sharedTimestamps []int64) (
|
||||
window, lookbackDelta int64, sharedTimestamps []int64, minStalenessInterval time.Duration) (
|
||||
func(values []float64, timestamps []int64), []*rollupConfig, error) {
|
||||
preFunc := func(_ []float64, _ []int64) {}
|
||||
funcName = strings.ToLower(funcName)
|
||||
@@ -408,6 +404,7 @@ func getRollupConfigs(funcName string, rf rollupFunc, expr metricsql.Expr, start
|
||||
Timestamps: sharedTimestamps,
|
||||
isDefaultRollup: funcName == "default_rollup",
|
||||
samplesScannedPerCall: samplesScannedPerCall,
|
||||
minStalenessInterval: minStalenessInterval,
|
||||
}
|
||||
}
|
||||
|
||||
@@ -600,6 +597,9 @@ type rollupConfig struct {
|
||||
//
|
||||
// If zero, then it is considered that Func scans all the samples passed to it.
|
||||
samplesScannedPerCall int
|
||||
|
||||
// The minimum interval for staleness calculations.
|
||||
minStalenessInterval time.Duration
|
||||
}
|
||||
|
||||
func (rc *rollupConfig) getTimestamps() []int64 {
|
||||
@@ -723,8 +723,8 @@ func (rc *rollupConfig) doInternal(dstValues []float64, tsm *timeseriesMap, valu
|
||||
if rc.LookbackDelta > 0 && maxPrevInterval > rc.LookbackDelta {
|
||||
maxPrevInterval = rc.LookbackDelta
|
||||
}
|
||||
if *minStalenessInterval > 0 {
|
||||
if msi := minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
|
||||
if rc.minStalenessInterval > 0 {
|
||||
if msi := rc.minStalenessInterval.Milliseconds(); msi > 0 && maxPrevInterval < msi {
|
||||
maxPrevInterval = msi
|
||||
}
|
||||
}
|
||||
|
||||
@@ -1149,7 +1149,6 @@ The relabeling can be debugged at `http://victoriametrics:8428/metric-relabel-de
|
||||
or at our [public demo playground](https://play.victoriametrics.com/select/accounting/1/6a716b0f-38bc-4856-90ce-448fd713e3fe/prometheus/graph/#/relabeling).
|
||||
See [these docs](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging) for more details.
|
||||
|
||||
* `-relabelConfigCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks the `-relabelConfig` file for changes and reloads it automatically.
|
||||
|
||||
## Federation
|
||||
|
||||
@@ -2777,9 +2776,7 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-relabelConfig string
|
||||
Optional path to a file with relabeling rules, which are applied to all the ingested metrics. The path can point either to local file or to http url. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal orat the interval specified by -relabel.configCheckInterval.
|
||||
-relabelConfigCheckInterval duration
|
||||
Interval for checking for changes in '-relabelConfig' file. By default the checking is disabled. Send SIGHUP signal in order to force config check for changes
|
||||
Optional path to a file with relabeling rules, which are applied to all the ingested metrics. The path can point either to local file or to http url. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#relabeling for details. The config is reloaded on SIGHUP signal
|
||||
-reloadAuthKey value
|
||||
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides httpAuth.* settings.
|
||||
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
|
||||
@@ -2971,8 +2968,6 @@ Pass `-help` to VictoriaMetrics in order to see the list of supported command-li
|
||||
Path to storage data (default "victoria-metrics-data")
|
||||
-streamAggr.config string
|
||||
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
|
||||
-streamAggr.configCheckInterval duration
|
||||
Interval for checking stream aggregation configuration. By default, the checking is disabled.
|
||||
-streamAggr.dedupInterval duration
|
||||
Input samples are de-duplicated with this interval before optional aggregation with -streamAggr.config . See also -streamAggr.dropInputLabels and -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication
|
||||
-streamAggr.dropInput
|
||||
|
||||
@@ -26,8 +26,6 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
* FEATURE: upgrade Go builder from Go1.24.6 to Go1.25. See [Go1.25 release notes](https://tip.golang.org/doc/go1.25).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add export functionality for Query (Table view) and RawQuery tabs in CSV/JSON format. See [#9332](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9332).
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): add `-relabelConfigCheckInterval` and `-streamAggr.configCheckInterval` flags for periodical stream aggregation and relabel configuration reload. When set, `vmsingle` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-streamAggr.configCheckInterval` and `-relabel.configCheckInterval` for periodic configuration reload. When set, `vmagent` scans the corresponding files for updates and reloads them automatically. See [#9590](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9590).
|
||||
|
||||
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): prevent remote write ingestion stop on push error for [Google Pub/Sub](https://docs.victoriametrics.com/victoriametrics/vmagent/#writing-metrics-to-pubsub) integration.
|
||||
* BUGFIX: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly handle [mTLS authorization and routing](https://docs.victoriametrics.com/victoriametrics/vmauth/#mtls-based-request-routing). Previously it didn't work. See [#29](https://github.com/VictoriaMetrics/VictoriaLogs/issues/29).
|
||||
|
||||
@@ -184,8 +184,6 @@ support the following approaches for hot reloading stream aggregation configs fr
|
||||
```
|
||||
|
||||
* By sending HTTP request to `/-/reload` endpoint (e.g. `http://vmagent:8429/-/reload` or `http://victoria-metrics:8428/-/reload`).
|
||||
* Use the `-remoteWrite.configCheckInterval` {{% available_from "#" %}} flag to control how often `vmagent` reloads configs specified via `-remoteWrite.streamAggr.config`, `-streamAggr.config`, `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags. Reload is disabled by default.
|
||||
* Use the `-streamAggr.configCheckInterval` {{% available_from "#" %}} flag to control how often `victoria-metrics` reloads config specified via `-streamAggr.config` flag. Reload is disabled by default.
|
||||
|
||||
## Aggregation outputs
|
||||
|
||||
|
||||
@@ -374,11 +374,7 @@ and `-remoteWrite.streamAggr.config`:
|
||||
|
||||
* Sending HTTP request to `http://vmagent:8429/-/reload` endpoint. This endpoint can be protected with `-reloadAuthKey` command-line flag.
|
||||
|
||||
There is also:
|
||||
|
||||
* `-promscrape.configCheckInterval` command-line flag controls how often VictoriaMetrics checks the `-promscrape.config` file for changes and reloads it automatically.
|
||||
* `-streamAggr.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.streamAggr.config` and `-streamAggr.config` flags.
|
||||
* `-relabel.configCheckInterval` {{% available_from "#" %}} command-line flag controls how often VictoriaMetrics checks and automatically reloads configs specified via `-remoteWrite.relabelConfig` and `remoteWrite.urlRelabelConfig` flags.
|
||||
There is also `-promscrape.configCheckInterval` command-line flag, which can be used for automatic reloading configs from updated `-promscrape.config` file.
|
||||
|
||||
## SRV urls
|
||||
|
||||
@@ -1889,8 +1885,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
|
||||
Optional URL to push metrics exposed at /metrics page. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#push-metrics . By default, metrics exposed at /metrics page aren't pushed to any remote storage
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Value can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-relabel.configCheckInterval duration
|
||||
Interval for checking for changes in configurations defined via -remoteWrite.relabelConfig and -remoteWrite.urlRelabelConfig flags. By default, the checking is disabled.
|
||||
-reloadAuthKey value
|
||||
Auth key for /-/reload http endpoint. It must be passed via authKey query arg. It overrides -httpAuth.*
|
||||
Flag value can be read from the given file when using -reloadAuthKey=file:///abs/path/to/file or -reloadAuthKey=file://./relative/path/to/file . Flag value can be read from the given http/https url when using -reloadAuthKey=http://host/path or -reloadAuthKey=https://host/path
|
||||
@@ -2134,8 +2128,6 @@ See the docs at https://docs.victoriametrics.com/victoriametrics/vmagent/ .
|
||||
Whether to sort labels for incoming samples before writing them to all the configured remote storage systems. This may be needed for reducing memory usage at remote storage when the order of labels in incoming samples is random. For example, if m{k1="v1",k2="v2"} may be sent as m{k2="v2",k1="v1"}Enabled sorting for labels can slow down ingestion performance a bit
|
||||
-streamAggr.config string
|
||||
Optional path to file with stream aggregation config. See https://docs.victoriametrics.com/victoriametrics/stream-aggregation/ . See also -streamAggr.keepInput, -streamAggr.dropInput and -streamAggr.dedupInterval
|
||||
-streamAggr.configCheckInterval duration
|
||||
Interval for checking for changes in configurations defined via -streamAggr.config and -remoteWrite.streamAggr.config flags. By default, the checking is disabled.
|
||||
-streamAggr.dedupInterval duration
|
||||
Input samples are de-duplicated with this interval on aggregator before optional aggregation with -streamAggr.config . See also -dedup.minScrapeInterval and https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication
|
||||
-streamAggr.dropInput
|
||||
|
||||
@@ -4,6 +4,7 @@ import (
|
||||
"fmt"
|
||||
"net/http"
|
||||
"strconv"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeutil"
|
||||
)
|
||||
@@ -34,4 +35,31 @@ func GetDuration(r *http.Request, argKey string, defaultValue int64) (int64, err
|
||||
return msecs, nil
|
||||
}
|
||||
|
||||
// GetDurationRaw returns time.Duration from the given argKey query arg.
|
||||
func GetDurationRaw(r *http.Request, argKey string, defaultValue time.Duration) (time.Duration, error) {
|
||||
argValue := r.FormValue(argKey)
|
||||
if len(argValue) == 0 {
|
||||
return defaultValue, nil
|
||||
}
|
||||
if argValue == "undefined" {
|
||||
// This hack is needed for Grafana, which may send undefined value
|
||||
return defaultValue, nil
|
||||
}
|
||||
secs, err := strconv.ParseFloat(argValue, 64)
|
||||
if err != nil {
|
||||
// Try parsing string format
|
||||
d, err := timeutil.ParseDuration(argValue)
|
||||
if err != nil {
|
||||
return 0, fmt.Errorf("cannot parse %q=%q: %w", argKey, argValue, err)
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
d := time.Duration(secs * float64(time.Second))
|
||||
msecs := d.Milliseconds()
|
||||
if msecs <= 0 || msecs > maxDurationMsecs {
|
||||
return 0, fmt.Errorf("%s=%s is out of allowed range [%s ... %s]", argKey, d, time.Millisecond, time.Duration(maxDurationMsecs)*time.Millisecond)
|
||||
}
|
||||
return d, nil
|
||||
}
|
||||
|
||||
const maxDurationMsecs = 100 * 365 * 24 * 3600 * 1000
|
||||
|
||||
@@ -2105,6 +2105,43 @@ func hasCompositeTagFilters(tfs []*tagFilter, prefix []byte) bool {
|
||||
return false
|
||||
}
|
||||
|
||||
// MatchSimulatedSamples filters the given simulatedSamples against the provided tag filters.
|
||||
// It returns only the simulated samples that match any of the given tag filter sets.
|
||||
// This function is used for debugging and testing purposes to simulate metric queries.
|
||||
func MatchSimulatedSamples(simulatedSamples []*SimulatedSamples, tagFilterss [][]TagFilter) ([]*SimulatedSamples, error) {
|
||||
var kb bytesutil.ByteBuffer
|
||||
matchedSamples := make([]*SimulatedSamples, 0, 1)
|
||||
for _, rawTfs := range tagFilterss {
|
||||
tfs := NewTagFilters()
|
||||
for _, tf := range rawTfs {
|
||||
err := tfs.Add(tf.Key, tf.Value, tf.IsNegative, tf.IsRegexp)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot add tagFilter %s: %w", tf.String(), err)
|
||||
}
|
||||
}
|
||||
|
||||
for idx, mn := range simulatedSamples {
|
||||
ok, err := matchTagFilters(&mn.Name, toTFPointers(tfs.tfs), &kb)
|
||||
if err != nil {
|
||||
return nil, fmt.Errorf("cannot match MetricName %s against tagFilters: %w", mn.Name.String(), err)
|
||||
}
|
||||
if ok {
|
||||
matchedSamples = append(matchedSamples, simulatedSamples[idx])
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return matchedSamples, nil
|
||||
}
|
||||
|
||||
func toTFPointers(tfs []tagFilter) []*tagFilter {
|
||||
tfps := make([]*tagFilter, len(tfs))
|
||||
for i := range tfs {
|
||||
tfps[i] = &tfs[i]
|
||||
}
|
||||
return tfps
|
||||
}
|
||||
|
||||
func matchTagFilters(mn *MetricName, tfs []*tagFilter, kb *bytesutil.ByteBuffer) (bool, error) {
|
||||
kb.B = marshalCommonPrefix(kb.B[:0], nsPrefixTagToMetricIDs)
|
||||
for i, tf := range tfs {
|
||||
|
||||
@@ -2045,14 +2045,6 @@ func TestSearchTSIDWithTimeRange(t *testing.T) {
|
||||
fs.MustRemoveDir(path)
|
||||
}
|
||||
|
||||
func toTFPointers(tfs []tagFilter) []*tagFilter {
|
||||
tfps := make([]*tagFilter, len(tfs))
|
||||
for i := range tfs {
|
||||
tfps[i] = &tfs[i]
|
||||
}
|
||||
return tfps
|
||||
}
|
||||
|
||||
func newTestStorage() *Storage {
|
||||
s := &Storage{
|
||||
cachePath: "test-storage-cache",
|
||||
|
||||
@@ -309,6 +309,33 @@ type SearchQuery struct {
|
||||
|
||||
// The maximum number of time series the search query can return.
|
||||
MaxMetrics int
|
||||
|
||||
// SimulatedSeries is used for simulating samples returned from storage nodes.
|
||||
SimulatedSeries []*SimulatedSamples
|
||||
}
|
||||
|
||||
// SimulatedSamples represents simulated metric samples for debug and testing purposes.
|
||||
// It contains metric name, timestamps and corresponding values.
|
||||
type SimulatedSamples struct {
|
||||
Name MetricName
|
||||
Timestamps []int64
|
||||
Value []float64
|
||||
}
|
||||
|
||||
// NewSimulatedSeries creates a new SimulatedSamples instance with the given parameters.
|
||||
// It constructs a metric name from the provided metric labels.
|
||||
func NewSimulatedSeries(metric map[string]string, timestamp []int64, value []float64) *SimulatedSamples {
|
||||
ss := &SimulatedSamples{
|
||||
Timestamps: timestamp,
|
||||
Value: value,
|
||||
}
|
||||
mn := GetMetricName()
|
||||
defer PutMetricName(mn)
|
||||
for k, v := range metric {
|
||||
mn.AddTag(k, v)
|
||||
}
|
||||
ss.Name.CopyFrom(mn)
|
||||
return ss
|
||||
}
|
||||
|
||||
// GetTimeRange returns time range for the given sq.
|
||||
|
||||
Reference in New Issue
Block a user