Compare commits

...

5 Commits

Author SHA1 Message Date
Jayice
f8d99d9289 polish documentation 2026-04-27 18:02:41 +08:00
Jayice
333a015be5 update CHANGELOG.md 2026-04-27 15:15:59 +08:00
JAYICE
b6196524ba Merge branch 'master' into issue-10600
Signed-off-by: JAYICE <1185430411@qq.com>
2026-04-27 15:13:02 +08:00
Jayice
e9f1bb911c add documentation 2026-04-27 15:10:17 +08:00
Jayice
12b79143dc implement mdx for remote write 2026-04-21 15:48:46 +08:00
4 changed files with 205 additions and 18 deletions

View File

@@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/metrics"
@@ -102,6 +103,9 @@ var (
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
"By default, metadata sending is controlled by the global -enableMetadata flag")
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
)
var (
@@ -294,6 +298,10 @@ func initRemoteWriteCtxs(urls []string) {
rwctxConsistentHashGlobal = consistenthash.NewConsistentHash(consistentHashNodes, 0)
}
if slices.Contains(*enableMdx, true) {
mdx.InitGlobalVmInstanceFilter()
}
rwctxsGlobal = rwctxs
rwctxsGlobalIdx = rwctxIdx
}
@@ -356,6 +364,7 @@ func Stop() {
if sl := dailySeriesLimiter; sl != nil {
sl.MustStop()
}
mdx.GlobalVmInstanceFilter.MustStop()
}
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
@@ -837,11 +846,14 @@ type remoteWriteCtx struct {
// otherwise by the global -enableMetadata flag.
enableMetadata bool
enableMdx bool
pss []*pendingSeries
pssNextIdx atomic.Uint64
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
rowsDroppedByMdx *metrics.Counter
pushFailures *metrics.Counter
metadataDroppedOnPushFailure *metrics.Counter
@@ -928,16 +940,17 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
for i := range pss {
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
}
rwctx := &remoteWriteCtx{
idx: argIdx,
fq: fq,
c: c,
pss: pss,
enableMetadata: isMetadataEnabledForURL(argIdx),
enableMdx: enableMdx.GetOptionalArg(argIdx),
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
rowsDroppedByMdx: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
metadataDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_metadata_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
@@ -973,6 +986,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsPushedAfterRelabel = nil
rwctx.rowsDroppedByRelabel = nil
rwctx.rowsDroppedByMdx = nil
}
// TryPushTimeSeries sends tss series to the configured remote write endpoint
@@ -990,17 +1004,31 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
putRelabelCtx(rctx)
}()
if rwctx.enableMdx && mdx.GlobalVmInstanceFilter != nil {
rctx = getRelabelCtx()
// Make a copy of tss
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
rowsCountBeforeMdx := getRowsCount(tss)
tss = mdx.GlobalVmInstanceFilter.ApplyMdxFilter(tss)
rowsCountAfterMdx := getRowsCount(tss)
rwctx.rowsDroppedByMdx.Add(rowsCountBeforeMdx - rowsCountAfterMdx)
}
// Apply relabeling
rcs := allRelabelConfigs.Load()
pcs := rcs.perURL[rwctx.idx]
if pcs.Len() > 0 {
rctx = getRelabelCtx()
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
if rctx == nil {
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
}
rowsCountBeforeRelabel := getRowsCount(tss)
tss = rctx.applyRelabeling(tss, pcs)
rowsCountAfterRelabel := getRowsCount(tss)

View File

@@ -25,6 +25,7 @@ The sandbox cluster installation runs under the constant load generated by
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
## tip
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce `-remoteWrite.mdx.enable` and `-mdx.instanceEntryTTL` command-line flags to support mdx service in `vmagent`, it allows `vmagent` to send only metrics from VictoriaMetrics services to the corresponding `-remoteWrite.url`. See [monitoring-data-exchange](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange) and [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600) for detail.
## [v1.141.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.141.0)

View File

@@ -268,6 +268,29 @@ for the collected samples. Examples:
```sh
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
```
### Monitoring Data eXchange
The MDX (Monitoring Data eXchange) is a monitoring of monitoring metrics collection and sharing service.
It aims to send only metrics from the VictoriaMetrics services to the corresponding `-remoteWrite.url`, discarding metrics from non-VictoriaMetrics services.
When enabling MDX for the `-remoteWrite.url`, `vmagent` will only forward metrics from the instances that emit `vm_app_version`, which is a metric that all VictoriaMetrics services will emit.
The number of dropped rows from non-VictoriaMetrics services is exposed as `vmagent_remotewrite_mdx_rows_dropped_total`.
`vmagent` will maintain the information for the discovered VictoriaMetrics instances and expose the number of these instances via `vmagent_mdx_tracked_vm_instances` metric.
To prevent permanently offline VictoriaMetrics instances from continuously consuming `vmagent`'s memory, you also need to set `-mdx.instanceEntryTTL` to indicate that if no metrics are received from a VictoriaMetrics instance for the configured period,
then the information of this instance should be cleaned up from the memory. The value should be several times the scrape interval to prevent instances from being mistakenly cleaned up due to occasional network latency.
To enable MDX, set `-remoteWrite.mdx.enable` for the target URL and `-mdx.instanceEntryTTL`:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=false \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true \
-mdx.instanceEntryTTL=60s
```
### Life of a sample
@@ -285,18 +308,20 @@ flowchart TB
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
%% Left branch
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
%% Right branch
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
```
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:

133
lib/mdx/filter.go Normal file
View File

@@ -0,0 +1,133 @@
package mdx
import (
"fmt"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/metrics"
)
var (
mdxInstanceEntryTTL = flagutil.NewExtendedDuration("mdx.instanceEntryTTL", "0", "After not receiving metrics for the VictoriaMetrics instance for the configured time, remove this instance from the MDX instance list."+
"It should be several times the scrape interval for VictoriaMetrics instances. The cleanup mechanism helps release memory after a VictoriaMetrics instance is permanently taken offline, preventing the MDX instance list from growing indefinitely."+
"It must be explicitly set when -remoteWrite.mdx.enable is set and requires explicit unit suffixes (s, m, h, d, w, y). Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
)
type VmInstanceFilter struct {
mu sync.RWMutex
wg sync.WaitGroup
stopCh chan struct{}
vmInstance map[string]*atomic.Int64
mdxTrackedVmInstances *metrics.Gauge
}
var GlobalVmInstanceFilter *VmInstanceFilter
func InitGlobalVmInstanceFilter() {
if mdxInstanceEntryTTL.Milliseconds() == 0 {
logger.Panicf("-mdx.instanceEntryTTL must be explicitly set when -remoteWrite.mdx.enable is set.")
}
GlobalVmInstanceFilter = &VmInstanceFilter{
vmInstance: make(map[string]*atomic.Int64),
stopCh: make(chan struct{}),
}
GlobalVmInstanceFilter.mdxTrackedVmInstances = metrics.NewGauge("vmagent_mdx_tracked_vm_instances", func() float64 {
GlobalVmInstanceFilter.mu.RLock()
n := len(GlobalVmInstanceFilter.vmInstance)
GlobalVmInstanceFilter.mu.RUnlock()
return float64(n)
})
GlobalVmInstanceFilter.wg.Go(GlobalVmInstanceFilter.cleanStale)
}
func (filter *VmInstanceFilter) cleanStale() {
if mdxInstanceEntryTTL.Milliseconds() == 0 {
return
}
ticker := time.NewTicker(5 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
filter.mu.Lock()
currTs := time.Now().Unix()
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
for k, v := range filter.vmInstance {
if currTs-v.Load() < mdxInstanceEntryTTL.Duration().Milliseconds()/1000 {
dst[k] = v
}
}
if len(dst) != len(filter.vmInstance) {
filter.vmInstance = dst
}
filter.mu.Unlock()
case <-filter.stopCh:
return
}
}
}
func (filter *VmInstanceFilter) MustStop() {
if filter == nil {
return
}
close(filter.stopCh)
filter.wg.Wait()
}
func (filter *VmInstanceFilter) ApplyMdxFilter(tss []prompb.TimeSeries) []prompb.TimeSeries {
tssDst := tss[:0]
for _, ts := range tss {
isVmInstance := false
var instance string
var job string
for _, label := range ts.Labels {
if label.Name == "__name__" && label.Value == "vm_app_version" {
isVmInstance = true
}
if label.Name == "instance" {
instance = label.Value
}
if label.Name == "job" {
job = label.Value
}
}
identicalKey := fmt.Sprintf("%s:%s", job, instance)
currTs := time.Now().Unix()
//fast path
filter.mu.RLock()
ptr, ok := filter.vmInstance[identicalKey]
filter.mu.RUnlock()
if ok {
ptr.Store(currTs)
tssDst = append(tssDst, ts)
continue
}
if !isVmInstance {
continue
}
// slow path
tssDst = append(tssDst, ts)
filter.mu.Lock()
if ptr, ok = filter.vmInstance[identicalKey]; ok {
ptr.Store(currTs)
} else {
v := atomic.Int64{}
v.Store(currTs)
filter.vmInstance[identicalKey] = &v
}
filter.mu.Unlock()
}
return tssDst
}