Compare commits

...

12 Commits

Author SHA1 Message Date
Jayice
517c17b744 address review comments 2026-06-01 20:10:44 +08:00
Jayice
1c2622d8ae address review comments 2026-06-01 20:04:58 +08:00
Jayice
9a836dac59 address review comments 2026-06-01 20:00:52 +08:00
Jayice
799ecb0a08 support specify label and value to filter metrics to mdx url 2026-06-01 19:50:00 +08:00
Jayice
c88dc19052 address review comments 2026-06-01 14:52:16 +08:00
Jayice
919049f9e2 push slice back to pool 2026-05-25 17:34:41 +08:00
Jayice
24efe47c6a add unit test & address review comments 2026-05-25 17:08:23 +08:00
Jayice
f8d99d9289 polish documentation 2026-04-27 18:02:41 +08:00
Jayice
333a015be5 update CHANGELOG.md 2026-04-27 15:15:59 +08:00
JAYICE
b6196524ba Merge branch 'master' into issue-10600
Signed-off-by: JAYICE <1185430411@qq.com>
2026-04-27 15:13:02 +08:00
Jayice
e9f1bb911c add documentation 2026-04-27 15:10:17 +08:00
Jayice
12b79143dc implement mdx for remote write 2026-04-21 15:48:46 +08:00
5 changed files with 468 additions and 18 deletions

View File

@@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/metrics"
@@ -102,6 +103,9 @@ var (
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
"By default, metadata sending is controlled by the global -enableMetadata flag")
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
)
var (
@@ -286,6 +290,10 @@ func initRemoteWriteCtxs(urls []string) {
rwctxIdx[i] = i
}
if slices.Contains(*enableMdx, true) && *shardByURL {
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
}
if *shardByURL {
consistentHashNodes := make([]string, 0, len(urls))
for i, url := range urls {
@@ -294,6 +302,10 @@ func initRemoteWriteCtxs(urls []string) {
rwctxConsistentHashGlobal = consistenthash.NewConsistentHash(consistentHashNodes, 0)
}
if slices.Contains(*enableMdx, true) {
mdx.InitGlobalFilter()
}
rwctxsGlobal = rwctxs
rwctxsGlobalIdx = rwctxIdx
}
@@ -356,6 +368,7 @@ func Stop() {
if sl := dailySeriesLimiter; sl != nil {
sl.MustStop()
}
mdx.GlobalFilter.MustStop()
}
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
@@ -837,11 +850,14 @@ type remoteWriteCtx struct {
// otherwise by the global -enableMetadata flag.
enableMetadata bool
enableMdx bool
pss []*pendingSeries
pssNextIdx atomic.Uint64
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
rowsDroppedByMdx *metrics.Counter
pushFailures *metrics.Counter
metadataDroppedOnPushFailure *metrics.Counter
@@ -928,16 +944,17 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
for i := range pss {
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
}
rwctx := &remoteWriteCtx{
idx: argIdx,
fq: fq,
c: c,
pss: pss,
enableMetadata: isMetadataEnabledForURL(argIdx),
enableMdx: enableMdx.GetOptionalArg(argIdx),
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByMdx: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
metadataDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_metadata_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
@@ -973,6 +990,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsPushedAfterRelabel = nil
rwctx.rowsDroppedByRelabel = nil
rwctx.rowsDroppedByMdx = nil
}
// TryPushTimeSeries sends tss series to the configured remote write endpoint
@@ -990,17 +1008,30 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
putRelabelCtx(rctx)
}()
if rwctx.enableMdx && mdx.GlobalFilter != nil {
rctx = getRelabelCtx()
// Make a copy of tss
rowsCountBeforeMdx := getRowsCount(tss)
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = mdx.GlobalFilter.Filter(tss, *v)
rowsCountAfterMdx := getRowsCount(tss)
rwctx.rowsDroppedByMdx.Add(rowsCountBeforeMdx - rowsCountAfterMdx)
}
// Apply relabeling
rcs := allRelabelConfigs.Load()
pcs := rcs.perURL[rwctx.idx]
if pcs.Len() > 0 {
rctx = getRelabelCtx()
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
if rctx == nil {
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
}
rowsCountBeforeRelabel := getRowsCount(tss)
tss = rctx.applyRelabeling(tss, pcs)
rowsCountAfterRelabel := getRowsCount(tss)

View File

@@ -25,6 +25,7 @@ The sandbox cluster installation runs under the constant load generated by
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce `-remoteWrite.mdx.enable` and `-mdx.instanceEntryTTL` command-line flags to support mdx service in `vmagent`, it allows `vmagent` to send only metrics from VictoriaMetrics services to the corresponding `-remoteWrite.url`. See [monitoring-data-exchange](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange) and [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600) for detail.
## [v1.141.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.141.0)

View File

@@ -268,6 +268,40 @@ for the collected samples. Examples:
```sh
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
```
### Monitoring Data eXchange
The MDX (Monitoring Data eXchange) is a monitoring of monitoring metrics collection and sharing feature.
It aims to send only metrics from the VictoriaMetrics services to the corresponding `-remoteWrite.url`, discarding metrics from non-VictoriaMetrics services.
When enabling MDX for the `-remoteWrite.url`, `vmagent` will only forward metrics from the instances that emit `vm_app_version`, which is a metric that all VictoriaMetrics services will emit.
The number of dropped rows from non-VictoriaMetrics services is exposed as `vmagent_remotewrite_mdx_rows_dropped_total`.
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=false \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true
```
`vmagent` relies on the `vm_app_version` metric to identify the VictoriaMetrics instance. `vmagent` will not recognize an instance as a VictoriaMetrics instance and retain the instance's metrics until it receives the `vm_app_version` metric from this instance.
If the metrics from the VictoriaMetrics instances have specific label and value in your setup(e.g. service,app label), you can also specify the label name in `-mdx.keepMetricsWithLabel.name` and its value in `-mdx.keepMetricsWithLabel.value`. `vmagent` will retain metrics with the specified label and value to the MDX URL:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true \
-mdx.keepMetricsWithLabel.name=service \
-mdx.keepMetricsWithLabel.value=victoriametrics
```
Otherwise, `vmagent` will rely on the `vm_app_version` metric mechanism to automatically determine which metrics should be retained.
If `vmagent` does not receive metrics from a Victoriametrics instance for a period, it will remove that instance from the discovered VictoriaMetrics instances list to reduce memory usage.
The TTL period can be configured by `-mdx.instanceEntryTTL` and defaults to 1 hour, it should work well in most scenarios. If you want to release memory more promptly, you can reduce the value, but note that the minimum recommended value is 5 times scrape interval.
### Life of a sample
@@ -285,18 +319,20 @@ flowchart TB
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
%% Left branch
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
%% Right branch
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
```
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:

156
lib/mdx/filter.go Normal file
View File

@@ -0,0 +1,156 @@
package mdx
import (
"flag"
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/metrics"
)
var (
mdxInstanceEntryTTL = flagutil.NewExtendedDuration("mdx.instanceEntryTTL", "1h", "After not receiving metrics for the VictoriaMetrics instance for the configured time, remove this instance from the MDX instance list."+
"It should be several times the scrape interval for VictoriaMetrics instances. The cleanup mechanism helps release memory after a VictoriaMetrics instance is permanently taken offline, preventing the MDX instance list from growing indefinitely."+
"It must be explicitly set when -remoteWrite.mdx.enable is set and requires explicit unit suffixes (s, m, h, d, w, y). Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
keepMetricsWithLabelName = flag.String("mdx.keepMetricsWithLabel.name", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+
"See also -mdx.keepMetricsWithLabel.value.")
keepMetricsWithLabelValue = flag.String("mdx.keepMetricsWithLabel.value", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+
"See also -mdx.keepMetricsWithLabel.name")
)
// Filter manages the list of VictoriaMetrics instances discovered from previous data flow, and uses it to filter out metrics that are not from VictoriaMetrics instances.
type Filter struct {
mu sync.RWMutex
wg sync.WaitGroup
stopCh chan struct{}
vmInstance map[string]*atomic.Int64
filterByLabel bool
}
var GlobalFilter *Filter
func InitGlobalFilter() {
GlobalFilter = &Filter{
vmInstance: make(map[string]*atomic.Int64),
stopCh: make(chan struct{}),
}
if len(*keepMetricsWithLabelName) > 0 && len(*keepMetricsWithLabelValue) > 0 {
GlobalFilter.filterByLabel = true
} else if len(*keepMetricsWithLabelName) > 0 || len(*keepMetricsWithLabelValue) > 0 {
logger.Fatalf("Both -mdx.keepMetricsWithLabel.name and -mdx.keepMetricsWithLabel.value must be set if one of them is set.")
}
_ = metrics.NewGauge("vmagent_mdx_tracked_vm_instances", func() float64 {
GlobalFilter.mu.RLock()
n := len(GlobalFilter.vmInstance)
GlobalFilter.mu.RUnlock()
return float64(n)
})
if mdxInstanceEntryTTL.Milliseconds() != 0 {
GlobalFilter.wg.Go(GlobalFilter.cleanStale)
}
}
func (filter *Filter) cleanStale() {
ttlSec := int64(mdxInstanceEntryTTL.Duration().Seconds())
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
filter.mu.Lock()
currTs := time.Now().Unix()
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
for k, v := range filter.vmInstance {
if currTs-v.Load() < ttlSec {
dst[k] = v
}
}
if len(dst) != len(filter.vmInstance) {
filter.vmInstance = dst
}
filter.mu.Unlock()
case <-filter.stopCh:
return
}
}
}
func (filter *Filter) MustStop() {
if filter == nil {
return
}
close(filter.stopCh)
filter.wg.Wait()
}
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries {
for _, ts := range tss {
isVmInstance := false
var instance string
var job string
isStored := false
for _, label := range ts.Labels {
if label.Name == "__name__" && label.Value == "vm_app_version" {
isVmInstance = true
}
if label.Name == "instance" {
instance = label.Value
}
if label.Name == "job" {
job = label.Value
}
if filter.filterByLabel {
if label.Name == *keepMetricsWithLabelName && label.Value == *keepMetricsWithLabelValue {
resTss = append(resTss, ts)
isStored = true
}
}
}
if len(job) == 0 || len(instance) == 0 {
continue
}
identicalKey := fmt.Sprintf("%q:%q", job, instance)
currTs := time.Now().Unix()
//fast path
filter.mu.RLock()
ptr, ok := filter.vmInstance[identicalKey]
filter.mu.RUnlock()
if ok {
ptr.Store(currTs)
if !isStored {
resTss = append(resTss, ts)
isStored = true
}
continue
}
if !isVmInstance {
continue
}
// slow path
if !isStored {
resTss = append(resTss, ts)
isStored = true
}
filter.mu.Lock()
if ptr, ok = filter.vmInstance[identicalKey]; ok {
ptr.Store(currTs)
} else {
v := atomic.Int64{}
v.Store(currTs)
filter.vmInstance[identicalKey] = &v
}
filter.mu.Unlock()
}
return resTss
}

226
lib/mdx/filter_test.go Normal file
View File

@@ -0,0 +1,226 @@
package mdx
import (
"fmt"
"sort"
"strings"
"sync/atomic"
"testing"
"testing/synctest"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
func timeSeriessToString(tss []prompb.TimeSeries) string {
a := make([]string, len(tss))
for i, ts := range tss {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}
func timeSeriesToString(ts prompb.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
return fmt.Sprintf("%s\n", labelsString)
}
func TestMdxInstanceFilter(t *testing.T) {
filter := Filter{
vmInstance: make(map[string]*atomic.Int64),
}
_ = mdxInstanceEntryTTL.Set("120s")
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries, expectedInstanceMap map[string]int64) {
t.Helper()
output := filter.Filter(input, []prompb.TimeSeries{})
if len(output) != len(expectedOutput) {
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
}
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
}
if len(filter.vmInstance) != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
}
for k := range expectedInstanceMap {
if filter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
f([]prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
}},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
},
}, map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
})
}
func TestMdxFilterByLabel(t *testing.T) {
filter := Filter{
vmInstance: make(map[string]*atomic.Int64),
filterByLabel: true,
}
*keepMetricsWithLabelName = "service"
*keepMetricsWithLabelValue = "victoriametrics"
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries) {
t.Helper()
output := filter.Filter(input, []prompb.TimeSeries{})
if len(output) != len(expectedOutput) {
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
}
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
}
}
f([]prompb.TimeSeries{{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "service", Value: "victoriametrics"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
}},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "up"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "service", Value: "victoriametrics"},
},
},
})
*keepMetricsWithLabelName = ""
*keepMetricsWithLabelValue = ""
}
func TestMdxInstanceCleanup(t *testing.T) {
t.Helper()
synctest.Test(t, func(t *testing.T) {
_ = mdxInstanceEntryTTL.Set("10s")
InitGlobalFilter()
// init instance list
GlobalFilter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{},
)
f := func(expectedInstanceMap map[string]int64) {
t.Helper()
if len(GlobalFilter.vmInstance) != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(GlobalFilter.vmInstance), len(expectedInstanceMap))
}
for k := range expectedInstanceMap {
if GlobalFilter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
f(map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
})
// receive samples from victoria-metrics1:8428 after 9 seconds.
time.Sleep(9 * time.Second)
GlobalFilter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{},
)
// no samples from vmagent1:8429 in the last 10 seconds, so it should be removed from the mdx instance list.
time.Sleep(9 * time.Second)
f(map[string]int64{
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
})
GlobalFilter.MustStop()
})
}