mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-18 00:03:47 +03:00
Compare commits
26 Commits
vmestimato
...
issue-1060
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
7744c6fcee | ||
|
|
c193e8cdc5 | ||
|
|
c67adbd9c9 | ||
|
|
bcd1dc8c8b | ||
|
|
f3326bf082 | ||
|
|
f3f7602482 | ||
|
|
7d558511d0 | ||
|
|
a2bb3f70a5 | ||
|
|
2c496f4a38 | ||
|
|
ef0ae0fb9a | ||
|
|
4cbedc6b1b | ||
|
|
4f5cd15163 | ||
|
|
5b161ce283 | ||
|
|
43773e1d5c | ||
|
|
517c17b744 | ||
|
|
1c2622d8ae | ||
|
|
9a836dac59 | ||
|
|
799ecb0a08 | ||
|
|
c88dc19052 | ||
|
|
919049f9e2 | ||
|
|
24efe47c6a | ||
|
|
f8d99d9289 | ||
|
|
333a015be5 | ||
|
|
b6196524ba | ||
|
|
e9f1bb911c | ||
|
|
12b79143dc |
@@ -12,6 +12,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@@ -103,6 +104,9 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
|
||||
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -304,6 +308,10 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
}
|
||||
fs.RegisterPathFsMetrics(*tmpDataPath)
|
||||
|
||||
if slices.Contains(*enableMdx, true) && *shardByURL {
|
||||
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
|
||||
}
|
||||
|
||||
if *shardByURL {
|
||||
consistentHashNodes := make([]string, 0, len(urls))
|
||||
for i, url := range urls {
|
||||
@@ -859,6 +867,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
sas atomic.Pointer[streamaggr.Aggregators]
|
||||
deduplicator *streamaggr.Deduplicator
|
||||
mdxFilter *mdx.Filter
|
||||
|
||||
streamAggrKeepInput bool
|
||||
streamAggrDropInput bool
|
||||
@@ -873,6 +882,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
mdxRowsPreserved *metrics.Counter
|
||||
|
||||
pushFailures *metrics.Counter
|
||||
metadataDroppedOnPushFailure *metrics.Counter
|
||||
@@ -959,7 +969,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
for i := range pss {
|
||||
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
|
||||
}
|
||||
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: argIdx,
|
||||
fq: fq,
|
||||
@@ -976,6 +985,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
}
|
||||
rwctx.initStreamAggrConfig()
|
||||
|
||||
if enableMdx.GetOptionalArg(argIdx) {
|
||||
mdxFilter := mdx.NewFilter()
|
||||
rwctx.mdxFilter = mdxFilter
|
||||
rwctx.mdxRowsPreserved = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_mdx_tracked_vm_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(mdxFilter.VmInstancesCount())
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return rwctx
|
||||
}
|
||||
|
||||
@@ -989,6 +1008,11 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
}
|
||||
if rwctx.mdxFilter != nil {
|
||||
rwctx.mdxFilter.MustStop()
|
||||
rwctx.mdxFilter = nil
|
||||
rwctx.mdxRowsPreserved = nil
|
||||
}
|
||||
|
||||
for _, ps := range rwctx.pss {
|
||||
ps.MustStop()
|
||||
@@ -1004,6 +1028,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
|
||||
}
|
||||
|
||||
// TryPushTimeSeries sends tss series to the configured remote write endpoint
|
||||
@@ -1021,6 +1046,20 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
putRelabelCtx(rctx)
|
||||
}()
|
||||
|
||||
if rwctx.mdxFilter != nil {
|
||||
tssResP := tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tssRes := rwctx.mdxFilter.Filter(tss, *tssResP)
|
||||
defer func() {
|
||||
*tssResP = prompb.ResetTimeSeries(tssRes)
|
||||
tssPool.Put(tssResP)
|
||||
}()
|
||||
|
||||
if len(tssRes) == 0 {
|
||||
return true
|
||||
}
|
||||
tss = tssRes
|
||||
}
|
||||
|
||||
// Apply relabeling
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
|
||||
@@ -30,6 +30,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add support for [Monitoring Data eXchange (MDX)](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange): the ability to route only metrics from VictoriaMetrics services to a specific `-remoteWrite.url`. MDX is useful for building monitoring-of-monitoring where one remote storage should receive the full metric stream and another should receive only VictoriaMetrics metrics. Enable per destination with `-remoteWrite.mdx.enable=true`. See [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600).
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
|
||||
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/),[vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/),[vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): fix rare unbounded shutdown delay when config reload takes longer than `-configCheckInterval`. See [#11107](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11107). Thanks to @PleasingFungus for contribution.
|
||||
@@ -38,6 +39,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
|
||||
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
Released at 2026-06-08
|
||||
|
||||
@@ -268,6 +268,39 @@ for the collected samples. Examples:
|
||||
```sh
|
||||
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
|
||||
```
|
||||
|
||||
### Monitoring Data eXchange
|
||||
|
||||
The Monitoring Data eXchange (MDX) feature allows `vmagent` to forward only VictoriaMetrics metrics to selected `-remoteWrite.url` destinations while dropping metrics from non-VictoriaMetrics services.
|
||||
|
||||
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=false \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true
|
||||
```
|
||||
When MDX is enabled for a `-remoteWrite.url`, `vmagent` forwards only metrics that:
|
||||
- come from the target that exposes the `vm_app_version` metric (emitted by all VictoriaMetrics components)
|
||||
- contain the `victoriametrics_app=true` label, which will be added automatically to the metrics if the instance was deployed via [VictoriaMetrics Operator](https://docs.victoriametrics.com/operator/).
|
||||
|
||||
`victoriametrics_app=true` label will be added to all metrics that are preserved by MDX if it's absent.
|
||||
|
||||
- contain the label specified via `-mdx.label`.
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true \
|
||||
-mdx.label="service=victoriametrics"
|
||||
```
|
||||
In this configuration, metrics with the label `service=victoriametrics` are preserved even if their scrape targets do not expose `vm_app_version` metric.
|
||||
|
||||
The number of VictoriaMetrics metrics preserved by MDX is exposed as `vmagent_remotewrite_mdx_rows_preserved_total`.
|
||||
|
||||
The scope of MDX is at the per-url level, so it works after global level mechanisms, such as stream aggregation, relabeling, complexity limiter, and cardinality limiter. See [Life of a sample](https://docs.victoriametrics.com/victoriametrics/vmagent/#life-of-a-sample).
|
||||
|
||||
### Life of a sample
|
||||
|
||||
@@ -285,18 +318,20 @@ flowchart TB
|
||||
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
|
||||
|
||||
%% Left branch
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
|
||||
|
||||
%% Right branch
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
|
||||
```
|
||||
|
||||
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:
|
||||
|
||||
183
lib/mdx/filter.go
Normal file
183
lib/mdx/filter.go
Normal file
@@ -0,0 +1,183 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
var (
|
||||
vmLabel = flag.String("mdx.label", "", "Optional label in the form 'name=value' used to identify VictoriaMetrics metrics for MDX. Metrics containing the specified label are forwarded to `-remoteWrite.url` endpoints configured with `-remoteWrite.mdx.enable=true`.")
|
||||
|
||||
vmAppLabelName = "victoriametrics_app"
|
||||
)
|
||||
|
||||
// Filter manages the list of VictoriaMetrics instances discovered from previous data flow, and uses it to filter out metrics that are not from VictoriaMetrics instances.
|
||||
type Filter struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
vmInstance map[string]*atomic.Int64
|
||||
filterByLabelName string
|
||||
filterByLabelValue string
|
||||
}
|
||||
|
||||
func NewFilter() *Filter {
|
||||
filter := &Filter{
|
||||
vmInstance: make(map[string]*atomic.Int64),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
|
||||
if len(*vmLabel) != 0 {
|
||||
n := strings.IndexByte(*vmLabel, '=')
|
||||
if n < 0 {
|
||||
logger.Fatalf("missing '=' in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
|
||||
}
|
||||
filter.filterByLabelName = (*vmLabel)[:n]
|
||||
filter.filterByLabelValue = (*vmLabel)[n+1:]
|
||||
}
|
||||
|
||||
filter.wg.Go(filter.cleanStale)
|
||||
return filter
|
||||
}
|
||||
|
||||
func (filter *Filter) VmInstancesCount() int {
|
||||
if filter == nil {
|
||||
return 0
|
||||
}
|
||||
filter.mu.RLock()
|
||||
defer filter.mu.RUnlock()
|
||||
return len(filter.vmInstance)
|
||||
|
||||
}
|
||||
|
||||
func (filter *Filter) cleanStale() {
|
||||
entryTTL := time.Hour * 1
|
||||
ttlSec := int64(entryTTL.Seconds())
|
||||
ticker := time.NewTicker(time.Minute)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
filter.mu.Lock()
|
||||
currTs := time.Now().Unix()
|
||||
|
||||
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
|
||||
for k, v := range filter.vmInstance {
|
||||
if currTs-v.Load() < ttlSec {
|
||||
dst[k] = v
|
||||
}
|
||||
}
|
||||
if len(dst) != len(filter.vmInstance) {
|
||||
filter.vmInstance = dst
|
||||
}
|
||||
filter.mu.Unlock()
|
||||
case <-filter.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (filter *Filter) MustStop() {
|
||||
if filter == nil {
|
||||
return
|
||||
}
|
||||
close(filter.stopCh)
|
||||
filter.wg.Wait()
|
||||
}
|
||||
|
||||
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
currTs := time.Now().Unix()
|
||||
var identicalKey []byte
|
||||
|
||||
maybeAddVmAppLabel := func(idx int, labels []prompb.Label) []prompb.Label {
|
||||
for j := idx + 1; j < len(labels); j++ {
|
||||
if labels[j].Name == vmAppLabelName && labels[j].Value == "true" {
|
||||
return labels
|
||||
}
|
||||
}
|
||||
|
||||
copyLabels := make([]prompb.Label, len(labels)+1)
|
||||
copy(copyLabels, labels)
|
||||
|
||||
copyLabels[len(copyLabels)-1] = prompb.Label{Name: vmAppLabelName, Value: "true"}
|
||||
return copyLabels
|
||||
}
|
||||
|
||||
nextTss:
|
||||
for _, ts := range tss {
|
||||
var hasVersionLabel, triedJobInstance bool
|
||||
var job, instance string
|
||||
for i, label := range ts.Labels {
|
||||
if label.Name == vmAppLabelName && label.Value == "true" {
|
||||
resTss = append(resTss, ts)
|
||||
continue nextTss
|
||||
}
|
||||
if filter.filterByLabelName != "" && label.Name == filter.filterByLabelName && label.Value == filter.filterByLabelValue {
|
||||
ts.Labels = maybeAddVmAppLabel(i, ts.Labels)
|
||||
resTss = append(resTss, ts)
|
||||
continue nextTss
|
||||
}
|
||||
|
||||
if label.Name == "__name__" && label.Value == "vm_app_version" {
|
||||
hasVersionLabel = true
|
||||
}
|
||||
if instance == "" && label.Name == "instance" {
|
||||
if label.Value == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
instance = label.Value
|
||||
}
|
||||
if job == "" && label.Name == "job" {
|
||||
if label.Value == "" {
|
||||
continue
|
||||
}
|
||||
|
||||
job = label.Value
|
||||
}
|
||||
if !triedJobInstance && job != "" && instance != "" {
|
||||
identicalKey = identicalKey[:0]
|
||||
identicalKey = strconv.AppendQuote(identicalKey, job)
|
||||
identicalKey = append(identicalKey, ':')
|
||||
identicalKey = strconv.AppendQuote(identicalKey, instance)
|
||||
filter.mu.RLock()
|
||||
ptr, found := filter.vmInstance[bytesutil.ToUnsafeString(identicalKey)]
|
||||
filter.mu.RUnlock()
|
||||
if found {
|
||||
ptr.Store(currTs)
|
||||
ts.Labels = maybeAddVmAppLabel(i, ts.Labels)
|
||||
resTss = append(resTss, ts)
|
||||
continue nextTss
|
||||
}
|
||||
triedJobInstance = true
|
||||
}
|
||||
|
||||
if hasVersionLabel && job != "" && instance != "" {
|
||||
identicalKey = identicalKey[:0]
|
||||
identicalKey = strconv.AppendQuote(identicalKey, job)
|
||||
identicalKey = append(identicalKey, ':')
|
||||
identicalKey = strconv.AppendQuote(identicalKey, instance)
|
||||
|
||||
v := &atomic.Int64{}
|
||||
v.Store(currTs)
|
||||
|
||||
filter.mu.Lock()
|
||||
filter.vmInstance[string(identicalKey)] = v
|
||||
filter.mu.Unlock()
|
||||
ts.Labels = maybeAddVmAppLabel(i, ts.Labels)
|
||||
resTss = append(resTss, ts)
|
||||
continue nextTss
|
||||
}
|
||||
}
|
||||
}
|
||||
return resTss
|
||||
}
|
||||
356
lib/mdx/filter_test.go
Normal file
356
lib/mdx/filter_test.go
Normal file
@@ -0,0 +1,356 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sort"
|
||||
"strings"
|
||||
"testing"
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
|
||||
)
|
||||
|
||||
func timeSeriessToString(tss []prompb.TimeSeries) string {
|
||||
a := make([]string, len(tss))
|
||||
for i, ts := range tss {
|
||||
a[i] = timeSeriesToString(ts)
|
||||
}
|
||||
sort.Strings(a)
|
||||
return strings.Join(a, "")
|
||||
}
|
||||
|
||||
func timeSeriesToString(ts prompb.TimeSeries) string {
|
||||
labelsString := promrelabel.LabelsToString(ts.Labels)
|
||||
|
||||
return fmt.Sprintf("%s\n", labelsString)
|
||||
}
|
||||
|
||||
func TestMdxInstanceFilter(t *testing.T) {
|
||||
originalVmLabel := *vmLabel
|
||||
*vmLabel = "service=victoriametrics"
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries, expectedInstanceMap map[string]int64) {
|
||||
t.Helper()
|
||||
output := filter.Filter(input, nil)
|
||||
outputString := timeSeriessToString(output)
|
||||
expectedOutputString := timeSeriessToString(expectedOutput)
|
||||
if outputString != expectedOutputString {
|
||||
t.Fatalf("unexpected output; got %s; want %s", outputString, expectedOutputString)
|
||||
}
|
||||
if filter.VmInstancesCount() != len(expectedInstanceMap) {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
|
||||
}
|
||||
for k := range expectedInstanceMap {
|
||||
if filter.vmInstance[k] == nil {
|
||||
t.Fatalf("missing instance in filter.vmInstance: %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
// the first call
|
||||
f([]prompb.TimeSeries{
|
||||
// 1. metrics with vm_app_version and different order of labels.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics3:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
},
|
||||
},
|
||||
// 2.
|
||||
// metrics without vm_app_version but with service=victoriametrics that is specified in `-vm.label`.
|
||||
// it will be preserved, but won't be registered in instance map in MDX
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
|
||||
// 3. metrics with vm_app_version and service=victoriametrics should be preserved.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
},
|
||||
},
|
||||
// 4. metrics without vm_app_version and `service=victoriametrics` but with `victoriametrics_app=true`, which should be preserved.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics6:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
|
||||
// 5. metrics without vm_app_version and service=victoriametrics and `victoriametrics_app=true`, which should be filtered out.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
|
||||
// 6. metrics with vm_app_version but job or instance is empty (or missing), they should be dropped.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: ""},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent2:8429"},
|
||||
{Name: "job", Value: ""},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent2:8429"},
|
||||
},
|
||||
},
|
||||
},
|
||||
// `victoriametrics_app=true` should be added to all preserved metrics if absent.
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics3:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics6:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
},
|
||||
// only instances that are discovered via `vm_app_version` will be registered in instance map in MDX.
|
||||
map[string]int64{
|
||||
"\"test\":\"victoria-metrics1:8428\"": 0,
|
||||
"\"test\":\"victoria-metrics2:8428\"": 0,
|
||||
"\"test\":\"victoria-metrics3:8428\"": 0,
|
||||
})
|
||||
|
||||
// the second call
|
||||
f([]prompb.TimeSeries{
|
||||
// 1. metrics without vm_app_version, but the instances were already registered in the previous call, so it will be preserved.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_rows_inserted_total"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
// 2. metrics without vm_app_version, `service=victoriametrics` and `victoriametrics_app=true`, and the instance wasn't already registered in the previous call, so it will be dropped.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics7:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
// 3. metrics with service=victoriametrics.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_rows_inserted_total"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
},
|
||||
// only instances that are discovered via `vm_app_version` will be registered in instance map in MDX.
|
||||
map[string]int64{
|
||||
"\"test\":\"victoria-metrics1:8428\"": 0,
|
||||
"\"test\":\"victoria-metrics2:8428\"": 0,
|
||||
"\"test\":\"victoria-metrics3:8428\"": 0,
|
||||
})
|
||||
|
||||
*vmLabel = originalVmLabel
|
||||
}
|
||||
|
||||
func TestMdxInstanceCleanup(t *testing.T) {
|
||||
t.Helper()
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
filter.Filter([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}}, []prompb.TimeSeries{},
|
||||
)
|
||||
f := func(expectedInstanceMap map[string]int64) {
|
||||
t.Helper()
|
||||
if filter.VmInstancesCount() != len(expectedInstanceMap) {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
|
||||
}
|
||||
for k := range expectedInstanceMap {
|
||||
if filter.vmInstance[k] == nil {
|
||||
t.Fatalf("missing instance in filter.vmInstance: %q", k)
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
time.Sleep(59 * time.Minute)
|
||||
// the entries should not be cleaned.
|
||||
f(map[string]int64{
|
||||
"\"test\":\"victoria-metrics1:8428\"": 0,
|
||||
"\"test\":\"vmagent1:8429\"": 0,
|
||||
})
|
||||
|
||||
// receive samples from victoria-metrics1:8428 after 59 minutes.
|
||||
// so the entry will be refreshed.
|
||||
filter.Filter([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
}}, []prompb.TimeSeries{},
|
||||
)
|
||||
|
||||
time.Sleep(2 * time.Minute)
|
||||
|
||||
// no samples from vmagent1:8429 in the last hour, so it should be removed from the mdx instance list.
|
||||
f(map[string]int64{
|
||||
"\"test\":\"victoria-metrics1:8428\"": 0,
|
||||
})
|
||||
})
|
||||
|
||||
}
|
||||
Reference in New Issue
Block a user