Compare commits

..

4 Commits

Author SHA1 Message Date
JAYICE
f8d3cb9ec9 Merge branch 'master' into issue-11028
Signed-off-by: JAYICE <1185430411@qq.com>
2026-06-05 13:40:35 +08:00
Jayice
f7c7017d9b upgrade lib/metricql 2026-06-05 13:29:20 +08:00
Jayice
4d14b0f95a update documentation 2026-06-03 17:21:44 +08:00
Jayice
7ef0733071 support range() function in MetricQL 2026-06-03 15:39:05 +08:00
12 changed files with 48 additions and 465 deletions

View File

@@ -12,7 +12,6 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/metrics"
@@ -104,9 +103,6 @@ var (
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
"By default, metadata sending is controlled by the global -enableMetadata flag")
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
)
var (
@@ -308,10 +304,6 @@ func initRemoteWriteCtxs(urls []string) {
}
fs.RegisterPathFsMetrics(*tmpDataPath)
if slices.Contains(*enableMdx, true) && *shardByURL {
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
}
if *shardByURL {
consistentHashNodes := make([]string, 0, len(urls))
for i, url := range urls {
@@ -867,7 +859,6 @@ type remoteWriteCtx struct {
sas atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
mdxFilter *mdx.Filter
streamAggrKeepInput bool
streamAggrDropInput bool
@@ -882,7 +873,6 @@ type remoteWriteCtx struct {
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
rowsPreservedByMdx *metrics.Counter
pushFailures *metrics.Counter
metadataDroppedOnPushFailure *metrics.Counter
@@ -969,6 +959,7 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
for i := range pss {
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
}
rwctx := &remoteWriteCtx{
idx: argIdx,
fq: fq,
@@ -985,15 +976,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
}
rwctx.initStreamAggrConfig()
if enableMdx.GetOptionalArg(argIdx) {
rwctx.mdxFilter = mdx.NewFilter()
rwctx.rowsPreservedByMdx = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_mdx_tracked_vm_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(rwctx.mdxFilter.VmInstancesCount())
})
}
return rwctx
}
@@ -1007,10 +989,6 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.deduplicator.MustStop()
rwctx.deduplicator = nil
}
if rwctx.mdxFilter != nil {
rwctx.mdxFilter.MustStop()
rwctx.mdxFilter = nil
}
for _, ps := range rwctx.pss {
ps.MustStop()
@@ -1026,7 +1004,6 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsPushedAfterRelabel = nil
rwctx.rowsDroppedByRelabel = nil
rwctx.rowsPreservedByMdx = nil
}
// TryPushTimeSeries sends tss series to the configured remote write endpoint
@@ -1035,38 +1012,24 @@ func (rwctx *remoteWriteCtx) MustStop() {
func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) bool {
var rctx *relabelCtx
var v *[]prompb.TimeSeries
acquiredTssFromPool := false
defer func() {
if acquiredTssFromPool {
*v = prompb.ResetTimeSeries(tss)
tssPool.Put(v)
}
if rctx != nil {
putRelabelCtx(rctx)
if rctx == nil {
return
}
*v = prompb.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}()
if rwctx.mdxFilter != nil {
acquiredTssFromPool = true
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = rwctx.mdxFilter.Filter(tss, *v)
rowsCountAfterMdx := getRowsCount(tss)
rwctx.rowsPreservedByMdx.Add(rowsCountAfterMdx)
if len(tss) == 0 {
return true
}
}
// Apply relabeling
rcs := allRelabelConfigs.Load()
pcs := rcs.perURL[rwctx.idx]
if pcs.Len() > 0 {
rctx = getRelabelCtx()
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
rctx = getRelabelCtx()
acquiredTssFromPool = true
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
rowsCountBeforeRelabel := getRowsCount(tss)
@@ -1086,7 +1049,6 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
if rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
acquiredTssFromPool = true
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
}
@@ -1096,7 +1058,6 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
if rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
acquiredTssFromPool = true
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
}

View File

@@ -8801,6 +8801,17 @@ func TestExecSuccess(t *testing.T) {
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`range()`, func(t *testing.T) {
t.Parallel()
q := `range()`
r := netstorage.Result{
MetricName: metricNameExpected,
Values: []float64{1000, 1000, 1000, 1000, 1000, 1000},
Timestamps: timestampsExpected,
}
resultExpected := []netstorage.Result{r}
f(q, resultExpected)
})
t.Run(`step()`, func(t *testing.T) {
t.Parallel()
q := `time() / step()`

View File

@@ -90,6 +90,7 @@ var transformFuncs = map[string]transformFunc{
"rand": newTransformRand(newRandFloat64),
"rand_exponential": newTransformRand(newRandExpFloat64),
"rand_normal": newTransformRand(newRandNormFloat64),
"range": newTransformFuncZeroArgs(transformRange),
"range_avg": newTransformFuncRange(runningAvg),
"range_first": transformRangeFirst,
"range_last": transformRangeLast,
@@ -2808,6 +2809,10 @@ func transformEnd(tfa *transformFuncArg) float64 {
return float64(tfa.ec.End) / 1e3
}
func transformRange(tfa *transformFuncArg) float64 {
return float64(tfa.ec.End-tfa.ec.Start) / 1e3
}
// copyTimeseries returns a copy of tss.
func copyTimeseries(tss []*timeseries) []*timeseries {
rvs := make([]*timeseries, len(tss))

View File

@@ -1357,7 +1357,7 @@ by replacing all the values bigger or equal to 30 with 40.
`end()` is a [transform function](#transform-functions), which returns the unix timestamp in seconds for the last point.
It is known as `end` query arg passed to [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query).
See also [start](#start), [time](#time) and [now](#now).
See also [start](#start), [time](#time), [now](#now) and [range](#range).
#### exp
@@ -1575,6 +1575,12 @@ with [normal distribution](https://en.wikipedia.org/wiki/Normal_distribution). O
See also [rand](#rand) and [rand_exponential](#rand_exponential).
#### range
`range()` is a [transform function](#transform-functions), which returns the range duration of the current query range evaluation in seconds and is equivalent to `end() - start()`.
See also [start](#start) and [end](#end).
#### range_avg
`range_avg(q)` is a [transform function](#transform-functions), which calculates the avg value across points per each time series returned by `q`.
@@ -1789,7 +1795,7 @@ This function is supported by PromQL.
It is known as `start` query arg passed to [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query).
See also [end](#end), [time](#time) and [now](#now).
See also [end](#end), [time](#time), [now](#now) and [range](#range).
#### step

View File

@@ -25,7 +25,6 @@ The sandbox cluster installation runs under the constant load generated by
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce `-remoteWrite.mdx.enable` and `-mdx.instanceEntryTTL` command-line flags to support mdx service in `vmagent`, it allows `vmagent` to send only metrics from VictoriaMetrics services to the corresponding `-remoteWrite.url`. See [monitoring-data-exchange](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange) and [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600) for detail.
* SECURITY: upgrade Go builder from Go1.26.3 to Go1.26.4. See [the list of issues addressed in Go1.26.4](https://github.com/golang/go/issues?q=milestone%3AGo1.26.4%20label%3ACherryPickApproved).
@@ -35,6 +34,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): properly log user information when a missing route error occurs. See [#11052](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11052).
* FEATURE: [vmctl](https://docs.victoriametrics.com/vmctl/): add the ability to migrate data from [Mimir](https://docs.victoriametrics.com/victoriametrics/vmctl/mimir/#) object storage to VictoriaMetrics. See [#7717](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/7717).
* FEATURE: [dashboards](https://github.com/VictoriaMetrics/VictoriaMetrics/tree/master/dashboards): show the full `version` label in the `Version` panel when `short_version` label is empty (e.g. custom builds from feature branch). Previously, the panel could appear empty. See [#11047](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11047).
* FEATURE: [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/): support `range()` function, which returns the range duration of the current query range evaluation in seconds. See [#11028](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11028).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): fix the `Notifiers` page in web UI appearing blank despite the API returning notifier data correctly. See [#11035](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11035).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): reset the group evaluation timestamp if it exceeds the current host time. Previously, vmalert could use future timestamps for evaluations if the system clock was shifted backward. See [#10985](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10985).

View File

@@ -268,31 +268,6 @@ for the collected samples. Examples:
```sh
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
```
### Monitoring Data eXchange
The MDX (Monitoring Data eXchange) feature aims to send only metrics from the VictoriaMetrics services to the corresponding `-remoteWrite.url`, discarding metrics from non-VictoriaMetrics services.
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=false \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true
```
When enabling MDX for the `-remoteWrite.url`, `vmagent` will only forward the metrics from the instances that emit `vm_app_version`, which is a metric that all VictoriaMetrics services will emit,
or the metrics contain the label specified by `mdx.label`:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true \
-mdx.label="service=victoriametrics"
```
The number of preserved rows from non-VictoriaMetrics services is exposed as `vmagent_remotewrite_mdx_rows_preserved_total`.
### Life of a sample
@@ -310,20 +285,18 @@ flowchart TB
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
%% Left branch
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
%% Right branch
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
```
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:

2
go.mod
View File

@@ -11,7 +11,7 @@ require (
github.com/VictoriaMetrics/easyproto v1.2.0
github.com/VictoriaMetrics/fastcache v1.13.3
github.com/VictoriaMetrics/metrics v1.43.2
github.com/VictoriaMetrics/metricsql v0.87.0
github.com/VictoriaMetrics/metricsql v0.87.1
github.com/aws/aws-sdk-go-v2 v1.41.5
github.com/aws/aws-sdk-go-v2/config v1.32.14
github.com/aws/aws-sdk-go-v2/feature/s3/manager v1.22.13

4
go.sum
View File

@@ -60,8 +60,8 @@ github.com/VictoriaMetrics/fastcache v1.13.3 h1:rBabE0iIxcqKEMCwUmwHZ9dgEqXerg8F
github.com/VictoriaMetrics/fastcache v1.13.3/go.mod h1:hHXhl4DA2fTL2HTZDJFXWgW0LNjo6B+4aj2Wmng3TjU=
github.com/VictoriaMetrics/metrics v1.43.2 h1:+8pIQEGwchKS5CYFyvv3LKvNXGi7baZ9hmIV4RHqibY=
github.com/VictoriaMetrics/metrics v1.43.2/go.mod h1:xDM82ULLYCYdFRgQ2JBxi8Uf1+8En1So9YUwlGTOqTc=
github.com/VictoriaMetrics/metricsql v0.87.0 h1:Koxh3GkB/Z0f3O0bEChVFxiE4YZoxYyn5TzmGJfSfaw=
github.com/VictoriaMetrics/metricsql v0.87.0/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VictoriaMetrics/metricsql v0.87.1 h1:GdIblCDgXsrBJcBSDtFT8SLK7P+QHijdQmcr4L/f0Go=
github.com/VictoriaMetrics/metricsql v0.87.1/go.mod h1:d4EisFO6ONP/HIGDYTAtwrejJBBeKGQYiRl095bS4QQ=
github.com/VividCortex/ewma v1.2.0 h1:f58SaIzcDXrSy3kWaHNvuJgJ3Nmz59Zji6XoJR/q1ow=
github.com/VividCortex/ewma v1.2.0/go.mod h1:nz4BbCtbLyFDeC9SUHbtcT5644juEuWfUAUnGx7j5l4=
github.com/alecthomas/units v0.0.0-20240927000941-0f3dac36c52b h1:mimo19zliBX/vSQ6PWWSL9lK8qwHozUj03+zLoEB8O0=

View File

@@ -1,159 +0,0 @@
package mdx
import (
"flag"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
var (
vmLabel = flag.String("mdx.label", "", "Optional label in the form 'name=value' to identify metrics from VictoriaMetrics. The metrics contain this label will be kept and sent to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`.")
)
// Filter manages the list of VictoriaMetrics instances discovered from previous data flow, and uses it to filter out metrics that are not from VictoriaMetrics instances.
type Filter struct {
mu sync.RWMutex
wg sync.WaitGroup
stopCh chan struct{}
vmInstance map[string]*atomic.Int64
filterByLabel bool
filterByCustomLabelName string
filterByCustomLabelValue string
}
func NewFilter() *Filter {
filter := &Filter{
vmInstance: make(map[string]*atomic.Int64),
stopCh: make(chan struct{}),
}
if len(*vmLabel) != 0 {
n := strings.IndexByte(*vmLabel, '=')
if n < 0 {
logger.Fatalf("missing '=' in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
}
filter.filterByCustomLabelName = (*vmLabel)[:n]
filter.filterByCustomLabelValue = (*vmLabel)[n+1:]
filter.filterByLabel = true
}
filter.wg.Go(filter.cleanStale)
return filter
}
func (filter *Filter) VmInstancesCount() int {
filter.mu.RLock()
defer filter.mu.RUnlock()
return len(filter.vmInstance)
}
func (filter *Filter) cleanStale() {
entryTTL := time.Hour * 1
ttlSec := int64(entryTTL.Seconds())
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
filter.mu.Lock()
currTs := time.Now().Unix()
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
for k, v := range filter.vmInstance {
if currTs-v.Load() < ttlSec {
dst[k] = v
}
}
if len(dst) != len(filter.vmInstance) {
filter.vmInstance = dst
}
filter.mu.Unlock()
case <-filter.stopCh:
return
}
}
}
func (filter *Filter) MustStop() {
if filter == nil {
return
}
close(filter.stopCh)
filter.wg.Wait()
}
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries {
currTs := time.Now().Unix()
var identicalKey []byte
nextTss:
for _, ts := range tss {
var hasVersionLabel, triedJobInstance bool
var job, instance string
for _, label := range ts.Labels {
if filter.filterByLabel && label.Name == filter.filterByCustomLabelName && label.Value == filter.filterByCustomLabelValue {
resTss = append(resTss, ts)
continue nextTss
}
if label.Name == "__name__" && label.Value == "vm_app_version" {
hasVersionLabel = true
}
if instance == "" && label.Name == "instance" {
if label.Value == "" {
continue
}
instance = label.Value
}
if job == "" && label.Name == "job" {
if label.Value == "" {
continue
}
job = label.Value
}
if !triedJobInstance && job != "" && instance != "" {
identicalKey = identicalKey[:0]
identicalKey = strconv.AppendQuote(identicalKey, job)
identicalKey = append(identicalKey, ':')
identicalKey = strconv.AppendQuote(identicalKey, instance)
filter.mu.RLock()
ptr, found := filter.vmInstance[bytesutil.ToUnsafeString(identicalKey)]
filter.mu.RUnlock()
if found {
ptr.Store(currTs)
resTss = append(resTss, ts)
continue nextTss
}
triedJobInstance = true
}
if hasVersionLabel && job != "" && instance != "" {
identicalKey = identicalKey[:0]
identicalKey = strconv.AppendQuote(identicalKey, job)
identicalKey = append(identicalKey, ':')
identicalKey = strconv.AppendQuote(identicalKey, instance)
v := &atomic.Int64{}
v.Store(currTs)
filter.mu.Lock()
filter.vmInstance[string(identicalKey)] = v
filter.mu.Unlock()
resTss = append(resTss, ts)
continue nextTss
}
}
}
return resTss
}

View File

@@ -1,216 +0,0 @@
package mdx
import (
"fmt"
"sort"
"strings"
"testing"
"testing/synctest"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
func timeSeriessToString(tss []prompb.TimeSeries) string {
a := make([]string, len(tss))
for i, ts := range tss {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}
func timeSeriesToString(ts prompb.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
return fmt.Sprintf("%s\n", labelsString)
}
func TestMdxInstanceFilter(t *testing.T) {
filter := NewFilter()
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries, expectedInstanceMap map[string]int64) {
t.Helper()
output := filter.Filter(input, []prompb.TimeSeries{})
if len(output) != len(expectedOutput) {
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
}
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
}
if len(filter.vmInstance) != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
}
for k := range expectedInstanceMap {
if filter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
f([]prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
}},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
},
}, map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
})
}
func TestMdxFilterByLabel(t *testing.T) {
*vmLabel = "service=victoriametrics"
filter := NewFilter()
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries) {
t.Helper()
output := filter.Filter(input, []prompb.TimeSeries{})
if len(output) != len(expectedOutput) {
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
}
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
}
}
f([]prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "service", Value: "victoriametrics"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
}},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "service", Value: "victoriametrics"},
},
},
})
*vmLabel = ""
}
func TestMdxInstanceCleanup(t *testing.T) {
t.Helper()
synctest.Test(t, func(t *testing.T) {
filter := NewFilter()
// init instance list
filter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{},
)
f := func(expectedInstanceMap map[string]int64) {
t.Helper()
if len(filter.vmInstance) != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
}
for k := range expectedInstanceMap {
if filter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
f(map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
})
// receive samples from victoria-metrics1:8428 after 9 seconds.
time.Sleep(59 * time.Minute)
filter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{},
)
// no samples from vmagent1:8429 in the last 10 seconds, so it should be removed from the mdx instance list.
time.Sleep(2 * time.Minute)
f(map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
})
filter.MustStop()
})
}

View File

@@ -74,6 +74,7 @@ var transformFuncs = map[string]bool{
"rand": true,
"rand_exponential": true,
"rand_normal": true,
"range": true,
"range_avg": true,
"range_first": true,
"range_last": true,

2
vendor/modules.txt vendored
View File

@@ -145,7 +145,7 @@ github.com/VictoriaMetrics/fastcache
# github.com/VictoriaMetrics/metrics v1.43.2
## explicit; go 1.24.0
github.com/VictoriaMetrics/metrics
# github.com/VictoriaMetrics/metricsql v0.87.0
# github.com/VictoriaMetrics/metricsql v0.87.1
## explicit; go 1.24.2
github.com/VictoriaMetrics/metricsql
github.com/VictoriaMetrics/metricsql/binaryop