Compare commits

...

14 Commits

Author SHA1 Message Date
Benjamin Nichols-Farquhar
c82127b6d4 app/vmagent: Optimize remotewrite hot paths
## Description

We have a simple vmagent setup using `remotewrite.shardByURL` to
horizontally scale stream aggregations, as suggested
[here](https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages)
with minimal additional configuration and noticed there was some
optimization opportunities.

## Investigation 

We took a
[pprof](https://github.com/user-attachments/files/28968177/cpu.pprof.zip)
and found that our CPU time was in a few primary places.

1. payload protobuf marshaling 
2. zstd compresssion 
3. Map access in sharding
4. Hashing in sharding
5. Memory copies for timeseries 

## Changes


Zstd compression is pretty intractable(besides using
`remotewrite.vmProtoCompressLevel`, which we already do), but there were
some relatively easy gains for the other paths.

1. inline sov() when l < 128 on marshal. Skips calling a bunch of byte
math in a common case
2. Use slices instead of maps for shard keys, in the common case where
there are less than ~10 keys a slice iteration will be faster
3. use `xxhash.Digest.WriteString` rather than buffering values, avoids
some extra allocations and GC work
4. In the simple case where `insertRows` doesn't modify label names or
values construct the remotewrite request from the passed values instead
of copying them. Avoids a bunch of memory copy and GC work.

## Results

Modest but effective, we saw a ~10% reduction in total cpu cores used by
vmagent with these changes.
<img width="2358" height="709" alt="Screenshot 2026-06-15 at 2 24 43 PM"
src="https://github.com/user-attachments/assets/d2931242-c7ed-447b-9cfd-73b29cd0b893"
/>

Related PR https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11113
2026-06-30 18:52:03 +02:00
JAYICE
06bc808ddc app/vmagent: support MDX(monitor data exchange) for remote write
This commit adds MDX(Monitoring Data eXchange) feature into vmagent. 
It allows to detect VictoriaMetrics related components and forward metrics only for those instances.

In default state, mdx detects timeseries based on `vm_app_version` metric name and tracks instance based on "job:instance" key. It holds tracked instances in-memory for 1 hour, which must be enough for the most scrape intervals.

fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600
2026-06-30 18:31:17 +02:00
Nikolay
a6927c46be follow-up for e3121ad473 (#11162)
Fixes data-race at emwa.go package.
Write and read requests of the shared variable must be guarded by the
mutex lock.
2026-06-30 18:19:59 +02:00
xhe
15a4c31e87 app/vmselect: optimize repeated binary op rollup aggregates
This commit improves performance for sub query requests by properly cache binary operation results.

 For context:

When using binary operator in query, vmselect will executes the left and right expressions in parallel and independently. When the left and right expressions contain the same sub-expression, e.g.

```
count( count(vm_requests_total) by (action,addr,cluster,endpoint) ) by (action,addr,cluster) 
/
count( count(vm_requests_total) by (action,addr,cluster,endpoint) )
```

```count( count(vm_requests_total) by (action,addr,cluster,endpoint) is a sub expression in both left and right side.```

The sub expression will be executed twice (once in each of the left and right expressions). This introduces additional RPC and resource overhead.

 Currently, this feature is hidden behind query request param - `optimize_repeated_binary_op_subexprs`.
Later it could be transited into default behavior.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10575
2026-06-30 17:47:19 +02:00
Andrii Chubatiuk
54f9cd6edd chore(app/vmalert): update TestGroupValidate_Success and TestGroupValidate_Failure tests
This changes is needed to simplify tests logic and reduce diff between OSS and enterprise branches

Related https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11157
2026-06-30 17:44:01 +02:00
Zakhar Bessarab
2e16874e95 app/vmauth: add new default_vm_access_claim field
This commit relaxes JWT token format. Previously  "vm_access" claim was required.
Which may not be a case for some users. For example, if there is no need at request templating.
  The new `jwt` section field `default_vm_access_claim` was added for this purpose.
It's used as fall-back for templating data.

See: https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11054

---------

Signed-off-by: Nikolay <nik@victoriametrics.com>
Co-authored-by: f41gh7 <nik@victoriametrics.com>
2026-06-30 17:37:20 +02:00
Phuong Le
81d330f297 lib/httpserver: cancel in-flight requests when graceful shutdown times out
`http.Server.Shutdown()` waits for in-flight requests but never cancels
them, so long-lived ones (e.g. VictoriaLogs live tailing) make graceful
shutdown timeout, and the resulting `logger.Fatalf` -> `os.Exit` skips
the storage flush and loses data. So adding a cancelable `BaseContext`
that is canceled once `-http.maxGracefulShutdownDuration` elapses.

Updates https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502

Also notes that VictoriaMetrics query execution is deadline-driven and
ignores ctx, so it's a no-op there.
2026-06-30 17:30:30 +02:00
Raymond Chen
3278ddd170 docs/victoriametrics: fix broken Prometheus WAL remote write blog link
The Grafana blog post about Prometheus 2.8 WAL-based remote write has
moved. The old URL
(`https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/`)
now 404s; this updates both occurrences in the docs to the current
location
(`https://grafana.com/blog/whats-new-in-prometheus-2-8-wal-based-remote-write/`),
which returns HTTP 200.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11184

🤖 Generated with [Claude Code](https://claude.com/claude-code)
2026-06-30 17:29:41 +02:00
Max Kotliar
cc790c2ea1 deployment: bump Alpine version to v3.24.1 for security updates
See https://www.alpinelinux.org/posts/Alpine-3.24.1-released.html
2026-06-30 16:02:14 +03:00
Max Kotliar
950f38fd6a docs/vmestimator: sync docs 2026-06-30 12:11:27 +03:00
Max Kotliar
ab9db9152f docs/changelog: remove duplicated lines 2026-06-30 11:57:54 +03:00
Max Kotliar
5b89f52c72 docs/vmestimator: sync docs 2026-06-30 11:19:18 +03:00
Max Kotliar
3608ab5b4c docs/vmestimator: fix link 2026-06-29 19:18:08 +03:00
Max Kotliar
5ee1fa70c1 docs/vmestimator: sync docs 2026-06-29 19:12:09 +03:00
25 changed files with 1759 additions and 401 deletions

View File

@@ -35,6 +35,9 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
}
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.MetricMetadata, extraLabels []prompb.Label) error {
if len(extraLabels) == 0 && !prommetadata.IsEnabled() && at == nil {
return insertRowsFast(at, timeseries)
}
ctx := common.GetPushCtx()
defer common.PutPushCtx(ctx)
@@ -102,3 +105,17 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.Met
rowsPerInsert.Update(float64(rowsTotal))
return nil
}
func insertRowsFast(at *auth.Token, timeseries []prompb.TimeSeries) error {
rowsTotal := 0
for i := range timeseries {
rowsTotal += len(timeseries[i].Samples)
}
wr := &prompb.WriteRequest{Timeseries: timeseries}
if !remotewrite.TryPush(at, wr) {
return remotewrite.ErrQueueFullHTTPRetry
}
rowsInserted.Add(rowsTotal)
rowsPerInsert.Update(float64(rowsTotal))
return nil
}

View File

@@ -12,19 +12,18 @@ import (
"sync/atomic"
"time"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/metrics"
"github.com/cespare/xxhash/v2"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consistenthash"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
@@ -106,6 +105,9 @@ var (
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
"By default, metadata sending is controlled by the global -enableMetadata flag")
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
)
var (
@@ -162,8 +164,8 @@ func InitSecretFlags() {
}
var (
shardByURLLabelsMap map[string]struct{}
shardByURLIgnoreLabelsMap map[string]struct{}
shardByURLLabelsFilter []string
shardByURLIgnoreLabelsFilter []string
)
// Init initializes remotewrite.
@@ -210,8 +212,8 @@ func Init() {
logger.Fatalf("-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
"see https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages")
}
shardByURLLabelsMap = newMapFromStrings(*shardByURLLabels)
shardByURLIgnoreLabelsMap = newMapFromStrings(*shardByURLIgnoreLabels)
shardByURLLabelsFilter = slices.Clone(*shardByURLLabels)
shardByURLIgnoreLabelsFilter = slices.Clone(*shardByURLIgnoreLabels)
initLabelsGlobal()
@@ -307,6 +309,10 @@ func initRemoteWriteCtxs(urls []string) {
}
fs.RegisterPathFsMetrics(*tmpDataPath)
if slices.Contains(*enableMdx, true) && *shardByURL {
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
}
if *shardByURL {
consistentHashNodes := make([]string, 0, len(urls))
for i, url := range urls {
@@ -698,18 +704,18 @@ func shardAmountRemoteWriteCtx(tssBlock []prompb.TimeSeries, shards [][]prompb.T
for _, ts := range tssBlock {
hashLabels := ts.Labels
if len(shardByURLLabelsMap) > 0 {
if len(shardByURLLabelsFilter) > 0 {
hashLabels = tmpLabels.Labels[:0]
for _, label := range ts.Labels {
if _, ok := shardByURLLabelsMap[label.Name]; ok {
if slices.Contains(shardByURLLabelsFilter, label.Name) {
hashLabels = append(hashLabels, label)
}
}
tmpLabels.Labels = hashLabels
} else if len(shardByURLIgnoreLabelsMap) > 0 {
} else if len(shardByURLIgnoreLabelsFilter) > 0 {
hashLabels = tmpLabels.Labels[:0]
for _, label := range ts.Labels {
if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok {
if !slices.Contains(shardByURLIgnoreLabelsFilter, label.Name) {
hashLabels = append(hashLabels, label)
}
}
@@ -810,34 +816,26 @@ var (
// it omits the '=' separator between label name and value for backward compatibility.
// Changing it would re-shard all series across remoteWrite targets.
func getLabelsHashForShard(labels []prompb.Label) uint64 {
bb := labelsHashBufPool.Get()
b := bb.B[:0]
var d xxhash.Digest
d.Reset()
for _, label := range labels {
b = append(b, label.Name...)
b = append(b, label.Value...)
_, _ = d.WriteString(label.Name)
_, _ = d.WriteString(label.Value)
}
h := xxhash.Sum64(b)
bb.B = b
labelsHashBufPool.Put(bb)
return h
return d.Sum64()
}
func getLabelsHash(labels []prompb.Label) uint64 {
bb := labelsHashBufPool.Get()
b := bb.B[:0]
var d xxhash.Digest
d.Reset()
for _, label := range labels {
b = append(b, label.Name...)
b = append(b, '=')
b = append(b, label.Value...)
_, _ = d.WriteString(label.Name)
_, _ = d.WriteString("=")
_, _ = d.WriteString(label.Value)
}
h := xxhash.Sum64(b)
bb.B = b
labelsHashBufPool.Put(bb)
return h
return d.Sum64()
}
var labelsHashBufPool bytesutil.ByteBufferPool
func logSkippedSeries(labels []prompb.Label, flagName string, flagValue int) {
select {
case <-logSkippedSeriesTicker.C:
@@ -862,6 +860,7 @@ type remoteWriteCtx struct {
sas atomic.Pointer[streamaggr.Aggregators]
deduplicator *streamaggr.Deduplicator
mdxFilter *mdx.Filter
streamAggrKeepInput bool
streamAggrDropInput bool
@@ -876,6 +875,7 @@ type remoteWriteCtx struct {
rowsPushedAfterRelabel *metrics.Counter
rowsDroppedByRelabel *metrics.Counter
mdxRowsPreserved *metrics.Counter
pushFailures *metrics.Counter
metadataDroppedOnPushFailure *metrics.Counter
@@ -972,7 +972,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
for i := range pss {
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
}
rwctx := &remoteWriteCtx{
idx: argIdx,
fq: fq,
@@ -989,6 +988,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
}
rwctx.initStreamAggrConfig()
if enableMdx.GetOptionalArg(argIdx) {
mdxFilter := mdx.NewFilter()
rwctx.mdxFilter = mdxFilter
rwctx.mdxRowsPreserved = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_mdx_tracked_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
return float64(mdxFilter.VMInstancesCount())
})
}
return rwctx
}
@@ -1002,6 +1011,11 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.deduplicator.MustStop()
rwctx.deduplicator = nil
}
if rwctx.mdxFilter != nil {
rwctx.mdxFilter.MustStop()
rwctx.mdxFilter = nil
rwctx.mdxRowsPreserved = nil
}
for _, ps := range rwctx.pss {
ps.MustStop()
@@ -1017,6 +1031,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
rwctx.rowsPushedAfterRelabel = nil
rwctx.rowsDroppedByRelabel = nil
}
// TryPushTimeSeries sends tss series to the configured remote write endpoint
@@ -1024,16 +1039,41 @@ func (rwctx *remoteWriteCtx) MustStop() {
// TryPushTimeSeries doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances.
func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) bool {
var rctx *relabelCtx
var mctx *mdx.Ctx
var v *[]prompb.TimeSeries
defer func() {
if rctx == nil {
return
if v != nil {
*v = prompb.ResetTimeSeries(tss)
tssPool.Put(v)
}
if rctx != nil {
putRelabelCtx(rctx)
}
if mctx != nil {
mdx.PutContext(mctx)
}
*v = prompb.ResetTimeSeries(tss)
tssPool.Put(v)
putRelabelCtx(rctx)
}()
copyTimeSeriesIfNeeded := func() {
if v == nil {
v := tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
}
}
if rwctx.mdxFilter != nil {
mctx = mdx.GetContext()
// Make a copy of tss before applying relabeling in order to prevent
// from affecting time series for other remoteWrite.mdx configs.
copyTimeSeriesIfNeeded()
tss = rwctx.mdxFilter.Filter(mctx, tss)
if len(tss) == 0 {
return true
}
rowsCount := getRowsCount(tss)
rwctx.mdxRowsPreserved.Add(rowsCount)
}
// Apply relabeling
rcs := allRelabelConfigs.Load()
pcs := rcs.perURL[rwctx.idx]
@@ -1043,8 +1083,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
// from affecting time series for other remoteWrite.url configs.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
copyTimeSeriesIfNeeded()
rowsCountBeforeRelabel := getRowsCount(tss)
tss = rctx.applyRelabeling(tss, pcs)
rowsCountAfterRelabel := getRowsCount(tss)
@@ -1062,8 +1101,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
if rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
copyTimeSeriesIfNeeded()
}
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
} else if rwctx.streamAggrDropInput {
@@ -1071,8 +1109,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
if rctx == nil {
rctx = getRelabelCtx()
// Make a copy of tss before dropping aggregated series
v = tssPool.Get().(*[]prompb.TimeSeries)
tss = append(*v, tss...)
copyTimeSeriesIfNeeded()
}
tss = dropUnaggregatedSeries(tss, matchIdxs.B)
}
@@ -1191,15 +1228,6 @@ func getRowsCount(tss []prompb.TimeSeries) int {
}
return rowsCount
}
func newMapFromStrings(a []string) map[string]struct{} {
m := make(map[string]struct{}, len(a))
for _, s := range a {
m[s] = struct{}{}
}
return m
}
func getMaxHourlySeries() int {
limit := *maxHourlySeries
if limit == -1 || limit > math.MaxInt32 {

View File

@@ -145,10 +145,10 @@ func TestRuleValidate(t *testing.T) {
}
func TestGroupValidate_Failure(t *testing.T) {
f := func(group *Group, validateExpressions bool, errStrExpected string) {
f := func(data []byte, validateExpressions bool, errStrExpected string) {
t.Helper()
err := group.Validate(nil, validateExpressions)
_, err := parse(map[string][]byte{"test.yaml": data}, nil, validateExpressions)
if err == nil {
t.Fatalf("expecting non-nil error")
}
@@ -158,275 +158,238 @@ func TestGroupValidate_Failure(t *testing.T) {
}
}
f(&Group{}, false, "group name must be set")
f([]byte(`
groups:
- name: ""
`), false, "group name must be set")
f(&Group{
Name: "both record and alert are not set",
Rules: []Rule{
{
Expr: "sum(up == 0 ) by (host)",
For: promutil.NewDuration(10 * time.Millisecond),
},
{
Expr: "sumSeries(time('foo.bar',10))",
},
},
}, false, "invalid rule")
f([]byte(`
groups:
- name: both record and alert are not set
rules:
- expr: "sum(up == 0 ) by (host)"
for: 10ms
- expr: "sumSeries(time('foo.bar',10))"
`), false, "invalid rule")
f(&Group{
Name: "negative interval",
Interval: promutil.NewDuration(-1),
}, false, "interval shouldn't be lower than 0")
f([]byte(`
groups:
- name: negative interval
interval: -1ms
`), false, "interval shouldn't be lower than 0")
f(&Group{
Name: "too big eval_offset",
Interval: promutil.NewDuration(time.Minute),
EvalOffset: promutil.NewDuration(2 * time.Minute),
}, false, "eval_offset should be smaller than interval")
f([]byte(`
groups:
- name: too big eval_offset
interval: 1m
eval_offset: 2m
`), false, "eval_offset should be smaller than interval")
f(&Group{
Name: "too big negative eval_offset",
Interval: promutil.NewDuration(time.Minute),
EvalOffset: promutil.NewDuration(-2 * time.Minute),
}, false, "eval_offset should be smaller than interval")
f([]byte(`
groups:
- name: too big negative eval_offset
interval: 1m
eval_offset: -2m
`), false, "eval_offset should be smaller than interval")
limit := -1
f(&Group{
Name: "wrong limit",
Limit: &limit,
}, false, "invalid limit")
f([]byte(`
groups:
- name: wrong limit
limit: -1
`), false, "invalid limit")
f(&Group{
Name: "wrong concurrency",
Concurrency: -1,
}, false, "invalid concurrency")
f([]byte(`
groups:
- name: wrong concurrency
concurrency: -1
`), false, "invalid concurrency")
f(&Group{
Name: "test",
Rules: []Rule{
{
Alert: "alert",
Expr: "up == 1",
},
{
Alert: "alert",
Expr: "up == 1",
},
},
}, false, "duplicate")
f([]byte(`
groups:
- name: test
rules:
- alert: alert
expr: up == 1
- alert: alert
expr: up == 1
`), false, "duplicate")
f(&Group{
Name: "test",
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"summary": "{{ value|query }}",
}},
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"summary": "{{ value|query }}",
}},
},
}, false, "duplicate")
f([]byte(`
groups:
- name: test
rules:
- alert: alert
expr: up == 1
labels:
summary: "{{ value|query }}"
- alert: alert
expr: up == 1
labels:
summary: "{{ value|query }}"
`), false, "duplicate")
f(&Group{
Name: "test",
Rules: []Rule{
{Record: "record", Expr: "up == 1", Labels: map[string]string{
"summary": "{{ value|query }}",
}},
{Record: "record", Expr: "up == 1", Labels: map[string]string{
"summary": "{{ value|query }}",
}},
},
}, false, "duplicate")
f([]byte(`
groups:
- name: test
rules:
- record: record
expr: up == 1
labels:
summary: "{{ value|query }}"
- record: record
expr: up == 1
labels:
summary: "{{ value|query }}"
`), false, "duplicate")
f(&Group{
Name: "test",
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"summary": "{{ value|query }}",
}},
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"description": "{{ value|query }}",
}},
},
}, false, "duplicate")
f(&Group{
Name: "test",
Rules: []Rule{
{Record: "alert", Expr: "up == 1", Labels: map[string]string{
"summary": "{{ value|query }}",
}},
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"summary": "{{ value|query }}",
}},
},
}, false, "duplicate")
f(&Group{
Name: "test thanos",
Type: NewRawType("thanos"),
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"description": "{{ value|query }}",
}},
},
}, true, "unknown datasource type")
f([]byte(`
groups:
- name: test thanos
type: thanos
rules:
- alert: alert
expr: up == 1
labels:
description: "{{ value|query }}"
`), true, "unknown datasource type")
// validate expressions
f(&Group{
Name: "test prometheus expr",
Type: NewPrometheusType(),
Rules: []Rule{
{
Record: "record",
Expr: "up | 0",
},
},
}, true, "bad MetricsQL expr")
f([]byte(`
groups:
- name: test prometheus expr
type: prometheus
rules:
- record: record
expr: "up | 0"
`), true, "bad MetricsQL expr")
f(&Group{
Name: "test graphite expr",
Type: NewGraphiteType(),
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"description": "some-description",
}},
},
}, true, "bad GraphiteQL expr")
f([]byte(`
groups:
- name: test graphite expr
type: graphite
rules:
- alert: alert
expr: up == 1
labels:
description: some-description
`), true, "bad GraphiteQL expr")
f(&Group{
Name: "test vlogs expr",
Type: NewVLogsType(),
Rules: []Rule{
{Alert: "alert", Expr: "stats count(*) as requests"},
},
}, true, "bad LogsQL expr")
f([]byte(`
groups:
- name: test vlogs expr
type: vlogs
rules:
- alert: alert
expr: "stats count(*) as requests"
`), true, "bad LogsQL expr")
f(&Group{
Name: "test vlogs expr",
Type: NewVLogsType(),
Rules: []Rule{
{Alert: "alert", Expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"},
},
}, true, "bad LogsQL expr")
f([]byte(`
groups:
- name: test vlogs expr multipart
type: vlogs
rules:
- alert: alert
expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"
`), true, "bad LogsQL expr")
f(&Group{
Name: "test graphite with prometheus expr",
Type: NewGraphiteType(),
Rules: []Rule{
{
Record: "r1",
ID: 1,
Expr: "sumSeries(time('foo.bar',10))",
For: promutil.NewDuration(10 * time.Millisecond),
},
{
Record: "r2",
ID: 2,
Expr: "sum(up == 0 ) by (host)",
},
},
}, true, "bad GraphiteQL expr")
f([]byte(`
groups:
- name: test graphite with prometheus expr
type: graphite
rules:
- record: r1
expr: "sumSeries(time('foo.bar',10))"
for: 10ms
- record: r2
expr: "sum(up == 0 ) by (host)"
`), true, "bad GraphiteQL expr")
f(&Group{
Name: "test vlogs with prometheus exp",
Type: NewVLogsType(),
Rules: []Rule{
{
Record: "r1",
Expr: "sum(up == 0 ) by (host)",
For: promutil.NewDuration(10 * time.Millisecond),
},
},
}, true, "bad LogsQL expr")
f([]byte(`
groups:
- name: test vlogs with prometheus expr
type: vlogs
rules:
- record: r1
expr: "sum(up == 0 ) by (host)"
for: 10ms
`), true, "bad LogsQL expr")
f(&Group{
Name: "test prometheus with vlogs exp",
Type: NewPrometheusType(),
Rules: []Rule{
{
Record: "r1",
Expr: "* | stats by (path) count()",
For: promutil.NewDuration(10 * time.Millisecond),
},
},
}, true, "bad MetricsQL expr")
f([]byte(`
groups:
- name: test prometheus with vlogs expr
type: prometheus
rules:
- record: r1
expr: "* | stats by (path) count()"
for: 10ms
`), true, "bad MetricsQL expr")
}
func TestGroupValidate_Success(t *testing.T) {
f := func(group *Group, validateAnnotations, validateExpressions bool) {
f := func(data []byte, validateAnnotations, validateExpressions bool) {
t.Helper()
var validateTplFn ValidateTplFn
if validateAnnotations {
validateTplFn = notifier.ValidateTemplates
}
err := group.Validate(validateTplFn, validateExpressions)
_, err := parse(map[string][]byte{"test.yaml": data}, validateTplFn, validateExpressions)
if err != nil {
t.Fatalf("unexpected error: %s", err)
}
}
f(&Group{
Name: "test",
Rules: []Rule{
{
Record: "record",
Expr: "up | 0",
},
},
}, false, false)
f([]byte(`
groups:
- name: test
rules:
- record: record
expr: "up | 0"
`), false, false)
f(&Group{
Name: "test",
Rules: []Rule{
{
Alert: "alert",
Expr: "up == 1",
Labels: map[string]string{
"summary": "{{ value|query }}",
},
},
},
}, false, false)
f([]byte(`
groups:
- name: test
rules:
- alert: alert
expr: up == 1
labels:
summary: "{{ value|query }}"
`), false, false)
// validate annotations
f(&Group{
Name: "test",
Rules: []Rule{
{
Alert: "alert",
Expr: "up == 1",
Labels: map[string]string{
"summary": `
{{ with printf "node_memory_MemTotal{job='node',instance='%s'}" "localhost" | query }}
{{ . | first | value | humanize1024 }}B
{{ end }}`,
},
},
},
}, true, false)
f([]byte(`
groups:
- name: test
rules:
- alert: alert
expr: up == 1
labels:
summary: "\n{{ with printf \"node_memory_MemTotal{job='node',instance='%s'}\" \"localhost\" | query }}\n {{ . | first | value | humanize1024 }}B\n{{ end }}"
`), true, false)
// validate expressions
f(&Group{
Name: "test prometheus",
Type: NewPrometheusType(),
Rules: []Rule{
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
"description": "{{ value|query }}",
}},
},
}, false, true)
f(&Group{
Name: "test victorialogs",
Type: NewVLogsType(),
Rules: []Rule{
{Alert: "alert", Expr: " _time: 1m | stats count(*) as requests", Labels: map[string]string{
"description": "{{ value|query }}",
}},
},
}, false, true)
f([]byte(`
groups:
- name: test prometheus
type: prometheus
rules:
- alert: alert
expr: up == 1
labels:
description: "{{ value|query }}"
`), false, true)
f([]byte(`
groups:
- name: test victorialogs
type: vlogs
rules:
- alert: alert
expr: " _time: 1m | stats count(*) as requests"
labels:
description: "{{ value|query }}"
`), false, true)
}
func TestHashRule_NotEqual(t *testing.T) {

View File

@@ -140,6 +140,18 @@ users:
- "ProjectID: {{.MetricsProjectID}}"
url_prefix: "http://vminsert:8480/insert/prometheus"
# JWT-based routing that relies solely on custom claims.
# The `vm_access` claim is missing, default value will be used.
# e.g. {"role": "admin"}.
- name: jwt-custom-claims
jwt:
skip_verify: true
vm_default_access_claim:
metrics_account_id: 1
match_claims:
role: admin
url_prefix: "http://vmselect-admin:8481/select/0/prometheus"
# Requests without Authorization header are proxied according to `unauthorized_user` section.
# Requests are proxied in round-robin fashion between `url_prefix` backends.
# The deny_partial_response query arg is added to all the proxied requests.

View File

@@ -65,6 +65,8 @@ type JWTConfig struct {
MatchClaims map[string]string `yaml:"match_claims,omitempty"`
parsedMatchClaims []*jwt.Claim
DefaultVMAccessClaim *jwt.VMAccessClaim `yaml:"default_vm_access_claim,omitempty"`
// verifierPool is used to verify JWT tokens.
// It is initialized from PublicKeys and/or PublicKeyFiles.
// In this case, it is initialized once at config reload and never updated until next reload
@@ -432,7 +434,6 @@ func validateJWTPlaceholdersForURL(up *URLPrefix, isAllowed bool) error {
}
if strings.Contains(p, placeholderPrefix) {
return fmt.Errorf("invalid placeholder found in URL request path: %q, supported values are: %s", bu.Path, strings.Join(allPlaceholders, ", "))
}
}
for param, values := range bu.Query() {
@@ -487,7 +488,6 @@ func hasAnyPlaceholders(u *url.URL) bool {
return true
}
}
}
return false
}

View File

@@ -190,6 +190,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
if tkn == nil {
logger.Panicf("BUG: unexpected nil jwt token for user %q", ui.name())
}
if !tkn.HasVMAccessClaim() && ui.JWT.DefaultVMAccessClaim == nil {
http.Error(w, "Unauthorized", http.StatusUnauthorized)
return true
}
defer putToken(tkn)
processUserRequest(w, r, ui, tkn)
return true
@@ -424,8 +428,12 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *j
}
targetURL := bu.url
if tkn != nil {
vmac := tkn.VMAccess()
if !tkn.HasVMAccessClaim() {
vmac = ui.JWT.DefaultVMAccessClaim
}
// for security reasons allow templating only for configured url values and headers
targetURL, hc = replaceJWTPlaceholders(bu, hc, tkn.VMAccess())
targetURL, hc = replaceJWTPlaceholders(bu, hc, vmac)
}
if isDefault {
// Don't change path and add request_path query param for default route.

View File

@@ -739,6 +739,12 @@ users:
"vm_access": map[string]any{},
}, false)
// token without vm_access claim, but with a custom claim usable for routing
roleToken := genToken(t, map[string]any{
"exp": time.Now().Add(10 * time.Minute).Unix(),
"role": "admin",
}, true)
fullToken := genToken(t, map[string]any{
"exp": time.Now().Add(10 * time.Minute).Unix(),
"vm_access": map[string]any{
@@ -779,6 +785,26 @@ statusCode=401
Unauthorized`
f(simpleCfgStr, request, responseExpected)
// token without vm_access claim is accepted when it
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
request.Header.Set(`Authorization`, `Bearer `+roleToken)
responseExpected = `
statusCode=200
path: /foo/abc
query:
headers:`
f(fmt.Sprintf(`
users:
- jwt:
public_keys:
- %q
default_vm_access_claim:
metrics_account_id: 10
metrics_project_id: 10
match_claims:
role: admin
url_prefix: {BACKEND}/foo`, string(publicKeyPEM)), request, responseExpected)
// expired token
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
request.Header.Set(`Authorization`, `Bearer `+expiredToken)

View File

@@ -956,6 +956,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
deadline := searchutil.GetDeadlineForQuery(r, startTime)
mayCache := !httputil.GetBool(r, "nocache")
optimizeRepeatedBinaryOpSubexprs := httputil.GetBool(r, "optimize_repeated_binary_op_subexprs")
lookbackDelta, err := getMaxLookback(r)
if err != nil {
return err
@@ -977,18 +978,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
}
ec := &promql.EvalConfig{
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
CacheTagFilters: etfs,
Start: start,
End: end,
Step: step,
MaxPointsPerSeries: *maxPointsPerTimeseries,
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
Deadline: deadline,
MayCache: mayCache,
OptimizeRepeatedBinaryOpSubexprs: optimizeRepeatedBinaryOpSubexprs,
LookbackDelta: lookbackDelta,
RoundDigits: getRoundDigits(r),
EnforcedTagFilterss: etfs,
CacheTagFilters: etfs,
GetRequestURI: func() string {
return httpserver.GetRequestURI(r)
},

View File

@@ -132,6 +132,9 @@ type EvalConfig struct {
// Whether the response can be cached.
MayCache bool
// Whether repeated cacheable binary op subexpressions can be optimized.
OptimizeRepeatedBinaryOpSubexprs bool
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
LookbackDelta int64
@@ -171,6 +174,7 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
ec.MaxPointsPerSeries = src.MaxPointsPerSeries
ec.Deadline = src.Deadline
ec.MayCache = src.MayCache
ec.OptimizeRepeatedBinaryOpSubexprs = src.OptimizeRepeatedBinaryOpSubexprs
ec.LookbackDelta = src.LookbackDelta
ec.RoundDigits = src.RoundDigits
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
@@ -467,7 +471,8 @@ func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
}
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
if !canPushdownCommonFilters(be) {
canPushdown := canPushdownCommonFilters(be)
if !canPushdown && !shouldOptimizeRepeatedBinaryOpSubexprs(ec, exprFirst, exprSecond) {
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
// from exprFirst to exprSecond.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
@@ -500,6 +505,25 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
}
return tssFirst, tssSecond, nil
}
if !canPushdown {
qt = qt.NewChild("execute left and right sides of %q sequentially because repeated cacheable subexpression was found", be.Op)
defer qt.Done()
qtFirst := qt.NewChild("expr1")
tssFirst, err := evalExpr(qtFirst, ec, exprFirst)
qtFirst.Done()
if err != nil {
return nil, nil, err
}
qtSecond := qt.NewChild("expr2")
tssSecond, err := evalExpr(qtSecond, ec, exprSecond)
qtSecond.Done()
if err != nil {
return nil, nil, err
}
return tssFirst, tssSecond, nil
}
// Execute binary operation in the following way:
//
@@ -544,6 +568,78 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
return tssFirst, tssSecond, nil
}
func shouldOptimizeRepeatedBinaryOpSubexprs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr) bool {
if !ec.OptimizeRepeatedBinaryOpSubexprs {
return false
}
if ec.Start == ec.End {
return false
}
if !ec.mayCache() {
return false
}
candidatesFirst := make(map[string]struct{}, 1)
var b []byte
visitOptimizedAggrs(exprFirst, func(ae *metricsql.AggrFuncExpr) {
if hasUnseededVolatileFunc(ae) {
return
}
b = ae.AppendString(b[:0])
candidatesFirst[string(b)] = struct{}{}
})
if len(candidatesFirst) == 0 {
return false
}
repeated := false
visitOptimizedAggrs(exprSecond, func(ae *metricsql.AggrFuncExpr) {
if repeated {
return
}
b = ae.AppendString(b[:0])
_, repeated = candidatesFirst[string(b)]
})
return repeated
}
func visitOptimizedAggrs(e metricsql.Expr, f func(ae *metricsql.AggrFuncExpr)) {
metricsql.VisitAll(e, func(expr metricsql.Expr) {
ae, ok := expr.(*metricsql.AggrFuncExpr)
if !ok {
return
}
if getIncrementalAggrFuncCallbacks(ae.Name) == nil {
return
}
fe, _ := tryGetArgRollupFuncWithMetricExpr(ae)
if fe == nil {
return
}
f(ae)
})
}
func hasUnseededVolatileFunc(e metricsql.Expr) bool {
found := false
metricsql.VisitAll(e, func(expr metricsql.Expr) {
if found {
return
}
fe, ok := expr.(*metricsql.FuncExpr)
if !ok {
return
}
switch strings.ToLower(fe.Name) {
case "now":
found = true
case "rand", "rand_normal", "rand_exponential":
found = len(fe.Args) == 0
}
})
return found
}
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
if len(tss) == 0 {
return nil

View File

@@ -170,3 +170,87 @@ func TestGetSumInstantValues(t *testing.T) {
[]*timeseries{ts("foo", 100, 1)},
)
}
func TestShouldOptimizeRepeatedBinaryOpSubexprsGate(t *testing.T) {
e, err := metricsql.Parse(`count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`)
if err != nil {
t.Fatalf("unexpected error in metricsql.Parse(): %s", err)
}
be, ok := e.(*metricsql.BinaryOpExpr)
if !ok {
t.Fatalf("unexpected expr type; got %T; want *metricsql.BinaryOpExpr", e)
}
f := func(name string, ec *EvalConfig, resultExpected bool) {
t.Helper()
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
if result != resultExpected {
t.Fatalf("unexpected result for %q; got %v; want %v", name, result, resultExpected)
}
}
f("disabled optimization", &EvalConfig{
Start: 1000,
End: 2000,
Step: 1000,
}, false)
f("disabled cache", &EvalConfig{
Start: 1000,
End: 2000,
Step: 1000,
OptimizeRepeatedBinaryOpSubexprs: true,
}, false)
f("instant query", &EvalConfig{
Start: 1000,
End: 1000,
Step: 1000,
MayCache: true,
OptimizeRepeatedBinaryOpSubexprs: true,
}, false)
f("repeated cacheable aggregate subexpression", &EvalConfig{
Start: 1000,
End: 2000,
Step: 1000,
MayCache: true,
OptimizeRepeatedBinaryOpSubexprs: true,
}, true)
f("unaligned range query", &EvalConfig{
Start: 1001,
End: 2000,
Step: 1000,
MayCache: true,
OptimizeRepeatedBinaryOpSubexprs: true,
}, false)
}
func TestShouldOptimizeRepeatedBinaryOpSubexprsExpressions(t *testing.T) {
f := func(name, q string, resultExpected bool) {
t.Helper()
e, err := metricsql.Parse(q)
if err != nil {
t.Fatalf("unexpected error in metricsql.Parse(%q) for %q: %s", q, name, err)
}
be, ok := e.(*metricsql.BinaryOpExpr)
if !ok {
t.Fatalf("unexpected expr type for %q; got %T; want *metricsql.BinaryOpExpr", name, e)
}
ec := &EvalConfig{Start: 1000, End: 2000, Step: 1000, MayCache: true, OptimizeRepeatedBinaryOpSubexprs: true}
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
if result != resultExpected {
t.Fatalf("unexpected result for %q; got %v; want %v; query: %q", name, result, resultExpected, q)
}
}
f("original issue query", `count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`, true)
f("right side contains repeated count aggregate", `count(foo) by (job) / (count(foo) by (job) + 1)`, true)
f("same sum aggregate", `sum(rate(foo[5m])) by (job) / sum(rate(foo[5m])) by (job)`, true)
f("same inner rollup but different aggregates", `sum(rate(foo[5m])) by (job) / count(rate(foo[5m])) by (job)`, false)
f("different count aggregates", `count(foo) by (job) / count(bar) by (job)`, false)
f("bare metric selector", `foo / foo`, false)
f("bare rollup function", `rate(a[5m]) / rate(a[5m])`, false)
f("now at modifier", `sum(rate(foo[5m] @ now())) by (job) / sum(rate(foo[5m] @ now())) by (job)`, false)
f("unseeded rand at modifier", `sum(rate(foo[5m] @ rand())) by (job) / sum(rate(foo[5m] @ rand())) by (job)`, false)
f("unseeded rand_normal at modifier", `sum(rate(foo[5m] @ rand_normal())) by (job) / sum(rate(foo[5m] @ rand_normal())) by (job)`, false)
f("unseeded rand_exponential at modifier", `sum(rate(foo[5m] @ rand_exponential())) by (job) / sum(rate(foo[5m] @ rand_exponential())) by (job)`, false)
f("seeded rand at modifier", `sum(rate(foo[5m] @ rand(1))) by (job) / sum(rate(foo[5m] @ rand(1))) by (job)`, true)
}

View File

@@ -6,7 +6,7 @@ COPY web/ /build/
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o web-amd64 github.com/VictoriMetrics/vmui/ && \
GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o web-windows github.com/VictoriMetrics/vmui/
FROM alpine:3.23.4
FROM alpine:3.24.1
USER root
COPY --from=build-web-stage /build/web-amd64 /app/web

View File

@@ -3,9 +3,9 @@
DOCKER_REGISTRIES ?= docker.io quay.io
DOCKER_NAMESPACE ?= victoriametrics
ROOT_IMAGE ?= alpine:3.23.4
ROOT_IMAGE ?= alpine:3.24.1
ROOT_IMAGE_SCRATCH ?= scratch
CERTS_IMAGE := alpine:3.23.4
CERTS_IMAGE := alpine:3.24.1
GO_BUILDER_IMAGE := golang:1.26.4

View File

@@ -245,7 +245,7 @@ The following steps must be performed during the upgrade / downgrade procedure:
* Wait until the process stops. This can take a few seconds.
* Start the upgraded VictoriaMetrics.
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/whats-new-in-prometheus-2-8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
> If you'd prefer not to manage upgrades yourself, [VictoriaMetrics Cloud](https://console.victoriametrics.cloud/signUp?utm_source=website&utm_campaign=docs_vm_single_upgrade)
> performs version upgrades automatically during maintenance windows with no action required on your part.
@@ -417,7 +417,7 @@ VictoriaMetrics is configured via command-line flags, so it must be restarted wh
* Wait until the process stops. This can take a few seconds.
* Start VictoriaMetrics with the new command-line flags.
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/whats-new-in-prometheus-2-8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
## How to scrape Prometheus exporters such as [node-exporter](https://github.com/prometheus/node_exporter)
@@ -478,6 +478,11 @@ and [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyco
to the given number of digits after the decimal point.
For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point.
VictoriaMetrics accepts `optimize_repeated_binary_op_subexprs=1` query arg for [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query)
handler. It allows `vmselect` to execute left and right sides of binary operators sequentially when they contain the same
optimized aggregate rollup result expression, so the second side may reuse the rollup result cache populated by the first side.
The optimization is disabled by default and applies only when rollup result cache can be used for the request.
VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels)
and [`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues) handlers for limiting the number of returned entries.
For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels.

View File

@@ -26,16 +26,22 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* SECURITY: upgrade base docker image (Alpine) from 3.23.4 to 3.24.1. See [Alpine 3.24.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.24.1-released.html).
* BUGFIX: all VictoriaMetrics components: cancel in-flight HTTP requests shortly before `-http.maxGracefulShutdownDuration` elapses during graceful shutdown, so they can drain and the shutdown completes cleanly within that window instead of timing out and exiting via `logger.Fatalf` -> `os.Exit`. This prevents skipping the storage flush and losing in-memory data when long-lived requests are in flight (such as VictoriaLogs live tailing). See [#1502](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502).
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fixes unexpected rare rerouting. See [#11162](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11162).
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): propagate cache reset operation to `selectNode` when `/internal/resetRollupResultCache` is called. Previously, the propagation only happened when the `delete_series` API was called. See [#11112](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11112).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix possible unexpected increases in `rate_avg` and `rate_sum` if an out-of-order sample is ingested after the previous flush. See [#11140](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11140).
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): add `default_vm_access_claim` field into `jwt` section of auth config. It could be used at [JWT claim placeholders](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating), if `JWT` token doesn't have `vm_access` claim. See [#11054](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11054).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): reduces CPU usage by 10% at [sharding among remote storages](https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages). See [#11113](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11113). Thanks to @bennf for contribution.
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `optimize_repeated_binary_op_subexprs=1` query arg to [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query) for executing binary operator sides sequentially when they share the same optimized aggregate rollup result expression. This allows the second side to reuse rollup result cache populated by the first side. See [#10575](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10575).
## [v1.146.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.146.0)
Released at 2026-06-22
* FEATURE: all VictoriaMetrics components: add `-http.header.disableServerHostname` command-line flag for disabling the `X-Server-Hostname` HTTP response header. See [#11067](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11067). Thanks to @zasdaym for contribution.
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose `vm_streamaggr_dedup_dropped_samples_total` to allow tracking dropped old samples during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): use the aggregation rule interval as the default [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) instead of `2*interval`, to reduce spikes when there are gaps between received samples. See [#11102](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102).
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add new aggregation output [sum_samples_total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#sum_samples_total) for summing input delta values into a cumulative counter. See issues [#11002](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11002) and [#4843](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4843).
@@ -44,6 +50,7 @@ Released at 2026-06-22
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add support for [Monitoring Data eXchange (MDX)](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange): the ability to route only metrics from VictoriaMetrics services to a specific `-remoteWrite.url`. MDX is useful for building monitoring-of-monitoring where one remote storage should receive the full metric stream and another should receive only VictoriaMetrics metrics. Enable per destination with `-remoteWrite.mdx.enable=true`. See [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600).
* BUGFIX: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly expose metric `vm_retention_filters_partitions_scheduled_rows`. See [#11138](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11138)
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
@@ -58,6 +65,7 @@ Released at 2026-06-22
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `metricFamilyName` at metrics metadata response. See [#11129](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11129). Thanks for @fxrlv for the contribution.
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
Released at 2026-06-08

View File

@@ -268,6 +268,39 @@ for the collected samples. Examples:
```sh
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
```
### Monitoring Data eXchange
The Monitoring Data eXchange (MDX){{% available_from "#" %}} feature allows `vmagent` to forward only VictoriaMetrics metrics to selected `-remoteWrite.url` destinations while dropping metrics from non-VictoriaMetrics services.
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=false \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true
```
When MDX is enabled for a `-remoteWrite.url`, `vmagent` forwards only metrics that:
- come from the target that exposes the `vm_app_version` metric (emitted by all VictoriaMetrics components)
- contain the `victoriametrics_app=true` label, which will be added automatically to the metrics if the instance was deployed via [VictoriaMetrics Operator](https://docs.victoriametrics.com/operator/).
`victoriametrics_app=true` label will be added to all metrics that are preserved by MDX if it's absent.
- contain the label specified via `-mdx.label`.
```sh
./vmagent \
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
-remoteWrite.mdx.enable=true \
-mdx.label="service=victoriametrics"
```
In this configuration, metrics with the label `service=victoriametrics` are preserved even if their scrape targets do not expose `vm_app_version` metric.
The number of VictoriaMetrics metrics preserved by MDX is exposed as `vmagent_remotewrite_mdx_rows_preserved_total`.
The scope of MDX is at the per-url level, so it works after global level mechanisms, such as stream aggregation, relabeling, complexity limiter, and cardinality limiter. See [Life of a sample](https://docs.victoriametrics.com/victoriametrics/vmagent/#life-of-a-sample).
### Life of a sample
@@ -285,18 +318,20 @@ flowchart TB
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
%% Left branch
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
%% Right branch
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
```
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:

View File

@@ -270,7 +270,7 @@ users:
url_prefix: "http://victoria-metrics:8428/"
```
JWT tokens must contain a `"vm_access": {}` claim, more on that in [JWT claim-based request templating](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating)
The `vm_access` claim is optional starting from {{% available_from "#" %}}: when present it is used for [request templating](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating), and when absent the default tenant `0:0` is assumed for any `vm_access`-based placeholders. Routing can rely solely on other token claims via [JWT claim matching](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-matching).
For testing, skip signature verification with `skip_verify: true` (not recommended for production).
@@ -520,7 +520,8 @@ for dynamic URL rewriting based on `vm_access` claim fields.
`vmauth` can dynamically rewrite{{% available_from "v1.137.0" %}} upstream URLs and request headers using values from the JWT `vm_access` claim.
This enables routing different users to different backends or tenants based solely on the JWT token,
without maintaining separate user configs per tenant.
without maintaining separate user configs per tenant. In addition `vm_access` claim could be defined at `jwt` section with `default_vm_access_claim` {{% available_from "#" %}}.
In this case, if JWT token doesn't have `vm_access` claim defined, value from `default_vm_access_claim` will be used for templaing.
Example: minimal valid JWT. If vm_access is empty, tenant `0:0` is assumed and no additional filters are applied.
```json
@@ -575,6 +576,28 @@ Placeholders are supported in the following locations:
Placeholders are **not** supported in response headers.
They are also only valid for JWT-authenticated users — using them in configs for `username`/`password` or `bearer_token` users causes a configuration error.
Example: default `vm_access` claim:
```yaml
users:
- jwt:
default_vm_access_claim:
metrics_account_id: 10
metrics_project_id: 10
metrics_extra_filters:
- '{instance="sandbox"}'
metrics_extra_labels:
- team=dev
- env=dev
public_keys:
- |
-----BEGIN PUBLIC KEY-----
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
-----END PUBLIC KEY-----
url_prefix: "http://vminsert:8480/insert/{{.MetricsAccountID}}:{{.MetricsProjectID}}/prometheus/?extra_filters={{.MetricsExtraFilters}}&extra_label={{.MetricsExtraLabels}}"
```
Example: route requests to the VictoriaMetrics single-node:
```yaml

View File

@@ -16,7 +16,7 @@ aliases:
`vmestimator` measures metrics cardinality across arbitrary label dimensions and exposes the results as metrics.
## Why measure ?
## Why measure?
Consider a setup where metrics are scraped from dozens of Prometheus targets.
One day, a team deploys a new version of their service with a `trace_id` or `user_id` label.
@@ -75,7 +75,7 @@ The resulting topology looks like this:
## Install
Create a `streams.yaml` from [example config](https://github.com/VictoriaMetrics/cestimator/blob/main/streams.yaml).
Create a `streams.yaml` from [example config](https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml).
Run the Docker image from [Docker Hub](https://hub.docker.com/r/victoriametrics/vmestimator) or [Quay](https://quay.io/repository/victoriametrics/vmestimator), mounting your config file:
```bash
docker run --rm \
@@ -92,7 +92,7 @@ To build from sources, see [How to build from sources](https://github.com/Victor
## Configuration
To run vmestimator a `streams.yaml` config has to be provided (see [example config](https://github.com/VictoriaMetrics/cestimator/blob/main/streams.yaml):
To run vmestimator a `streams.yaml` config has to be provided (see [example config](https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml)):
```bash
/path/to/vmestimator -config=streams.yaml # -httpListenAddr=:8490
@@ -153,7 +153,7 @@ streams:
# Whether to use the sparse HyperLogLog representation for low-cardinality groups.
# Sparse mode uses far less memory until a group's cardinality reaches ~2^(p-1),
# at which point it automatically promotes to the dense representation.
# See more in # See more in https://research.google.com/pubs/archive/40671.pdf
# See more in https://research.google.com/pubs/archive/40671.pdf
#
# default: true
hll_sparse: 'boolean'
@@ -230,7 +230,7 @@ Per tenant cardinality:
group_by: ['vm_account_id', 'vm_project_id']
```
### Churn rate calculation
### Churn calculation
[Churn rate](https://valyala.medium.com/prometheus-storage-technical-terms-for-humans-4ab4de6c3d48#churn-rate) measures how quickly time series are created and disappear.
[High churn](https://docs.victoriametrics.com/victoriametrics/faq/#what-is-high-churn-rate) means many series appear briefly and are replaced by new ones.
@@ -273,6 +273,40 @@ Values in between indicate the fraction of maximum possible churn that is occurr
This helps identify jobs that create the most indexing pressure on storage, even when their current active cardinality appears moderate.
### Alerting
Pre-built alert rules for cardinality monitoring are available in
[deployment/docker/rules/alerts-cardinality.yml](https://github.com/VictoriaMetrics/vmestimator/blob/main/deployment/docker/rules/alerts-cardinality.yml).
They require two streams with the same `group_by` but different intervals to also support churn detection:
```yaml
# streams.yaml
# or use example config:
# https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml
- interval: '15m'
group_by: ['job']
- interval: '30m'
group_by: ['job']
```
The included alerts are:
- **JobTooHighCardinality** — fires when any job exceeds 20,000 estimated active series over the last 30 minutes.
The threshold is a starting point and should be calibrated to reflect the expected cardinality of your largest jobs.
- **JobTooHighChurnRate** — fires when more than 10% of a job's series churned between the 15m and 30m windows.
Catches jobs that generate continuous indexing pressure even when their active series count looks moderate.
- **CardinalityGroupLimitNearlyReached** — fires when the number of tracked groups exceeds 80% of the configured `group_limit`.
Acts as an early warning that some label value combinations may soon be dropped from individual tracking.
- **CardinalityGroupLimitReached** — fires when groups are actively rejected because `group_limit` is full.
At this point, some label combinations are being counted in a shared "rejected" sketch rather than tracked individually.
All alerts link to the [Cardinality Explorer dashboard](https://play-grafana.victoriametrics.com/d/mktd5h8/).
## Alternative solutions
### PromQL
@@ -297,7 +331,7 @@ topk(10, count({__name__=~".*"}) by (job))
This approach works for small setups but does not scale well, because these queries scan the entire time series set.
Most critically, if the storage is overloaded or unavailable, these queries could not be executed.
### Cardinality explorer
### Cardinality Explorer
VictoriaMetrics includes a built-in [cardinality explorer](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cardinality-explorer).
It provides per-metric detail beyond raw series counts: query frequency, last access time, day-over-day change, and share of total cardinality.
@@ -355,10 +389,11 @@ When grouping is enabled, vmestimator exposes per-bucket operational metrics at
Two Grafana dashboards are available in the [dashboards](https://github.com/VictoriaMetrics/vmestimator/tree/main/dashboards) directory:
- `vmestimator.json` — application health: CPU, memory, ingestion rates, concurrent inserts, and group key saturation.
- `cardinality-explorer.json` — cardinality analysis: global estimates, per-group-key series counts, and top-10 highest-cardinality label value combinations.
- [VictoriaMetrics - vmestimator](https://play-grafana.victoriametrics.com/d/mkv22l4/victoriametrics-vmestimator) — application health: CPU, memory, ingestion rates, concurrent inserts, and group key saturation.
<img width="1507" height="801" alt="Screenshot 2026-06-29 at 19 06 46" src="https://github.com/user-attachments/assets/cbfd979d-f403-4270-b098-2d2f0b392172" />
<img style="min-width:0;width: 100%" src="https://github.com/user-attachments/assets/2bd6a930-1eb5-40ef-8006-8196c1c12397" />
- [VictoriaMetrics - Cardinality Explorer](https://play-grafana.victoriametrics.com/d/mktd5h8/victoriametrics-cardinality-explorer) — cardinality analysis: global estimates, per-group-key series counts, and top-10 highest-cardinality label value combinations.
<img width="1510" height="796" alt="Screenshot 2026-06-29 at 19 05 47" src="https://github.com/user-attachments/assets/a1aea6e1-8714-4d5a-a629-8bdee978f1c6" />
## How to build from sources
@@ -384,17 +419,17 @@ The base docker image is [alpine](https://hub.docker.com/_/alpine) but it is pos
For example, the following command builds the image on top of [scratch](https://hub.docker.com/_/scratch) image:
```sh
ROOT_IMAGE=scratch make package-vmrestore
ROOT_IMAGE=scratch make package-vmestimator
```
You can build and publish to your own registry and namespace:
```
DOCKER_REGISTRIES=ghcr.io DOCKER_NAMESPACE=foo make publish-vmagent
DOCKER_REGISTRIES=ghcr.io DOCKER_NAMESPACE=foo make publish-vmestimator
```
## Command-line flags
Run `vmestimate -help` in order to see all the available options:
Run `vmestimator -help` in order to see all the available options:
```
Usage of ./bin/vmestimator:
@@ -403,7 +438,7 @@ Usage of ./bin/vmestimator:
-cardinalityMetrics.exposeAt string
HTTP path for exposing cardinality metrics. If set to the default /metrics, cardinality metrics are merged with regular metrics and exposed together. If set to a different path, only cardinality metrics are exposed at that endpoint. If set to an empty value, cardinality metrics are not exposed via HTTP at all. (default "/metrics")
-config string
Path to YAML configuration file. Must be set unless -storageNode is specified. See https://github.com/VictoriaMetrics/cestimator/blob/main/streams.yaml for config example
Path to YAML configuration file. Must be set unless -storageNode is specified. See https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml for config example
-enableTCP6
Whether to enable IPv6 for listening and dialing. By default, only IPv4 TCP and UDP are used
-envflag.enable

View File

@@ -58,10 +58,13 @@ var (
disableKeepAlive = flag.Bool("http.disableKeepAlive", false, "Whether to disable HTTP keep-alive for incoming connections at -httpListenAddr")
disableResponseCompression = flag.Bool("http.disableResponseCompression", false, "Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth")
maxGracefulShutdownDuration = flag.Duration("http.maxGracefulShutdownDuration", 7*time.Second, `The maximum duration for a graceful shutdown of the HTTP server. A highly loaded server may require increased value for a graceful shutdown`)
shutdownDelay = flag.Duration("http.shutdownDelay", 0, `Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers`)
idleConnTimeout = flag.Duration("http.idleConnTimeout", time.Minute, "Timeout for incoming idle http connections")
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
maxGracefulShutdownDuration = flag.Duration("http.maxGracefulShutdownDuration", 7*time.Second, "The maximum duration for a graceful shutdown of the HTTP server. "+
"During this period the server stops accepting new connections, but it will continue serving existing connections. "+
"The remaining in-flight requests are canceled before the deadline, so the shutdown can finish within this duration. "+
"A highly loaded server may require increased value for a graceful shutdown")
shutdownDelay = flag.Duration("http.shutdownDelay", 0, `Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers`)
idleConnTimeout = flag.Duration("http.idleConnTimeout", time.Minute, "Timeout for incoming idle http connections")
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
"This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections")
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
@@ -80,6 +83,7 @@ var (
type server struct {
shutdownDelayDeadline atomic.Int64
s *http.Server
cancel context.CancelFunc
}
// RequestHandler must serve the given request r and write response to w.
@@ -156,7 +160,11 @@ func serve(addr string, rh RequestHandler, idx int, opts ServeOptions) {
func serveWithListener(addr string, ln net.Listener, rh RequestHandler, disableBuiltinRoutes bool) {
var s server
ctx, cancel := context.WithCancel(context.Background())
s.s = &http.Server{
BaseContext: func(l net.Listener) context.Context {
return ctx
},
// Disable http/2, since it doesn't give any advantages for VictoriaMetrics services.
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
@@ -170,6 +178,7 @@ func serveWithListener(addr string, ln net.Listener, rh RequestHandler, disableB
ErrorLog: log.New(&tlsErrorSkipLogger{}, "", 0),
}
s.s.SetKeepAlivesEnabled(!*disableKeepAlive)
s.cancel = cancel
if *connTimeout > 0 {
s.s.ConnContext = func(ctx context.Context, _ net.Conn) context.Context {
timeoutSec := connTimeout.Seconds()
@@ -265,8 +274,18 @@ func stop(addr string) error {
logger.Infof("Starting shutdown for http server %q", addr)
}
// Cancel in-flight requests shortly before the deadline, reserving up to 2s (or 20%
// of the window, whichever is smaller) for them to unwind, so Shutdown returns cleanly
// within -http.maxGracefulShutdownDuration instead of timing out and dying via
// logger.Fatalf -> os.Exit, which skips the storage flush and loses data.
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502
cancelInflightAfter := *maxGracefulShutdownDuration - min(*maxGracefulShutdownDuration/5, 2*time.Second)
ctx, cancel := context.WithTimeout(context.Background(), *maxGracefulShutdownDuration)
defer cancel()
t := time.AfterFunc(cancelInflightAfter, s.cancel)
defer t.Stop()
if err := s.s.Shutdown(ctx); err != nil {
return fmt.Errorf("cannot gracefully shutdown http server at %q in %.3fs; "+
"probably, `-http.maxGracefulShutdownDuration` command-line flag value must be increased; error: %s", addr, maxGracefulShutdownDuration.Seconds(), err)

View File

@@ -105,6 +105,10 @@ type body struct {
Scope string `json:"scope,omitempty"`
vmAccessClaim VMAccessClaim
// hasVMAccess is set to true when the token body contains a `vm_access` claim.
// Presence enforcement is left to the caller via Token.HasVMAccess.
hasVMAccess bool
buf []byte
p *fastjson.Parser
@@ -121,7 +125,6 @@ type body struct {
}
func (b *body) parse(src string) error {
var err error
b.buf, err = decodeB64(b.buf[:0], src)
if err != nil {
@@ -132,6 +135,9 @@ func (b *body) parse(src string) error {
if err != nil {
return err
}
if jv.Type() != fastjson.TypeObject {
return fmt.Errorf("unexpected non json object; type: %q", jv.Type())
}
if expObject := jv.Get("exp"); expObject != nil {
b.Exp, err = expObject.Int64()
if err != nil {
@@ -153,30 +159,31 @@ func (b *body) parse(src string) error {
}
vaObject := jv.Get("vm_access")
if vaObject == nil {
return ErrVMAccessFieldMissing
}
// some IDPs encode custom claims as a string
// try parsing as an object and fallback to a string
switch vaObject.Type() {
case fastjson.TypeObject:
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
return err
}
case fastjson.TypeString:
b.claimsParser = parserPool.Get()
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
if err != nil {
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
}
if err := b.vmAccessClaim.parseFrom(va); err != nil {
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
}
b.vmAccessClaimObject = va
case fastjson.TypeNull:
return ErrVMAccessFieldMissing
switch {
case vaObject == nil || vaObject.Type() == fastjson.TypeNull:
b.hasVMAccess = false
default:
return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
// some IDPs encode custom claims as a string
// try parsing as an object and fallback to a string
switch vaObject.Type() {
case fastjson.TypeObject:
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
return err
}
case fastjson.TypeString:
b.claimsParser = parserPool.Get()
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
if err != nil {
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
}
if err := b.vmAccessClaim.parseFrom(va); err != nil {
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
}
b.vmAccessClaimObject = va
default:
return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
}
b.hasVMAccess = true
}
b.Jti = bytesutil.ToUnsafeString(jv.GetStringBytes("jti"))
@@ -218,6 +225,7 @@ func (b *body) reset() {
b.buf = b.buf[:0]
b.allClaims = nil
b.vmAccessClaim.reset()
b.hasVMAccess = false
if b.p != nil {
parserPool.Put(b.p)
b.p = nil
@@ -229,11 +237,9 @@ func (b *body) reset() {
if b.vmAccessClaimObject != nil {
b.vmAccessClaimObject = nil
}
}
// Parse parses JWT token from given source string
//
// Token field is valid until src is reachable
func (t *Token) Parse(src string, enforceAuthPrefix bool) error {
if enforceAuthPrefix && (len(src) < len(prefix) || !strings.EqualFold(src[:len(prefix)], prefix)) {
@@ -268,6 +274,11 @@ func (t *Token) Parse(src string, enforceAuthPrefix bool) error {
return nil
}
// HasVMAccessClaim reports whether the parsed token contains a `vm_access` claim.
func (t *Token) HasVMAccessClaim() bool {
return t.body.hasVMAccess
}
// Issuer returns `iss` claim value from token body
func (t *Token) Issuer() string {
return t.body.Iss
@@ -371,30 +382,30 @@ func (t *Token) Reset() {
// VMAccessClaim represent JWT claim object
type VMAccessClaim struct {
MetricsExtraFilters []string `json:"metrics_extra_filters,omitempty"`
MetricsExtraLabels []string `json:"metrics_extra_labels,omitempty"`
LogsExtraFilters []string `json:"logs_extra_filters,omitempty"`
LogsExtraStreamFilters []string `json:"logs_extra_stream_filters,omitempty"`
MetricsExtraFilters []string `json:"metrics_extra_filters,omitempty" yaml:"metrics_extra_filters,omitempty"`
MetricsExtraLabels []string `json:"metrics_extra_labels,omitempty" yaml:"metrics_extra_labels,omitempty"`
LogsExtraFilters []string `json:"logs_extra_filters,omitempty" yaml:"logs_extra_filters,omitempty"`
LogsExtraStreamFilters []string `json:"logs_extra_stream_filters,omitempty" yaml:"logs_extra_stream_filters,omitempty"`
MetricsAccountID uint32 `json:"metrics_account_id,omitempty"`
MetricsProjectID uint32 `json:"metrics_project_id,omitempty"`
MetricsAccountID uint32 `json:"metrics_account_id,omitempty" yaml:"metrics_account_id,omitempty"`
MetricsProjectID uint32 `json:"metrics_project_id,omitempty" yaml:"metrics_project_id,omitempty"`
LogsAccountID uint32 `json:"logs_account_id,omitempty"`
LogsProjectID uint32 `json:"logs_project_id,omitempty"`
LogsAccountID uint32 `json:"logs_account_id,omitempty" yaml:"logs_account_id,omitempty"`
LogsProjectID uint32 `json:"logs_project_id,omitempty" yaml:"logs_project_id,omitempty"`
// Properties below are deprecated and retained only for compatibility with vmgateway, which is itself deprecated.
// promql filters applied to each select query
// Deprecated
ExtraFilters []string `json:"extra_filters,omitempty"`
ExtraFilters []string `json:"extra_filters,omitempty" yaml:"-"`
// Deprecated
Tenant TenantID `json:"tenant_id"`
Tenant TenantID `json:"tenant_id" yaml:"-"`
// role can be denied as 1 = read, 2 = write, 3 = read and write
// 0 = unconfigured - read and write
// Deprecated
Mode int `json:"mode,omitempty"`
Mode int `json:"mode,omitempty" yaml:"-"`
// Deprecated
Labels []string `json:"extra_labels,omitempty"`
Labels []string `json:"extra_labels,omitempty" yaml:"-"`
// labelsBuf holds allocated memory for Labels
// Deprecated
labelsBuf []byte
@@ -425,7 +436,6 @@ func (vac *VMAccessClaim) reset() {
}
func (vac *VMAccessClaim) parseFrom(jv *fastjson.Value) error {
if err := vac.Tenant.parseFrom(jv); err != nil {
return err
}
@@ -569,6 +579,9 @@ func NewToken(auth string, enforceAuthPrefix bool) (*Token, error) {
if err := t.parse(jwt[0], jwt[1], jwt[2]); err != nil {
return nil, err
}
if !t.body.hasVMAccess {
return nil, ErrVMAccessFieldMissing
}
return &t, nil
}

View File

@@ -168,17 +168,10 @@ func TestParseJWTBody_Failure(t *testing.T) {
true,
)
// invalid body type json
// non-object body type
f(
`[]`,
"missing `vm_access` claim",
true,
)
// missing vm_access claim
f(
`{}`,
"missing `vm_access` claim",
`unexpected non json object; type: "array"`,
true,
)
@@ -189,13 +182,6 @@ func TestParseJWTBody_Failure(t *testing.T) {
true,
)
// vm_access claim null
f(
`{"vm_access": null}`,
"missing `vm_access` claim",
true,
)
// invalid vm_access: account_id type mismatch
f(
`{"vm_access": {"tenant_id": {"account_id": "1", "project_id": 5}}}`,
@@ -555,6 +541,33 @@ func TestParseJWTBody_Success(t *testing.T) {
)
}
func TestParseJWTBody_VMAccessPresence(t *testing.T) {
f := func(data string, wantHasVMAccess bool) {
t.Helper()
encodedLen := base64.RawURLEncoding.EncodedLen(len(data))
encoded := make([]byte, encodedLen)
base64.RawURLEncoding.Encode(encoded, []byte(data))
var b body
if err := b.parse(string(encoded)); err != nil {
t.Fatalf("unexpected error: %s", err)
}
if b.hasVMAccess != wantHasVMAccess {
t.Fatalf("unexpected hasVMAccess; got %v; want %v", b.hasVMAccess, wantHasVMAccess)
}
}
// vm_access claim is present
f(`{"vm_access": {}}`, true)
f(`{"vm_access": {"metrics_account_id": 1}}`, true)
// vm_access claim is absent or null - parsing must succeed with hasVMAccess=false
f(`{}`, false)
f(`{"vm_access": null}`, false)
f(`{"role": "admin"}`, false)
}
func TestNewTokenFromRequest_Failure(t *testing.T) {
f := func(r *http.Request) {
t.Helper()
@@ -866,7 +879,6 @@ func TestNewTokenFromRequest_Success(t *testing.T) {
}
func TestTokenMatchClaims(t *testing.T) {
/*
{
"iss": "https://login.microsoftonline.com/-6691-4868-a77b-1b0f9bbe5f43/v2.0",

311
lib/mdx/filter.go Normal file
View File

@@ -0,0 +1,311 @@
package mdx
import (
"flag"
"strconv"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
var (
vmLabel = flag.String("mdx.label", "", "Optional label value in the form 'name=value' used to identify VictoriaMetrics metrics for MDX. "+
"Metrics containing the specified label are forwarded to `-remoteWrite.url` endpoints configured with `-remoteWrite.mdx.enable=true`.")
)
const (
vmAppLabelName = "victoriametrics_app"
vmAppLabelValue = "true"
vmAppVersionMetricName = "vm_app_version"
)
// Ctx defines filtering context
type Ctx struct {
// labels hold modified timeseries labels
// valid until PutContext call
labels []prompb.Label
buf []byte
hasVMAppLabel bool
hasVMAppVersionLabel bool
hasFilterLabelValue bool
jobLabelValue string
instanceLabelValue string
}
func (ctx *Ctx) reset() {
// do not reset labels intentionally
// it must live until PutContext call
ctx.buf = ctx.buf[:0]
ctx.hasVMAppLabel = false
ctx.hasVMAppVersionLabel = false
ctx.hasFilterLabelValue = false
ctx.jobLabelValue = ""
ctx.instanceLabelValue = ""
}
var ctxPool = &sync.Pool{
New: func() any {
return &Ctx{}
},
}
// GetContext returns filtering context
func GetContext() *Ctx {
return ctxPool.Get().(*Ctx)
}
// PutContext resets context
func PutContext(ctx *Ctx) {
clear(ctx.labels)
ctx.labels = ctx.labels[:0]
ctx.reset()
ctxPool.Put(ctx)
}
// Filter manages the list of VictoriaMetrics instances grouped by job:instance labels.
// job and instance must present at timeseries.
//
// Filter keeps timeseries with any of the following conditions:
// * vm_app_version present
// * victoriametrics_app=true label present at timeseries
// * if labels has label value defined with flag `-mdx.label`
//
// Filter track entries with TTL of 1 hour
type Filter struct {
tracker *instanceTracker
filterByLabelName string
label string
}
// NewFilter returns new Filter instance
func NewFilter() *Filter {
filter := &Filter{
tracker: newInstanceTracker(),
}
if len(*vmLabel) > 0 {
n := strings.IndexByte(*vmLabel, '=')
if n < 0 {
logger.Fatalf("missing '=' in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
}
filter.filterByLabelName = (*vmLabel)[:n]
filter.label = (*vmLabel)[n+1:]
if len(filter.filterByLabelName) == 0 || len(filter.label) == 0 {
logger.Fatalf("label name and value cannot be empty in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
}
}
return filter
}
// VMInstancesCount returns amount of currently tracked instances
func (filter *Filter) VMInstancesCount() int {
return filter.tracker.len()
}
// MustStop stops filter instance
func (filter *Filter) MustStop() {
filter.tracker.mustStop()
}
// Filter filters provided timeseries with given context.
//
// Returned timeseries is valid as long as Ctx is valid
func (filter *Filter) Filter(ctx *Ctx, tss []prompb.TimeSeries) []prompb.TimeSeries {
dstTss := tss[:0]
for _, ts := range tss {
ctx.prepare(ts.Labels, filter.filterByLabelName, filter.label)
key := ctx.formatTimeSeriesKey()
if len(key) == 0 {
// metrics with empty job or instance labels must be always dropped
// despite any other conditions
continue
}
if ctx.hasVMAppLabel {
filter.trackInstance(key)
dstTss = append(dstTss, ts)
continue
}
if ctx.hasFilterLabelValue || ctx.hasVMAppVersionLabel {
ts.Labels = ctx.addVMAppLabel(ts.Labels)
filter.trackInstance(key)
dstTss = append(dstTss, ts)
continue
}
ok := filter.tracker.has(key)
if ok {
ts.Labels = ctx.addVMAppLabel(ts.Labels)
dstTss = append(dstTss, ts)
}
}
return dstTss
}
func (filter *Filter) trackInstance(key string) {
if filter.tracker.has(key) {
return
}
key = strings.Clone(key)
filter.tracker.register(key)
}
func (ctx *Ctx) prepare(labels []prompb.Label, filterByLabelName, label string) {
ctx.reset()
// always use the last label=value pair
// because in case of possible label duplicates,
// the last added label must win
for _, l := range labels {
switch l.Name {
case "job":
ctx.jobLabelValue = l.Value
case "instance":
ctx.instanceLabelValue = l.Value
case vmAppLabelName:
if l.Value == vmAppLabelValue {
ctx.hasVMAppLabel = true
}
case "__name__":
if l.Value == vmAppVersionMetricName {
ctx.hasVMAppVersionLabel = true
}
}
if len(filterByLabelName) > 0 {
if l.Name == filterByLabelName && l.Value == label {
ctx.hasFilterLabelValue = true
}
}
}
}
// formatTimeSeriesKey returns timeseries key after ctx.prepare call
// if it catched job and instances labels
//
// returned string is valid until next ctx.prepare
func (ctx *Ctx) formatTimeSeriesKey() string {
if len(ctx.jobLabelValue) == 0 || len(ctx.instanceLabelValue) == 0 {
return ""
}
buf := ctx.buf[:0]
buf = strconv.AppendQuote(buf, ctx.jobLabelValue)
buf = append(buf, ':')
buf = strconv.AppendQuote(buf, ctx.instanceLabelValue)
ctx.buf = buf
return bytesutil.ToUnsafeString(buf)
}
func (ctx *Ctx) addVMAppLabel(labels []prompb.Label) []prompb.Label {
// unconditionally add vmAppLabelValue at the end of labels list
// it will overwrite any exist vmAppLabelName labels with a value different to vmAppLabelValue
// it's guaranteed by VictoriaMetrics ingestion contract
poolLabels := ctx.labels
poolLabelsLen := len(poolLabels)
poolLabels = append(poolLabels, labels...)
poolLabels = append(poolLabels, prompb.Label{Name: vmAppLabelName, Value: vmAppLabelValue})
ctx.labels = poolLabels
return poolLabels[poolLabelsLen:len(poolLabels):len(poolLabels)]
}
type instanceTracker struct {
mu sync.RWMutex
lastAccessByKey map[string]*atomic.Uint64
wg sync.WaitGroup
stop chan struct{}
}
func newInstanceTracker() *instanceTracker {
c := &instanceTracker{
lastAccessByKey: make(map[string]*atomic.Uint64),
stop: make(chan struct{}),
}
c.wg.Add(1)
go c.startStaleWatcher()
return c
}
func (it *instanceTracker) len() int {
it.mu.RLock()
s := len(it.lastAccessByKey)
it.mu.RUnlock()
return s
}
func (it *instanceTracker) has(key string) bool {
it.mu.RLock()
lat, ok := it.lastAccessByKey[key]
it.mu.RUnlock()
if ok {
lat.Store(fasttime.UnixTimestamp())
}
return ok
}
func (it *instanceTracker) register(key string) {
it.mu.Lock()
// key could be registered by concurrent goroutine
lat, ok := it.lastAccessByKey[key]
if !ok {
lat = &atomic.Uint64{}
it.lastAccessByKey[key] = lat
}
it.mu.Unlock()
lat.Store(fasttime.UnixTimestamp())
}
func (it *instanceTracker) startStaleWatcher() {
defer it.wg.Done()
t := time.NewTicker(time.Minute)
defer t.Stop()
for {
select {
case <-it.stop:
return
case <-t.C:
it.cleanStale()
}
}
}
var entryTTLSeconds = uint64(time.Hour.Seconds())
func (it *instanceTracker) cleanStale() {
ct := fasttime.UnixTimestamp()
var toDelete map[string]*atomic.Uint64
it.mu.RLock()
for key, lastAccessTime := range it.lastAccessByKey {
accessedAt := lastAccessTime.Load()
if ct > accessedAt+entryTTLSeconds {
if toDelete == nil {
toDelete = make(map[string]*atomic.Uint64)
}
toDelete[key] = lastAccessTime
}
}
it.mu.RUnlock()
if len(toDelete) > 0 {
it.mu.Lock()
for key, lastAccessTime := range toDelete {
accessedAt := lastAccessTime.Load()
// concurrent goroutine may refresh lastAccessTime
if ct > accessedAt+entryTTLSeconds {
delete(it.lastAccessByKey, key)
}
}
it.mu.Unlock()
}
}
func (it *instanceTracker) mustStop() {
close(it.stop)
it.wg.Wait()
}

View File

@@ -0,0 +1,104 @@
//go:build synctest
package mdx
import (
"testing"
"testing/synctest"
"time"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
func TestMdxInstanceCleanup(t *testing.T) {
synctest.Test(t, func(t *testing.T) {
filter := NewFilter()
defer filter.MustStop()
assertFilterLen := func(expectedLen int) {
t.Helper()
if filter.VMInstancesCount() != expectedLen {
t.Fatalf("unexpected instance map length; got %d; want %d", filter.VMInstancesCount(), expectedLen)
}
}
ctx := GetContext()
filter.Filter(ctx, []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_up"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "scrape_targets_up"},
{Name: "instance", Value: "vmagent1:8429"},
{Name: "job", Value: "test"},
},
},
},
)
PutContext(ctx)
time.Sleep(1 * time.Minute)
// the entries should not be cleaned.
assertFilterLen(2)
time.Sleep(58 * time.Minute)
// receive samples from victoria-metrics1:8428 after 59 minutes.
// so the entry will be refreshed.
ctx = GetContext()
filter.Filter(ctx, []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_up"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
},
)
PutContext(ctx)
assertFilterLen(2)
// entry for job:instance - test:vmagent1:8429 must be removed
time.Sleep(4 * time.Minute)
assertFilterLen(1)
// no samples from vmagent1:8429 in the last hour, so it should be removed from the mdx instance list.
time.Sleep(2 * time.Hour)
assertFilterLen(0)
})
}

435
lib/mdx/filter_test.go Normal file
View File

@@ -0,0 +1,435 @@
package mdx
import (
"fmt"
"sync"
"testing"
"github.com/google/go-cmp/cmp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
func TestMdxInstanceFilter(t *testing.T) {
originalVmLabel := *vmLabel
*vmLabel = "service=victoriametrics"
t.Cleanup(func() {
*vmLabel = originalVmLabel
})
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries) {
t.Helper()
filter := NewFilter()
defer filter.MustStop()
ctx := GetContext()
defer PutContext(ctx)
inputCopy := append([]prompb.TimeSeries{}, input...)
output := filter.Filter(ctx, inputCopy)
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
t.Fatalf("unexpected result (-want, +got):\n%s", diff)
}
// make sure that result is the same over multiple calls
inputCopy = append([]prompb.TimeSeries{}, input...)
output = filter.Filter(ctx, inputCopy)
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
t.Fatalf("unexpected result (-want, +got):\n%s", diff)
}
}
// metrics with vm_app_version and different order of labels.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics2:8428"},
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics3:8428"},
{Name: "__name__", Value: "vm_app_version"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics2:8428"},
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics3:8428"},
{Name: "__name__", Value: "vm_app_version"},
{Name: "victoriametrics_app", Value: "true"},
},
}},
)
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "service", Value: "victoriametrics"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "service", Value: "victoriametrics"},
{Name: "victoriametrics_app", Value: "true"},
},
},
})
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "service", Value: "victoriametrics"},
},
}},
[]prompb.TimeSeries{
// 2.
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "service", Value: "victoriametrics"},
{Name: "victoriametrics_app", Value: "true"},
},
}})
// metrics with vm_app_version and service=victoriametrics should be preserved.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "job", Value: "test"},
{Name: "service", Value: "victoriametrics"},
{Name: "__name__", Value: "vm_app_version"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "job", Value: "test"},
{Name: "service", Value: "victoriametrics"},
{Name: "__name__", Value: "vm_app_version"},
{Name: "victoriametrics_app", Value: "true"},
},
},
},
)
// metrics without vm_app_version and `service=victoriametrics` but with `victoriametrics_app=true`, which should be preserved.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics6:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "victoriametrics_app", Value: "true"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics6:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "vm_slow_queries_total"},
{Name: "victoriametrics_app", Value: "true"},
},
},
})
// metrics without vm_app_version and service=victoriametrics and `victoriametrics_app=true`, which should be filtered out.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "go_gc_duration_seconds"},
{Name: "instance", Value: "node-exporter1"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_request_duration_seconds"},
{Name: "instance", Value: "service1"},
{Name: "job", Value: "test"},
},
},
},
[]prompb.TimeSeries{},
)
// metrics with vm_app_version but job or instance is empty (or missing), they should be dropped.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: ""},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent2:8429"},
{Name: "job", Value: ""},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "vmagent2:8429"},
},
},
},
[]prompb.TimeSeries{})
// metrics without vm_app_version, but the instances were already registered with first timeseries
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_rows_inserted_total"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_rows_inserted_total"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
})
// metrics without vm_app_version, `service=victoriametrics` and `victoriametrics_app=true`, and the instance wasn't already registered in the previous call, so it will be dropped.
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
{Name: "instance", Value: "victoria-metrics7:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "service", Value: "victoriametrics"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "job", Value: "test"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "service", Value: "victoriametrics"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
}},
)
// metrics with duplicate victoriametrics_app label
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "service", Value: "victoriametrics"},
{Name: "victoriametrics_app", Value: "other_value"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "service", Value: "victoriametrics"},
{Name: "victoriametrics_app", Value: "other_value"},
{Name: "victoriametrics_app", Value: "true"},
},
},
})
// metrics with duplicate job and instance labels
// last value wins
f([]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
{Name: "job", Value: "test2"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "service", Value: "victoriametrics"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_requests_total"},
{Name: "job", Value: "test2"},
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "service", Value: "victoriametrics"},
},
},
},
[]prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
{Name: "job", Value: "test2"},
{Name: "instance", Value: "victoria-metrics4:8428"},
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "service", Value: "victoriametrics"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "__name__", Value: "http_requests_total"},
{Name: "job", Value: "test2"},
{Name: "instance", Value: "victoria-metrics5:8428"},
{Name: "service", Value: "victoriametrics"},
{Name: "victoriametrics_app", Value: "true"},
},
}})
}
func TestMdxInstanceFilterConcurrent(t *testing.T) {
originalVmLabel := *vmLabel
*vmLabel = "service=victoriametrics"
t.Cleanup(func() { *vmLabel = originalVmLabel })
filter := NewFilter()
defer filter.MustStop()
const concurrency = 8
const iterations = 200
generateSeries := func(g int) []prompb.TimeSeries {
return []prompb.TimeSeries{
{Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
{Name: "instance", Value: fmt.Sprintf("vm-%d:8428", g)},
}},
// shared job:instance
{Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "job", Value: "test"},
{Name: "instance", Value: "vmagent:8428"},
}},
}
}
var wg sync.WaitGroup
for worker := range concurrency {
wg.Go(func() {
input := generateSeries(worker)
var expectedOutput []prompb.TimeSeries
for _, inputTs := range input {
labels := append([]prompb.Label{}, inputTs.Labels...)
labels = append(labels, prompb.Label{Name: vmAppLabelName, Value: vmAppLabelValue})
expectedOutput = append(expectedOutput, prompb.TimeSeries{Labels: labels})
}
for range iterations {
ctx := GetContext()
inputCopy := append([]prompb.TimeSeries{}, input...)
output := filter.Filter(ctx, inputCopy)
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
t.Errorf("unexpected result (-want, +got):\n%s", diff)
}
PutContext(ctx)
}
})
}
wg.Wait()
// goroutines + 1 shared
if got := filter.VMInstancesCount(); got != concurrency+1 {
t.Errorf("unexpected instance count: got %d, want %d", got, concurrency+1)
}
}

View File

@@ -0,0 +1,85 @@
package mdx
import (
"testing"
"github.com/google/go-cmp/cmp"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
)
func BenchmarkFilter(b *testing.B) {
f := func(name string, input, want []prompb.TimeSeries) {
b.Helper()
b.Run(name, func(b *testing.B) {
filter := NewFilter()
defer filter.MustStop()
b.ReportAllocs()
b.ResetTimer()
b.RunParallel(func(pb *testing.PB) {
for pb.Next() {
ctx := GetContext()
localInput := append([]prompb.TimeSeries{}, input...)
tss := filter.Filter(ctx, localInput)
if len(tss) != len(want) {
diff := cmp.Diff(want, tss)
b.Fatalf("unexpected result (-want, +got):\n%s", diff)
}
PutContext(ctx)
}
})
})
}
input := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "http_requests_total"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "http_requests_errors_total"},
},
},
}
expected := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{Name: "__name__", Value: "vm_app_version"},
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "http_requests_total"},
{Name: "victoriametrics_app", Value: "true"},
},
},
{
Labels: []prompb.Label{
{Name: "instance", Value: "victoria-metrics1:8428"},
{Name: "job", Value: "test"},
{Name: "__name__", Value: "http_requests_errors_total"},
{Name: "victoriametrics_app", Value: "true"},
},
},
}
f("match vm_app_version", input, expected)
}

View File

@@ -112,11 +112,19 @@ func (m *TimeSeries) size() (n int) {
}
for _, e := range m.Labels {
l := e.size()
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
for _, e := range m.Samples {
l := e.size()
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
return n
}
@@ -126,10 +134,18 @@ func (m *Label) size() (n int) {
return 0
}
if l := len(m.Name); l > 0 {
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
if l := len(m.Value); l > 0 {
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
return n
}
@@ -160,6 +176,11 @@ func (m *WriteRequest) marshalToSizedBuffer(dst []byte) (int, error) {
}
func encodeVarint(dst []byte, offset int, v uint64) int {
if v < 1<<7 {
offset--
dst[offset] = byte(v)
return offset
}
offset -= sov(v)
base := offset
for v >= 1<<7 {
@@ -180,7 +201,11 @@ func (m *WriteRequest) size() (n int) {
}
for _, e := range m.Metadata {
l := e.size()
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
return n
}
@@ -238,13 +263,25 @@ func (m *MetricMetadata) size() (n int) {
n += 1 + sov(uint64(m.Type))
}
if l := len(m.MetricFamilyName); l > 0 {
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
if l := len(m.Help); l > 0 {
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
if l := len(m.Unit); l > 0 {
n += 1 + l + sov(uint64(l))
if l < 128 {
n += 2 + l
} else {
n += 1 + l + sov(uint64(l))
}
}
if m.AccountID != 0 {
n += 1 + sov(uint64(m.AccountID))