mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-04 01:22:05 +03:00
Compare commits
12 Commits
issue-1102
...
issue-1060
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
517c17b744 | ||
|
|
1c2622d8ae | ||
|
|
9a836dac59 | ||
|
|
799ecb0a08 | ||
|
|
c88dc19052 | ||
|
|
919049f9e2 | ||
|
|
24efe47c6a | ||
|
|
f8d99d9289 | ||
|
|
333a015be5 | ||
|
|
b6196524ba | ||
|
|
e9f1bb911c | ||
|
|
12b79143dc |
@@ -12,6 +12,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@@ -102,6 +103,9 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
|
||||
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -286,6 +290,10 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
rwctxIdx[i] = i
|
||||
}
|
||||
|
||||
if slices.Contains(*enableMdx, true) && *shardByURL {
|
||||
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
|
||||
}
|
||||
|
||||
if *shardByURL {
|
||||
consistentHashNodes := make([]string, 0, len(urls))
|
||||
for i, url := range urls {
|
||||
@@ -294,6 +302,10 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
rwctxConsistentHashGlobal = consistenthash.NewConsistentHash(consistentHashNodes, 0)
|
||||
}
|
||||
|
||||
if slices.Contains(*enableMdx, true) {
|
||||
mdx.InitGlobalFilter()
|
||||
}
|
||||
|
||||
rwctxsGlobal = rwctxs
|
||||
rwctxsGlobalIdx = rwctxIdx
|
||||
}
|
||||
@@ -356,6 +368,7 @@ func Stop() {
|
||||
if sl := dailySeriesLimiter; sl != nil {
|
||||
sl.MustStop()
|
||||
}
|
||||
mdx.GlobalFilter.MustStop()
|
||||
}
|
||||
|
||||
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
|
||||
@@ -837,11 +850,14 @@ type remoteWriteCtx struct {
|
||||
// otherwise by the global -enableMetadata flag.
|
||||
enableMetadata bool
|
||||
|
||||
enableMdx bool
|
||||
|
||||
pss []*pendingSeries
|
||||
pssNextIdx atomic.Uint64
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
rowsDroppedByMdx *metrics.Counter
|
||||
|
||||
pushFailures *metrics.Counter
|
||||
metadataDroppedOnPushFailure *metrics.Counter
|
||||
@@ -928,16 +944,17 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
for i := range pss {
|
||||
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
|
||||
}
|
||||
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: argIdx,
|
||||
fq: fq,
|
||||
c: c,
|
||||
pss: pss,
|
||||
enableMetadata: isMetadataEnabledForURL(argIdx),
|
||||
enableMdx: enableMdx.GetOptionalArg(argIdx),
|
||||
|
||||
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
rowsDroppedByMdx: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
|
||||
pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
metadataDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_metadata_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
@@ -973,6 +990,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
rwctx.rowsDroppedByMdx = nil
|
||||
}
|
||||
|
||||
// TryPushTimeSeries sends tss series to the configured remote write endpoint
|
||||
@@ -990,17 +1008,30 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
putRelabelCtx(rctx)
|
||||
}()
|
||||
|
||||
if rwctx.enableMdx && mdx.GlobalFilter != nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss
|
||||
rowsCountBeforeMdx := getRowsCount(tss)
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = mdx.GlobalFilter.Filter(tss, *v)
|
||||
rowsCountAfterMdx := getRowsCount(tss)
|
||||
rwctx.rowsDroppedByMdx.Add(rowsCountBeforeMdx - rowsCountAfterMdx)
|
||||
}
|
||||
|
||||
// Apply relabeling
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
if pcs.Len() > 0 {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
if rctx == nil {
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
}
|
||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||
tss = rctx.applyRelabeling(tss, pcs)
|
||||
rowsCountAfterRelabel := getRowsCount(tss)
|
||||
|
||||
@@ -25,6 +25,7 @@ The sandbox cluster installation runs under the constant load generated by
|
||||
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
## tip
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce `-remoteWrite.mdx.enable` and `-mdx.instanceEntryTTL` command-line flags to support mdx service in `vmagent`, it allows `vmagent` to send only metrics from VictoriaMetrics services to the corresponding `-remoteWrite.url`. See [monitoring-data-exchange](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange) and [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600) for detail.
|
||||
|
||||
## [v1.141.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.141.0)
|
||||
|
||||
|
||||
@@ -268,6 +268,40 @@ for the collected samples. Examples:
|
||||
```sh
|
||||
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
|
||||
```
|
||||
|
||||
### Monitoring Data eXchange
|
||||
|
||||
The MDX (Monitoring Data eXchange) is a monitoring of monitoring metrics collection and sharing feature.
|
||||
It aims to send only metrics from the VictoriaMetrics services to the corresponding `-remoteWrite.url`, discarding metrics from non-VictoriaMetrics services.
|
||||
|
||||
When enabling MDX for the `-remoteWrite.url`, `vmagent` will only forward metrics from the instances that emit `vm_app_version`, which is a metric that all VictoriaMetrics services will emit.
|
||||
The number of dropped rows from non-VictoriaMetrics services is exposed as `vmagent_remotewrite_mdx_rows_dropped_total`.
|
||||
|
||||
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=false \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true
|
||||
```
|
||||
|
||||
`vmagent` relies on the `vm_app_version` metric to identify the VictoriaMetrics instance. `vmagent` will not recognize an instance as a VictoriaMetrics instance and retain the instance's metrics until it receives the `vm_app_version` metric from this instance.
|
||||
|
||||
If the metrics from the VictoriaMetrics instances have specific label and value in your setup(e.g. service,app label), you can also specify the label name in `-mdx.keepMetricsWithLabel.name` and its value in `-mdx.keepMetricsWithLabel.value`. `vmagent` will retain metrics with the specified label and value to the MDX URL:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true \
|
||||
-mdx.keepMetricsWithLabel.name=service \
|
||||
-mdx.keepMetricsWithLabel.value=victoriametrics
|
||||
```
|
||||
Otherwise, `vmagent` will rely on the `vm_app_version` metric mechanism to automatically determine which metrics should be retained.
|
||||
|
||||
If `vmagent` does not receive metrics from a Victoriametrics instance for a period, it will remove that instance from the discovered VictoriaMetrics instances list to reduce memory usage.
|
||||
The TTL period can be configured by `-mdx.instanceEntryTTL` and defaults to 1 hour, it should work well in most scenarios. If you want to release memory more promptly, you can reduce the value, but note that the minimum recommended value is 5 times scrape interval.
|
||||
|
||||
### Life of a sample
|
||||
|
||||
@@ -285,18 +319,20 @@ flowchart TB
|
||||
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
|
||||
|
||||
%% Left branch
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
|
||||
|
||||
%% Right branch
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
|
||||
```
|
||||
|
||||
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:
|
||||
|
||||
156
lib/mdx/filter.go
Normal file
156
lib/mdx/filter.go
Normal file
@@ -0,0 +1,156 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
mdxInstanceEntryTTL = flagutil.NewExtendedDuration("mdx.instanceEntryTTL", "1h", "After not receiving metrics for the VictoriaMetrics instance for the configured time, remove this instance from the MDX instance list."+
|
||||
"It should be several times the scrape interval for VictoriaMetrics instances. The cleanup mechanism helps release memory after a VictoriaMetrics instance is permanently taken offline, preventing the MDX instance list from growing indefinitely."+
|
||||
"It must be explicitly set when -remoteWrite.mdx.enable is set and requires explicit unit suffixes (s, m, h, d, w, y). Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
keepMetricsWithLabelName = flag.String("mdx.keepMetricsWithLabel.name", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+
|
||||
"See also -mdx.keepMetricsWithLabel.value.")
|
||||
keepMetricsWithLabelValue = flag.String("mdx.keepMetricsWithLabel.value", "", "Keep metrics containing specific label and label value to the `-remoteWrite.url` that configured with `-remoteWrite.mdx.enable=true`. "+
|
||||
"See also -mdx.keepMetricsWithLabel.name")
|
||||
)
|
||||
|
||||
// Filter manages the list of VictoriaMetrics instances discovered from previous data flow, and uses it to filter out metrics that are not from VictoriaMetrics instances.
|
||||
type Filter struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
vmInstance map[string]*atomic.Int64
|
||||
filterByLabel bool
|
||||
}
|
||||
|
||||
var GlobalFilter *Filter
|
||||
|
||||
func InitGlobalFilter() {
|
||||
GlobalFilter = &Filter{
|
||||
vmInstance: make(map[string]*atomic.Int64),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
if len(*keepMetricsWithLabelName) > 0 && len(*keepMetricsWithLabelValue) > 0 {
|
||||
GlobalFilter.filterByLabel = true
|
||||
} else if len(*keepMetricsWithLabelName) > 0 || len(*keepMetricsWithLabelValue) > 0 {
|
||||
logger.Fatalf("Both -mdx.keepMetricsWithLabel.name and -mdx.keepMetricsWithLabel.value must be set if one of them is set.")
|
||||
}
|
||||
|
||||
_ = metrics.NewGauge("vmagent_mdx_tracked_vm_instances", func() float64 {
|
||||
GlobalFilter.mu.RLock()
|
||||
n := len(GlobalFilter.vmInstance)
|
||||
GlobalFilter.mu.RUnlock()
|
||||
return float64(n)
|
||||
})
|
||||
if mdxInstanceEntryTTL.Milliseconds() != 0 {
|
||||
GlobalFilter.wg.Go(GlobalFilter.cleanStale)
|
||||
}
|
||||
}
|
||||
|
||||
func (filter *Filter) cleanStale() {
|
||||
ttlSec := int64(mdxInstanceEntryTTL.Duration().Seconds())
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
filter.mu.Lock()
|
||||
currTs := time.Now().Unix()
|
||||
|
||||
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
|
||||
for k, v := range filter.vmInstance {
|
||||
if currTs-v.Load() < ttlSec {
|
||||
dst[k] = v
|
||||
}
|
||||
}
|
||||
if len(dst) != len(filter.vmInstance) {
|
||||
filter.vmInstance = dst
|
||||
}
|
||||
filter.mu.Unlock()
|
||||
case <-filter.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (filter *Filter) MustStop() {
|
||||
if filter == nil {
|
||||
return
|
||||
}
|
||||
close(filter.stopCh)
|
||||
filter.wg.Wait()
|
||||
}
|
||||
|
||||
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
for _, ts := range tss {
|
||||
isVmInstance := false
|
||||
var instance string
|
||||
var job string
|
||||
isStored := false
|
||||
for _, label := range ts.Labels {
|
||||
if label.Name == "__name__" && label.Value == "vm_app_version" {
|
||||
isVmInstance = true
|
||||
}
|
||||
if label.Name == "instance" {
|
||||
instance = label.Value
|
||||
}
|
||||
if label.Name == "job" {
|
||||
job = label.Value
|
||||
}
|
||||
if filter.filterByLabel {
|
||||
if label.Name == *keepMetricsWithLabelName && label.Value == *keepMetricsWithLabelValue {
|
||||
resTss = append(resTss, ts)
|
||||
isStored = true
|
||||
}
|
||||
}
|
||||
}
|
||||
if len(job) == 0 || len(instance) == 0 {
|
||||
continue
|
||||
}
|
||||
identicalKey := fmt.Sprintf("%q:%q", job, instance)
|
||||
currTs := time.Now().Unix()
|
||||
|
||||
//fast path
|
||||
filter.mu.RLock()
|
||||
ptr, ok := filter.vmInstance[identicalKey]
|
||||
filter.mu.RUnlock()
|
||||
if ok {
|
||||
ptr.Store(currTs)
|
||||
if !isStored {
|
||||
resTss = append(resTss, ts)
|
||||
isStored = true
|
||||
}
|
||||
continue
|
||||
}
|
||||
if !isVmInstance {
|
||||
continue
|
||||
}
|
||||
|
||||
// slow path
|
||||
if !isStored {
|
||||
resTss = append(resTss, ts)
|
||||
isStored = true
|
||||
}
|
||||
filter.mu.Lock()
|
||||
if ptr, ok = filter.vmInstance[identicalKey]; ok {
|
||||
ptr.Store(currTs)
|
||||
} else {
|
||||
v := atomic.Int64{}
|
||||
v.Store(currTs)
|
||||
filter.vmInstance[identicalKey] = &v
|
||||
}
|
||||
filter.mu.Unlock()
|
||||
}
|
||||
|
||||
return resTss
|
||||
}
|
||||
226
lib/mdx/filter_test.go
Normal file
226
lib/mdx/filter_test.go
Normal file
@@ -0,0 +1,226 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"sync/atomic"
|
||||
"testing"
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
)
|
||||
|
||||
func timeSeriessToString(tss []prompb.TimeSeries) string {
|
||||
a := make([]string, len(tss))
|
||||
for i, ts := range tss {
|
||||
a[i] = timeSeriesToString(ts)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return strings.Join(a, "")
|
||||
}
|
||||
|
||||
func timeSeriesToString(ts prompb.TimeSeries) string {
|
||||
labelsString := promrelabel.LabelsToString(ts.Labels)
|
||||
|
||||
return fmt.Sprintf("%s\n", labelsString)
|
||||
}
|
||||
|
||||
func TestMdxInstanceFilter(t *testing.T) {
|
||||
filter := Filter{
|
||||
vmInstance: make(map[string]*atomic.Int64),
|
||||
}
|
||||
_ = mdxInstanceEntryTTL.Set("120s")
|
||||
|
||||
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries, expectedInstanceMap map[string]int64) {
|
||||
t.Helper()
|
||||
output := filter.Filter(input, []prompb.TimeSeries{})
|
||||
if len(output) != len(expectedOutput) {
|
||||
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
|
||||
}
|
||||
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
|
||||
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
|
||||
}
|
||||
if len(filter.vmInstance) != len(expectedInstanceMap) {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
|
||||
}
|
||||
for k := range expectedInstanceMap {
|
||||
if filter.vmInstance[k] == nil {
|
||||
t.Fatalf("missing instance in filter.vmInstance: %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
f([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
}, map[string]int64{
|
||||
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
|
||||
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
func TestMdxFilterByLabel(t *testing.T) {
|
||||
filter := Filter{
|
||||
vmInstance: make(map[string]*atomic.Int64),
|
||||
filterByLabel: true,
|
||||
}
|
||||
*keepMetricsWithLabelName = "service"
|
||||
*keepMetricsWithLabelValue = "victoriametrics"
|
||||
|
||||
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries) {
|
||||
t.Helper()
|
||||
output := filter.Filter(input, []prompb.TimeSeries{})
|
||||
if len(output) != len(expectedOutput) {
|
||||
t.Fatalf("unexpected output length; got %d; want %d", len(output), len(expectedOutput))
|
||||
}
|
||||
if timeSeriessToString(output) != timeSeriessToString(expectedOutput) {
|
||||
t.Fatalf("unexpected output; got %s; want %s", timeSeriessToString(output), timeSeriessToString(expectedOutput))
|
||||
}
|
||||
}
|
||||
f([]prompb.TimeSeries{{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
})
|
||||
*keepMetricsWithLabelName = ""
|
||||
*keepMetricsWithLabelValue = ""
|
||||
}
|
||||
|
||||
func TestMdxInstanceCleanup(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
_ = mdxInstanceEntryTTL.Set("10s")
|
||||
InitGlobalFilter()
|
||||
|
||||
// init instance list
|
||||
GlobalFilter.Filter([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}}, []prompb.TimeSeries{},
|
||||
)
|
||||
f := func(expectedInstanceMap map[string]int64) {
|
||||
t.Helper()
|
||||
if len(GlobalFilter.vmInstance) != len(expectedInstanceMap) {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", len(GlobalFilter.vmInstance), len(expectedInstanceMap))
|
||||
}
|
||||
for k := range expectedInstanceMap {
|
||||
if GlobalFilter.vmInstance[k] == nil {
|
||||
t.Fatalf("missing instance in filter.vmInstance: %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
f(map[string]int64{
|
||||
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
|
||||
fmt.Sprintf("%q:%q", "test", "vmagent1:8429"): 0,
|
||||
})
|
||||
|
||||
// receive samples from victoria-metrics1:8428 after 9 seconds.
|
||||
time.Sleep(9 * time.Second)
|
||||
GlobalFilter.Filter([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}}, []prompb.TimeSeries{},
|
||||
)
|
||||
|
||||
// no samples from vmagent1:8429 in the last 10 seconds, so it should be removed from the mdx instance list.
|
||||
time.Sleep(9 * time.Second)
|
||||
f(map[string]int64{
|
||||
fmt.Sprintf("%q:%q", "test", "victoria-metrics1:8428"): 0,
|
||||
})
|
||||
GlobalFilter.MustStop()
|
||||
})
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user