Compare commits

...

30 Commits

Author SHA1 Message Date
Jayice
abf19663a6 support pool labels in mdx filter 2026-06-18 16:57:17 +08:00
Jayice
7744c6fcee address review comments 2026-06-18 02:18:20 +08:00
Jayice
c193e8cdc5 address review comments 2026-06-18 00:05:34 +08:00
Jayice
c67adbd9c9 address review comments 2026-06-17 23:25:34 +08:00
Jayice
bcd1dc8c8b address review comments 2026-06-17 23:22:04 +08:00
JAYICE
f3326bf082 Merge branch 'master' into issue-10600
Signed-off-by: JAYICE <1185430411@qq.com>
2026-06-17 19:56:03 +08:00
Jayice
f3f7602482 address review comments 2026-06-17 19:50:55 +08:00
Max Kotliar
b58c73ac90 app/vmauth: refactor oidc discovery pool creation (#11123)
Initializing `oidcDP` outside or before `parseJWTUsers` would simplify its reuse with SSO implementation https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11122

PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11123
2026-06-17 13:52:39 +03:00
Nikolay
77efbb2e36 lib/fs: retry file deletion on NFS error
This commit adds additional file removal retries.
This is follow-up for
https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9842.

According to the stack trace, NFS also could return error for
`deleteFilePath`.
This retry is safe, since part already removed from `parts.json` and
performs a skips empty directories on restart.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060
2026-06-17 09:16:42 +02:00
Alexander Frolov
e388e41430 app/vmagent: properly copy metadata unit value
Previously, Unit value at metrics metadata was incorrectly referenced instead of memory copy:

ed795a8443/app/vmagent/remotewrite/pendingseries.go (L209-L212)

 This commit properly copies Unit into dedicated buffer and prevents memory corruption.

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120
2026-06-17 09:16:04 +02:00
Jayice
7d558511d0 make linter happy 2026-06-15 15:46:41 +08:00
JAYICE
a2bb3f70a5 Merge branch 'master' into issue-10600 2026-06-15 15:40:08 +08:00
Jayice
2c496f4a38 address review comments 2026-06-15 15:25:35 +08:00
Jayice
ef0ae0fb9a address review comments 2026-06-05 14:40:28 +08:00
Jayice
4cbedc6b1b Merge remote-tracking branch 'origin/issue-10600' into issue-10600 2026-06-05 14:32:01 +08:00
Jayice
4f5cd15163 address review comments 2026-06-05 14:30:21 +08:00
JAYICE
5b161ce283 Merge branch 'master' into issue-10600 2026-06-05 13:44:59 +08:00
Jayice
43773e1d5c address review comments 2026-06-04 21:29:30 +08:00
Jayice
517c17b744 address review comments 2026-06-01 20:10:44 +08:00
Jayice
1c2622d8ae address review comments 2026-06-01 20:04:58 +08:00
Jayice
9a836dac59 address review comments 2026-06-01 20:00:52 +08:00
Jayice
799ecb0a08 support specify label and value to filter metrics to mdx url 2026-06-01 19:50:00 +08:00
Jayice
c88dc19052 address review comments 2026-06-01 14:52:16 +08:00
Jayice
919049f9e2 push slice back to pool 2026-05-25 17:34:41 +08:00
Jayice
24efe47c6a add unit test & address review comments 2026-05-25 17:08:23 +08:00
Jayice
f8d99d9289 polish documentation 2026-04-27 18:02:41 +08:00
Jayice
333a015be5 update CHANGELOG.md 2026-04-27 15:15:59 +08:00
JAYICE
b6196524ba Merge branch 'master' into issue-10600
Signed-off-by: JAYICE <1185430411@qq.com>
2026-04-27 15:13:02 +08:00
Jayice
e9f1bb911c add documentation 2026-04-27 15:10:17 +08:00
Jayice
12b79143dc implement mdx for remote write 2026-04-21 15:48:46 +08:00
10 changed files with 709 additions and 41 deletions

View File

@@ -209,13 +209,12 @@ func (wr *writeRequest) tryPushMetadata(mms []prompb.MetricMetadata) bool {
func (wr *writeRequest) copyMetadata(dst, src *prompb.MetricMetadata) {
// Direct copy for non-string fields, which are safe by value.
dst.Type = src.Type
dst.Unit = src.Unit
dst.AccountID = src.AccountID
dst.ProjectID = src.ProjectID
// Pre-allocate memory for all string fields.
neededBufLen := len(src.MetricFamilyName) + len(src.Help)
neededBufLen := len(src.MetricFamilyName) + len(src.Help) + len(src.Unit)
bufLen := len(wr.metadatabuf)
wr.metadatabuf = slicesutil.SetLength(wr.metadatabuf, bufLen+neededBufLen)
buf := wr.metadatabuf[:bufLen]
@@ -230,6 +229,11 @@ func (wr *writeRequest) copyMetadata(dst, src *prompb.MetricMetadata) {
buf = append(buf, src.Help...)
dst.Help = bytesutil.ToUnsafeString(buf[bufLen:])
// Copy Unit
bufLen = len(buf)
buf = append(buf, src.Unit...)
dst.Unit = bytesutil.ToUnsafeString(buf[bufLen:])
wr.metadatabuf = buf
}

View File

@@ -12,6 +12,7 @@ import (
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/metrics"
@@ -103,6 +104,9 @@ var (
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
"By default, metadata sending is controlled by the global -enableMetadata flag")
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
)
var (
@@ -304,6 +308,10 @@ func initRemoteWriteCtxs(urls []string) {
}
fs.RegisterPathFsMetrics(*tmpDataPath)
if slices.Contains(*enableMdx, true) && *shardByURL {
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
}
if *shardByURL {
consistentHashNodes := make([]string, 0, len(urls))
for i, url := range urls {
@@ -859,6 +867,7 @@ type remoteWriteCtx struct {
sas atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
mdxFilter *mdx.Filter
streamAggrKeepInput bool
streamAggrDropInput bool
@@ -873,6 +882,7 @@ type remoteWriteCtx struct {
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
mdxRowsPreserved *metrics.Counter
pushFailures *metrics.Counter
metadataDroppedOnPushFailure *metrics.Counter
@@ -959,7 +969,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
for i := range pss {
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
}
rwctx := &remoteWriteCtx{
idx: argIdx,
fq: fq,
@@ -976,6 +985,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
}
rwctx.initStreamAggrConfig()
if enableMdx.GetOptionalArg(argIdx) {
mdxFilter := mdx.NewFilter()
rwctx.mdxFilter = mdxFilter
rwctx.mdxRowsPreserved = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_mdx_tracked_vm_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(mdxFilter.VmInstancesCount())
})
}
return rwctx
}
@@ -989,6 +1008,11 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.deduplicator.MustStop()
rwctx.deduplicator = nil
}
if rwctx.mdxFilter != nil {
rwctx.mdxFilter.MustStop()
rwctx.mdxFilter = nil
rwctx.mdxRowsPreserved = nil
}
for _, ps := range rwctx.pss {
ps.MustStop()
@@ -1004,6 +1028,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsPushedAfterRelabel = nil
rwctx.rowsDroppedByRelabel = nil
}
// TryPushTimeSeries sends tss series to the configured remote write endpoint
@@ -1011,8 +1036,13 @@ func (rwctx *remoteWriteCtx) MustStop() {
// TryPushTimeSeries doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances.
func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) bool {
var rctx *relabelCtx
var mctx *mdx.Ctx
var v *[]prompb.TimeSeries
defer func() {
if mctx != nil {
mctx.Reset()
mdx.CtxPool.Put(mctx)
}
if rctx == nil {
return
}
@@ -1021,6 +1051,22 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
putRelabelCtx(rctx)
}()
if rwctx.mdxFilter != nil {
mctx = mdx.CtxPool.Get().(*mdx.Ctx)
mctx.Reset()
tssResP := tssPool.Get().(*[]prompb.TimeSeries)
tssRes := rwctx.mdxFilter.Filter(tss, *tssResP, mctx)
defer func() {
*tssResP = prompb.ResetTimeSeries(tssRes)
tssPool.Put(tssResP)
}()
if len(tssRes) == 0 {
return true
}
tss = tssRes
}
// Apply relabeling
rcs := allRelabelConfigs.Load()
pcs := rcs.perURL[rwctx.idx]

View File

@@ -911,7 +911,8 @@ func reloadAuthConfigData(data []byte) (bool, error) {
return false, fmt.Errorf("failed to parse auth config: %w", err)
}
jui, oidcDP, err := parseJWTUsers(ac)
oidcDP := &oidcDiscovererPool{}
jui, err := parseJWTUsers(ac, oidcDP)
if err != nil {
return false, fmt.Errorf("failed to parse JWT users from auth config: %w", err)
}

View File

@@ -72,9 +72,8 @@ type JWTConfig struct {
verifierPool atomic.Pointer[jwt.VerifierPool]
}
func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
func parseJWTUsers(ac *AuthConfig, oidcDP *oidcDiscovererPool) ([]*UserInfo, error) {
jui := make([]*UserInfo, 0, len(ac.Users))
oidcDP := &oidcDiscovererPool{}
uniqClaims := make(map[string]*UserInfo)
var sortedClaims []string
@@ -85,10 +84,10 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
}
if ui.AuthToken != "" || ui.BearerToken != "" || ui.Username != "" || ui.Password != "" {
return nil, nil, fmt.Errorf("auth_token, bearer_token, username and password cannot be specified if jwt is set")
return nil, fmt.Errorf("auth_token, bearer_token, username and password cannot be specified if jwt is set")
}
if len(jwtToken.PublicKeys) == 0 && len(jwtToken.PublicKeyFiles) == 0 && !jwtToken.SkipVerify && jwtToken.OIDC == nil {
return nil, nil, fmt.Errorf("jwt must contain at least a single public key, public_key_files, oidc or have skip_verify=true")
return nil, fmt.Errorf("jwt must contain at least a single public key, public_key_files, oidc or have skip_verify=true")
}
var claimsString string
sortedClaims = sortedClaims[:0]
@@ -97,7 +96,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
sortedClaims = append(sortedClaims, fmt.Sprintf("%s=%s", ck, cv))
pc, err := jwt.NewClaim(ck, cv)
if err != nil {
return nil, nil, fmt.Errorf("incorrect match claim, key=%q, value regex=%q: %w", ck, cv, err)
return nil, fmt.Errorf("incorrect match claim, key=%q, value regex=%q: %w", ck, cv, err)
}
parsedClaims = append(parsedClaims, pc)
}
@@ -106,7 +105,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
claimsString = strings.Join(sortedClaims, ",")
if oldUI, ok := uniqClaims[claimsString]; ok {
return nil, nil, fmt.Errorf("duplicate match claims=%q found for name=%q at idx=%d; the previous one is set for name=%q", claimsString, ui.Name, idx, oldUI.Name)
return nil, fmt.Errorf("duplicate match claims=%q found for name=%q at idx=%d; the previous one is set for name=%q", claimsString, ui.Name, idx, oldUI.Name)
}
uniqClaims[claimsString] = &ui
if len(jwtToken.PublicKeys) > 0 || len(jwtToken.PublicKeyFiles) > 0 {
@@ -115,7 +114,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
for i := range jwtToken.PublicKeys {
k, err := jwt.ParseKey([]byte(jwtToken.PublicKeys[i]))
if err != nil {
return nil, nil, err
return nil, err
}
keys = append(keys, k)
}
@@ -123,52 +122,52 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
for _, filePath := range jwtToken.PublicKeyFiles {
keyData, err := os.ReadFile(filePath)
if err != nil {
return nil, nil, fmt.Errorf("cannot read public key from file %q: %w", filePath, err)
return nil, fmt.Errorf("cannot read public key from file %q: %w", filePath, err)
}
k, err := jwt.ParseKey(keyData)
if err != nil {
return nil, nil, fmt.Errorf("cannot parse public key from file %q: %w", filePath, err)
return nil, fmt.Errorf("cannot parse public key from file %q: %w", filePath, err)
}
keys = append(keys, k)
}
vp, err := jwt.NewVerifierPool(keys)
if err != nil {
return nil, nil, err
return nil, err
}
jwtToken.verifierPool.Store(vp)
}
if jwtToken.OIDC != nil {
if len(jwtToken.PublicKeys) > 0 || len(jwtToken.PublicKeyFiles) > 0 || jwtToken.SkipVerify {
return nil, nil, fmt.Errorf("jwt with oidc cannot contain public keys or have skip_verify=true")
return nil, fmt.Errorf("jwt with oidc cannot contain public keys or have skip_verify=true")
}
if jwtToken.OIDC.Issuer == "" {
return nil, nil, fmt.Errorf("oidc issuer cannot be empty")
return nil, fmt.Errorf("oidc issuer cannot be empty")
}
isserURL, err := url.Parse(jwtToken.OIDC.Issuer)
if err != nil {
return nil, nil, fmt.Errorf("oidc issuer %q must be a valid URL", jwtToken.OIDC.Issuer)
return nil, fmt.Errorf("oidc issuer %q must be a valid URL", jwtToken.OIDC.Issuer)
}
if isserURL.Scheme != "https" && isserURL.Scheme != "http" {
return nil, nil, fmt.Errorf("oidc issuer %q must have http or https scheme", jwtToken.OIDC.Issuer)
return nil, fmt.Errorf("oidc issuer %q must have http or https scheme", jwtToken.OIDC.Issuer)
}
oidcDP.createOrAdd(ui.JWT.OIDC.Issuer, &ui.JWT.verifierPool)
}
if err := parseJWTPlaceholdersForUserInfo(&ui, true); err != nil {
return nil, nil, err
return nil, err
}
if err := ui.initURLs(); err != nil {
return nil, nil, err
return nil, err
}
metricLabels, err := ui.getMetricLabels()
if err != nil {
return nil, nil, fmt.Errorf("cannot parse metric_labels: %w", err)
return nil, fmt.Errorf("cannot parse metric_labels: %w", err)
}
ui.requests = ac.ms.GetOrCreateCounter(`vmauth_user_requests_total` + metricLabels)
ui.requestErrors = ac.ms.GetOrCreateCounter(`vmauth_user_request_errors_total` + metricLabels)
@@ -187,7 +186,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
rt, err := newRoundTripper(ui.TLSCAFile, ui.TLSCertFile, ui.TLSKeyFile, ui.TLSServerName, ui.TLSInsecureSkipVerify)
if err != nil {
return nil, nil, fmt.Errorf("cannot initialize HTTP RoundTripper: %w", err)
return nil, fmt.Errorf("cannot initialize HTTP RoundTripper: %w", err)
}
ui.rt = rt
@@ -200,7 +199,7 @@ func parseJWTUsers(ac *AuthConfig) ([]*UserInfo, *oidcDiscovererPool, error) {
return len(jui[i].JWT.MatchClaims) > len(jui[j].JWT.MatchClaims)
})
return jui, oidcDP, nil
return jui, nil
}
var tokenPool sync.Pool

View File

@@ -39,16 +39,14 @@ XOtclIk1uhc03oL9nOQ=
}
return
}
users, oidcDP, err := parseJWTUsers(ac)
oidcDP := &oidcDiscovererPool{}
users, err := parseJWTUsers(ac, oidcDP)
if err == nil {
t.Fatalf("expecting non-nil error; got %v", users)
}
if expErr != err.Error() {
t.Fatalf("unexpected error; got\n%q\nwant \n%q", err.Error(), expErr)
}
if oidcDP != nil {
t.Fatalf("expecting nil oidcDP; got %v", oidcDP)
}
}
// unauthorized_user cannot be used with jwt
@@ -326,7 +324,8 @@ XOtclIk1uhc03oL9nOQ=
t.Fatalf("unexpected error: %s", err)
}
jui, oidcDP, err := parseJWTUsers(ac)
oidcDP := &oidcDiscovererPool{}
jui, err := parseJWTUsers(ac, oidcDP)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}

View File

@@ -30,11 +30,15 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add support for [Monitoring Data eXchange (MDX)](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange): the ability to route only metrics from VictoriaMetrics services to a specific `-remoteWrite.url`. MDX is useful for building monitoring-of-monitoring where one remote storage should receive the full metric stream and another should receive only VictoriaMetrics metrics. Enable per destination with `-remoteWrite.mdx.enable=true`. See [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
* BUGFIX: [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/),[vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/),[vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/) and [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/): fix rare unbounded shutdown delay when config reload takes longer than `-configCheckInterval`. See [#11107](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11107). Thanks to @PleasingFungus for contribution.
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fix corrupted metrics metadata when a response contains multiple rows. See [#11115](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11115). Thanks for @fxrlv for the contribution.
* BUGFIX: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): fix potential corruption of remote-write metadata `Unit` values. See [#11120](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11120). Thanks for @fxrlv for the contribution.
* BUGFIX: [vmbackup](https://docs.victoriametrics.com/vmbackup/), [vmbackupmanager](https://docs.victoriametrics.com/victoriametrics/vmbackupmanager/): do not fail backup list if directory is absent while using `fs://` destination to align with other protocols. See [6c3c548](https://github.com/VictoriaMetrics/VictoriaMetrics/commit/6c3c548ddb0385b749e731f52276f130e2a4e4a8)
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)

View File

@@ -268,6 +268,39 @@ for the collected samples. Examples:
```sh
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
```
### Monitoring Data eXchange
The Monitoring Data eXchange (MDX) feature allows `vmagent` to forward only VictoriaMetrics metrics to selected `-remoteWrite.url` destinations while dropping metrics from non-VictoriaMetrics services.
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=false \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true
```
When MDX is enabled for a `-remoteWrite.url`, `vmagent` forwards only metrics that:
- come from the target that exposes the `vm_app_version` metric (emitted by all VictoriaMetrics components)
- contain the `victoriametrics_app=true` label, which will be added automatically to the metrics if the instance was deployed via [VictoriaMetrics Operator](https://docs.victoriametrics.com/operator/).
`victoriametrics_app=true` label will be added to all metrics that are preserved by MDX if it's absent.
- contain the label specified via `-mdx.label`.
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true \
-mdx.label="service=victoriametrics"
```
In this configuration, metrics with the label `service=victoriametrics` are preserved even if their scrape targets do not expose `vm_app_version` metric.
The number of VictoriaMetrics metrics preserved by MDX is exposed as `vmagent_remotewrite_mdx_rows_preserved_total`.
The scope of MDX is at the per-url level, so it works after global level mechanisms, such as stream aggregation, relabeling, complexity limiter, and cardinality limiter. See [Life of a sample](https://docs.victoriametrics.com/victoriametrics/vmagent/#life-of-a-sample).
### Life of a sample
@@ -285,18 +318,20 @@ flowchart TB
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
%% Left branch
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
%% Right branch
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
```
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:

View File

@@ -179,6 +179,9 @@ func tryRemoveDir(dirPath string) bool {
// times simultaneously and properly close it, fs caching may still
// confuse NFS client.
if err := os.RemoveAll(dirEntryPath); err != nil {
if os.IsNotExist(err) {
return
}
if !isTemporaryNFSError(err) {
logger.Fatalf("FATAL: cannot remove %q: %s", dirEntryPath, err)
}
@@ -203,8 +206,9 @@ func tryRemoveDir(dirPath string) bool {
deleteFilePath := filepath.Join(dirPath, deleteDirFilename)
// Remove the deleteDirFilename file, since there are no other entries left in the directory.
MustRemovePath(deleteFilePath)
if !tryRemovePath(deleteFilePath) {
return false
}
// Sync the directory after the removing deletDirFilename file in order to make sure
// all the metadata files are removed at some exotic filesystems such as OSSFS2.
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/649
@@ -212,7 +216,9 @@ func tryRemoveDir(dirPath string) bool {
MustSyncPath(dirPath)
// Remove the dirPath itself
MustRemovePath(dirPath)
if !tryRemovePath(dirPath) {
return false
}
// Do not sync the parent directory for the dirPath - the caller can do this if needed.
// It is OK if the dirPath will remain undeleted after unclean shutdown - it will be deleted
@@ -221,6 +227,23 @@ func tryRemoveDir(dirPath string) bool {
return true
}
// tryRemovePath removes given path and returns true on success
// or false if error is temporary NFS error
func tryRemovePath(path string) bool {
if err := os.Remove(path); err != nil {
if os.IsNotExist(err) {
return true
}
if !isTemporaryNFSError(err) {
logger.Fatalf("FATAL: cannot remove %q: %s", path, err)
}
nfsDirRemoveFailedAttempts.Inc()
return false
}
return true
}
var (
dirRemoverWG sync.WaitGroup
nfsDirRemoveFailedAttempts = metrics.NewCounter(`vm_nfs_dir_remove_failed_attempts_total`)

198
lib/mdx/filter.go Normal file
View File

@@ -0,0 +1,198 @@
package mdx
import (
"flag"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
var (
vmLabel = flag.String("mdx.label", "", "Optional label in the form 'name=value' used to identify VictoriaMetrics metrics for MDX. Metrics containing the specified label are forwarded to `-remoteWrite.url` endpoints configured with `-remoteWrite.mdx.enable=true`.")
vmAppLabelName = "victoriametrics_app"
)
type Ctx struct {
// pool for labels, which are used when adding victoriametrics_app label to the original labels.
labels []prompb.Label
}
func (ctx *Ctx) Reset() {
promrelabel.CleanLabels(ctx.labels)
ctx.labels = ctx.labels[:0]
}
var CtxPool = &sync.Pool{
New: func() any {
return &Ctx{}
},
}
// Filter manages the list of VictoriaMetrics instances discovered from previous data flow, and uses it to filter out metrics that are not from VictoriaMetrics instances.
type Filter struct {
mu sync.RWMutex
wg sync.WaitGroup
stopCh chan struct{}
vmInstance map[string]*atomic.Int64
filterByLabelName string
filterByLabelValue string
}
func NewFilter() *Filter {
filter := &Filter{
vmInstance: make(map[string]*atomic.Int64),
stopCh: make(chan struct{}),
}
if len(*vmLabel) != 0 {
n := strings.IndexByte(*vmLabel, '=')
if n < 0 {
logger.Fatalf("missing '=' in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
}
filter.filterByLabelName = (*vmLabel)[:n]
filter.filterByLabelValue = (*vmLabel)[n+1:]
}
filter.wg.Go(filter.cleanStale)
return filter
}
func (filter *Filter) VmInstancesCount() int {
if filter == nil {
return 0
}
filter.mu.RLock()
defer filter.mu.RUnlock()
return len(filter.vmInstance)
}
func (filter *Filter) cleanStale() {
entryTTL := time.Hour * 1
ttlSec := int64(entryTTL.Seconds())
ticker := time.NewTicker(time.Minute)
defer ticker.Stop()
for {
select {
case <-ticker.C:
filter.mu.Lock()
currTs := time.Now().Unix()
dst := make(map[string]*atomic.Int64, len(filter.vmInstance))
for k, v := range filter.vmInstance {
if currTs-v.Load() < ttlSec {
dst[k] = v
}
}
if len(dst) != len(filter.vmInstance) {
filter.vmInstance = dst
}
filter.mu.Unlock()
case <-filter.stopCh:
return
}
}
}
func (filter *Filter) MustStop() {
if filter == nil {
return
}
close(filter.stopCh)
filter.wg.Wait()
}
func (filter *Filter) Filter(tss []prompb.TimeSeries, resTss []prompb.TimeSeries, ctx *Ctx) []prompb.TimeSeries {
currTs := time.Now().Unix()
var identicalKey []byte
poolLabels := ctx.labels[:0]
maybeAddVmAppLabel := func(idx int, labels []prompb.Label) []prompb.Label {
for j := idx + 1; j < len(labels); j++ {
if labels[j].Name == vmAppLabelName && labels[j].Value == "true" {
return labels
}
}
poolLabelsLen := len(poolLabels)
poolLabels = append(poolLabels, labels...)
poolLabels = append(poolLabels, prompb.Label{Name: vmAppLabelName, Value: "true"})
return poolLabels[poolLabelsLen:]
}
nextTss:
for _, ts := range tss {
var hasVersionLabel, triedJobInstance bool
var job, instance string
for i, label := range ts.Labels {
if label.Name == vmAppLabelName && label.Value == "true" {
resTss = append(resTss, ts)
continue nextTss
}
if filter.filterByLabelName != "" && label.Name == filter.filterByLabelName && label.Value == filter.filterByLabelValue {
ts.Labels = maybeAddVmAppLabel(i, ts.Labels)
resTss = append(resTss, ts)
continue nextTss
}
if label.Name == "__name__" && label.Value == "vm_app_version" {
hasVersionLabel = true
}
if instance == "" && label.Name == "instance" {
if label.Value == "" {
continue
}
instance = label.Value
}
if job == "" && label.Name == "job" {
if label.Value == "" {
continue
}
job = label.Value
}
if !triedJobInstance && job != "" && instance != "" {
identicalKey = identicalKey[:0]
identicalKey = strconv.AppendQuote(identicalKey, job)
identicalKey = append(identicalKey, ':')
identicalKey = strconv.AppendQuote(identicalKey, instance)
filter.mu.RLock()
ptr, found := filter.vmInstance[bytesutil.ToUnsafeString(identicalKey)]
filter.mu.RUnlock()
if found {
ptr.Store(currTs)
ts.Labels = maybeAddVmAppLabel(i, ts.Labels)
resTss = append(resTss, ts)
continue nextTss
}
triedJobInstance = true
}
if hasVersionLabel && job != "" && instance != "" {
identicalKey = identicalKey[:0]
identicalKey = strconv.AppendQuote(identicalKey, job)
identicalKey = append(identicalKey, ':')
identicalKey = strconv.AppendQuote(identicalKey, instance)
v := &atomic.Int64{}
v.Store(currTs)
filter.mu.Lock()
filter.vmInstance[string(identicalKey)] = v
filter.mu.Unlock()
ts.Labels = maybeAddVmAppLabel(i, ts.Labels)
resTss = append(resTss, ts)
continue nextTss
}
}
}
return resTss
}

359
lib/mdx/filter_test.go Normal file
View File

@@ -0,0 +1,359 @@
package mdx
import (
"fmt"
"sort"
"strings"
"testing"
"testing/synctest"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
)
func timeSeriessToString(tss []prompb.TimeSeries) string {
a := make([]string, len(tss))
for i, ts := range tss {
a[i] = timeSeriesToString(ts)
}
sort.Strings(a)
return strings.Join(a, "")
}
func timeSeriesToString(ts prompb.TimeSeries) string {
labelsString := promrelabel.LabelsToString(ts.Labels)
return fmt.Sprintf("%s\n", labelsString)
}
func TestMdxInstanceFilter(t *testing.T) {
originalVmLabel := *vmLabel
*vmLabel = "service=victoriametrics"
filter := NewFilter()
defer filter.MustStop()
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries, expectedInstanceMap map[string]int64) {
t.Helper()
ctx := Ctx{}
output := filter.Filter(input, nil, &ctx)
outputString := timeSeriessToString(output)
expectedOutputString := timeSeriessToString(expectedOutput)
if outputString != expectedOutputString {
t.Fatalf("unexpected output; got %s; want %s", outputString, expectedOutputString)
}
if filter.VmInstancesCount() != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
}
for k := range expectedInstanceMap {
if filter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
// the first call
f([]prompb.TimeSeries{
// 1. metrics with vm_app_version and different order of labels.
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics2:8428"},
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics3:8428"},
{Name: "__name__", Value: "vm_app_version"},
},
},
// 2.
// metrics without vm_app_version but with service=victoriametrics that is specified in `-vm.label`.
// it will be preserved, but won't be registered in instance map in MDX
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "service", Value: "victoriametrics"},
},
},
// 3. metrics with vm_app_version and service=victoriametrics should be preserved.
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "job", Value: "test"},
{Name: "service", Value: "victoriametrics"},
{Name: "__name__", Value: "vm_app_version"},
},
},
// 4. metrics without vm_app_version and `service=victoriametrics` but with `victoriametrics_app=true`, which should be preserved.
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics6:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "victoriametrics_app", Value: "true"},
},
},
// 5. metrics without vm_app_version and service=victoriametrics and `victoriametrics_app=true`, which should be filtered out.
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
// 6. metrics with vm_app_version but job or instance is empty (or missing), they should be dropped.
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: ""},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent2:8429"},
{Name: "job", Value: ""},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent2:8429"},
},
},
},
// `victoriametrics_app=true` should be added to all preserved metrics if absent.
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics2:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics3:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "service", Value: "victoriametrics"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "vm_app_version"},
{Name: "service", Value: "victoriametrics"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics6:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "victoriametrics_app", Value: "true"},
},
},
},
// only instances that are discovered via `vm_app_version` will be registered in instance map in MDX.
map[string]int64{
"\"test\":\"victoria-metrics1:8428\"": 0,
"\"test\":\"victoria-metrics2:8428\"": 0,
"\"test\":\"victoria-metrics3:8428\"": 0,
})
// the second call
f([]prompb.TimeSeries{
// 1. metrics without vm_app_version, but the instances were already registered in the previous call, so it will be preserved.
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_rows_inserted_total"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
{Name: "instance", Value: "victoria-metrics2:8428"},
{Name: "job", Value: "test"},
},
},
// 2. metrics without vm_app_version, `service=victoriametrics` and `victoriametrics_app=true`, and the instance wasn't already registered in the previous call, so it will be dropped.
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
{Name: "instance", Value: "victoria-metrics7:8428"},
{Name: "job", Value: "test"},
},
},
// 3. metrics with service=victoriametrics.
{
Labels: []prompb.Label{
{Name: "service", Value: "victoriametrics"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "job", Value: "test"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_rows_inserted_total"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
{Name: "instance", Value: "victoria-metrics2:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "service", Value: "victoriametrics"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
},
// only instances that are discovered via `vm_app_version` will be registered in instance map in MDX.
map[string]int64{
"\"test\":\"victoria-metrics1:8428\"": 0,
"\"test\":\"victoria-metrics2:8428\"": 0,
"\"test\":\"victoria-metrics3:8428\"": 0,
})
*vmLabel = originalVmLabel
}
func TestMdxInstanceCleanup(t *testing.T) {
t.Helper()
synctest.Test(t, func(t *testing.T) {
filter := NewFilter()
defer filter.MustStop()
ctx := Ctx{}
filter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{}, &ctx,
)
f := func(expectedInstanceMap map[string]int64) {
t.Helper()
if filter.VmInstancesCount() != len(expectedInstanceMap) {
t.Fatalf("unexpected instance map length; got %d; want %d", len(filter.vmInstance), len(expectedInstanceMap))
}
for k := range expectedInstanceMap {
if filter.vmInstance[k] == nil {
t.Fatalf("missing instance in filter.vmInstance: %q", k)
}
}
}
time.Sleep(59 * time.Minute)
// the entries should not be cleaned.
f(map[string]int64{
"\"test\":\"victoria-metrics1:8428\"": 0,
"\"test\":\"vmagent1:8429\"": 0,
})
// receive samples from victoria-metrics1:8428 after 59 minutes.
// so the entry will be refreshed.
ctx.Reset()
filter.Filter([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
}}, []prompb.TimeSeries{}, &ctx,
)
time.Sleep(2 * time.Minute)
// no samples from vmagent1:8429 in the last hour, so it should be removed from the mdx instance list.
f(map[string]int64{
"\"test\":\"victoria-metrics1:8428\"": 0,
})
})
}