mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-19 17:56:32 +03:00
Compare commits
5 Commits
master
...
issue-1060
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
f8d99d9289 | ||
|
|
333a015be5 | ||
|
|
b6196524ba | ||
|
|
e9f1bb911c | ||
|
|
12b79143dc |
@@ -12,6 +12,7 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
@@ -102,6 +103,9 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
|
||||
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -294,6 +298,10 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
rwctxConsistentHashGlobal = consistenthash.NewConsistentHash(consistentHashNodes, 0)
|
||||
}
|
||||
|
||||
if slices.Contains(*enableMdx, true) {
|
||||
mdx.InitGlobalVmInstanceFilter()
|
||||
}
|
||||
|
||||
rwctxsGlobal = rwctxs
|
||||
rwctxsGlobalIdx = rwctxIdx
|
||||
}
|
||||
@@ -356,6 +364,7 @@ func Stop() {
|
||||
if sl := dailySeriesLimiter; sl != nil {
|
||||
sl.MustStop()
|
||||
}
|
||||
mdx.GlobalVmInstanceFilter.MustStop()
|
||||
}
|
||||
|
||||
// PushDropSamplesOnFailure pushes wr to the configured remote storage systems set via -remoteWrite.url
|
||||
@@ -837,11 +846,14 @@ type remoteWriteCtx struct {
|
||||
// otherwise by the global -enableMetadata flag.
|
||||
enableMetadata bool
|
||||
|
||||
enableMdx bool
|
||||
|
||||
pss []*pendingSeries
|
||||
pssNextIdx atomic.Uint64
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
rowsDroppedByMdx *metrics.Counter
|
||||
|
||||
pushFailures *metrics.Counter
|
||||
metadataDroppedOnPushFailure *metrics.Counter
|
||||
@@ -928,16 +940,17 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
for i := range pss {
|
||||
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
|
||||
}
|
||||
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: argIdx,
|
||||
fq: fq,
|
||||
c: c,
|
||||
pss: pss,
|
||||
enableMetadata: isMetadataEnabledForURL(argIdx),
|
||||
enableMdx: enableMdx.GetOptionalArg(argIdx),
|
||||
|
||||
rowsPushedAfterRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_rows_pushed_after_relabel_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
rowsDroppedByRelabel: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_relabel_metrics_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
rowsDroppedByMdx: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
|
||||
pushFailures: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_push_failures_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
metadataDroppedOnPushFailure: metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_metadata_dropped_total{path=%q,url=%q}`, queuePath, sanitizedURL)),
|
||||
@@ -973,6 +986,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
rwctx.rowsDroppedByMdx = nil
|
||||
}
|
||||
|
||||
// TryPushTimeSeries sends tss series to the configured remote write endpoint
|
||||
@@ -990,17 +1004,31 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
putRelabelCtx(rctx)
|
||||
}()
|
||||
|
||||
if rwctx.enableMdx && mdx.GlobalVmInstanceFilter != nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
rowsCountBeforeMdx := getRowsCount(tss)
|
||||
tss = mdx.GlobalVmInstanceFilter.ApplyMdxFilter(tss)
|
||||
rowsCountAfterMdx := getRowsCount(tss)
|
||||
rwctx.rowsDroppedByMdx.Add(rowsCountBeforeMdx - rowsCountAfterMdx)
|
||||
}
|
||||
|
||||
// Apply relabeling
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
if pcs.Len() > 0 {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
if rctx == nil {
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
}
|
||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||
tss = rctx.applyRelabeling(tss, pcs)
|
||||
rowsCountAfterRelabel := getRowsCount(tss)
|
||||
|
||||
@@ -25,6 +25,7 @@ The sandbox cluster installation runs under the constant load generated by
|
||||
See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
## tip
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): introduce `-remoteWrite.mdx.enable` and `-mdx.instanceEntryTTL` command-line flags to support mdx service in `vmagent`, it allows `vmagent` to send only metrics from VictoriaMetrics services to the corresponding `-remoteWrite.url`. See [monitoring-data-exchange](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange) and [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600) for detail.
|
||||
|
||||
## [v1.141.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.141.0)
|
||||
|
||||
|
||||
@@ -268,6 +268,29 @@ for the collected samples. Examples:
|
||||
```sh
|
||||
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
|
||||
```
|
||||
|
||||
### Monitoring Data eXchange
|
||||
|
||||
The MDX (Monitoring Data eXchange) is a monitoring of monitoring metrics collection and sharing service.
|
||||
It aims to send only metrics from the VictoriaMetrics services to the corresponding `-remoteWrite.url`, discarding metrics from non-VictoriaMetrics services.
|
||||
|
||||
When enabling MDX for the `-remoteWrite.url`, `vmagent` will only forward metrics from the instances that emit `vm_app_version`, which is a metric that all VictoriaMetrics services will emit.
|
||||
The number of dropped rows from non-VictoriaMetrics services is exposed as `vmagent_remotewrite_mdx_rows_dropped_total`.
|
||||
|
||||
`vmagent` will maintain the information for the discovered VictoriaMetrics instances and expose the number of these instances via `vmagent_mdx_tracked_vm_instances` metric.
|
||||
To prevent permanently offline VictoriaMetrics instances from continuously consuming `vmagent`'s memory, you also need to set `-mdx.instanceEntryTTL` to indicate that if no metrics are received from a VictoriaMetrics instance for the configured period,
|
||||
then the information of this instance should be cleaned up from the memory. The value should be several times the scrape interval to prevent instances from being mistakenly cleaned up due to occasional network latency.
|
||||
|
||||
To enable MDX, set `-remoteWrite.mdx.enable` for the target URL and `-mdx.instanceEntryTTL`:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=false \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true \
|
||||
-mdx.instanceEntryTTL=60s
|
||||
```
|
||||
|
||||
### Life of a sample
|
||||
|
||||
@@ -285,18 +308,20 @@ flowchart TB
|
||||
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
|
||||
|
||||
%% Left branch
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
|
||||
|
||||
%% Right branch
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
|
||||
```
|
||||
|
||||
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:
|
||||
|
||||
133
lib/mdx/filter.go
Normal file
133
lib/mdx/filter.go
Normal file
@@ -0,0 +1,133 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
)
|
||||
|
||||
var (
|
||||
mdxInstanceEntryTTL = flagutil.NewExtendedDuration("mdx.instanceEntryTTL", "0", "After not receiving metrics for the VictoriaMetrics instance for the configured time, remove this instance from the MDX instance list."+
|
||||
"It should be several times the scrape interval for VictoriaMetrics instances. The cleanup mechanism helps release memory after a VictoriaMetrics instance is permanently taken offline, preventing the MDX instance list from growing indefinitely."+
|
||||
"It must be explicitly set when -remoteWrite.mdx.enable is set and requires explicit unit suffixes (s, m, h, d, w, y). Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
type VmInstanceFilter struct {
|
||||
mu sync.RWMutex
|
||||
wg sync.WaitGroup
|
||||
stopCh chan struct{}
|
||||
vmInstance map[string]*atomic.Int64
|
||||
mdxTrackedVmInstances *metrics.Gauge
|
||||
}
|
||||
|
||||
var GlobalVmInstanceFilter *VmInstanceFilter
|
||||
|
||||
func InitGlobalVmInstanceFilter() {
|
||||
if mdxInstanceEntryTTL.Milliseconds() == 0 {
|
||||
logger.Panicf("-mdx.instanceEntryTTL must be explicitly set when -remoteWrite.mdx.enable is set.")
|
||||
}
|
||||
GlobalVmInstanceFilter = &VmInstanceFilter{
|
||||
vmInstance: make(map[string]*atomic.Int64),
|
||||
stopCh: make(chan struct{}),
|
||||
}
|
||||
GlobalVmInstanceFilter.mdxTrackedVmInstances = metrics.NewGauge("vmagent_mdx_tracked_vm_instances", func() float64 {
|
||||
GlobalVmInstanceFilter.mu.RLock()
|
||||
n := len(GlobalVmInstanceFilter.vmInstance)
|
||||
GlobalVmInstanceFilter.mu.RUnlock()
|
||||
return float64(n)
|
||||
})
|
||||
|
||||
GlobalVmInstanceFilter.wg.Go(GlobalVmInstanceFilter.cleanStale)
|
||||
}
|
||||
|
||||
func (filter *VmInstanceFilter) cleanStale() {
|
||||
if mdxInstanceEntryTTL.Milliseconds() == 0 {
|
||||
return
|
||||
}
|
||||
ticker := time.NewTicker(5 * time.Second)
|
||||
defer ticker.Stop()
|
||||
|
||||
for {
|
||||
select {
|
||||
case <-ticker.C:
|
||||
filter.mu.Lock()
|
||||
currTs := time.Now().Unix()
|
||||
|
||||
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
|
||||
for k, v := range filter.vmInstance {
|
||||
if currTs-v.Load() < mdxInstanceEntryTTL.Duration().Milliseconds()/1000 {
|
||||
dst[k] = v
|
||||
}
|
||||
}
|
||||
if len(dst) != len(filter.vmInstance) {
|
||||
filter.vmInstance = dst
|
||||
}
|
||||
filter.mu.Unlock()
|
||||
case <-filter.stopCh:
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (filter *VmInstanceFilter) MustStop() {
|
||||
if filter == nil {
|
||||
return
|
||||
}
|
||||
close(filter.stopCh)
|
||||
filter.wg.Wait()
|
||||
}
|
||||
|
||||
func (filter *VmInstanceFilter) ApplyMdxFilter(tss []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
tssDst := tss[:0]
|
||||
for _, ts := range tss {
|
||||
isVmInstance := false
|
||||
var instance string
|
||||
var job string
|
||||
for _, label := range ts.Labels {
|
||||
if label.Name == "__name__" && label.Value == "vm_app_version" {
|
||||
isVmInstance = true
|
||||
}
|
||||
if label.Name == "instance" {
|
||||
instance = label.Value
|
||||
}
|
||||
if label.Name == "job" {
|
||||
job = label.Value
|
||||
}
|
||||
}
|
||||
identicalKey := fmt.Sprintf("%s:%s", job, instance)
|
||||
currTs := time.Now().Unix()
|
||||
|
||||
//fast path
|
||||
filter.mu.RLock()
|
||||
ptr, ok := filter.vmInstance[identicalKey]
|
||||
filter.mu.RUnlock()
|
||||
if ok {
|
||||
ptr.Store(currTs)
|
||||
tssDst = append(tssDst, ts)
|
||||
continue
|
||||
}
|
||||
if !isVmInstance {
|
||||
continue
|
||||
}
|
||||
|
||||
// slow path
|
||||
tssDst = append(tssDst, ts)
|
||||
filter.mu.Lock()
|
||||
if ptr, ok = filter.vmInstance[identicalKey]; ok {
|
||||
ptr.Store(currTs)
|
||||
} else {
|
||||
v := atomic.Int64{}
|
||||
v.Store(currTs)
|
||||
filter.vmInstance[identicalKey] = &v
|
||||
}
|
||||
filter.mu.Unlock()
|
||||
}
|
||||
|
||||
return tssDst
|
||||
}
|
||||
Reference in New Issue
Block a user