mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-30 22:23:49 +03:00
Compare commits
31 Commits
dependabot
...
vmauth-slo
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
19ae54bcc7 | ||
|
|
f84c1055da | ||
|
|
7b28b34e3a | ||
|
|
d07286ea20 | ||
|
|
6d0c5f5099 | ||
|
|
c82127b6d4 | ||
|
|
06bc808ddc | ||
|
|
a6927c46be | ||
|
|
ed1e3965db | ||
|
|
15a4c31e87 | ||
|
|
54f9cd6edd | ||
|
|
2e16874e95 | ||
|
|
81d330f297 | ||
|
|
3278ddd170 | ||
|
|
cc790c2ea1 | ||
|
|
950f38fd6a | ||
|
|
ab9db9152f | ||
|
|
5b89f52c72 | ||
|
|
3608ab5b4c | ||
|
|
5ee1fa70c1 | ||
|
|
1b0e843e8f | ||
|
|
679646a3b3 | ||
|
|
bb3c038e2f | ||
|
|
8f32b6648f | ||
|
|
df5f11623f | ||
|
|
6c8a41f5ed | ||
|
|
e749a6ce8d | ||
|
|
615176ad55 | ||
|
|
3aec167f00 | ||
|
|
6f633e5654 | ||
|
|
50a827256a |
@@ -35,6 +35,9 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.MetricMetadata, extraLabels []prompb.Label) error {
|
||||
if len(extraLabels) == 0 && !prommetadata.IsEnabled() && at == nil {
|
||||
return insertRowsFast(at, timeseries)
|
||||
}
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
@@ -102,3 +105,17 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.Met
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
}
|
||||
|
||||
func insertRowsFast(at *auth.Token, timeseries []prompb.TimeSeries) error {
|
||||
rowsTotal := 0
|
||||
for i := range timeseries {
|
||||
rowsTotal += len(timeseries[i].Samples)
|
||||
}
|
||||
wr := &prompb.WriteRequest{Timeseries: timeseries}
|
||||
if !remotewrite.TryPush(at, wr) {
|
||||
return remotewrite.ErrQueueFullHTTPRetry
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -12,19 +12,18 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consistenthash"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
@@ -106,6 +105,9 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
|
||||
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -162,8 +164,8 @@ func InitSecretFlags() {
|
||||
}
|
||||
|
||||
var (
|
||||
shardByURLLabelsMap map[string]struct{}
|
||||
shardByURLIgnoreLabelsMap map[string]struct{}
|
||||
shardByURLLabelsFilter []string
|
||||
shardByURLIgnoreLabelsFilter []string
|
||||
)
|
||||
|
||||
// Init initializes remotewrite.
|
||||
@@ -210,8 +212,8 @@ func Init() {
|
||||
logger.Fatalf("-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
|
||||
"see https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages")
|
||||
}
|
||||
shardByURLLabelsMap = newMapFromStrings(*shardByURLLabels)
|
||||
shardByURLIgnoreLabelsMap = newMapFromStrings(*shardByURLIgnoreLabels)
|
||||
shardByURLLabelsFilter = slices.Clone(*shardByURLLabels)
|
||||
shardByURLIgnoreLabelsFilter = slices.Clone(*shardByURLIgnoreLabels)
|
||||
|
||||
initLabelsGlobal()
|
||||
|
||||
@@ -307,6 +309,10 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
}
|
||||
fs.RegisterPathFsMetrics(*tmpDataPath)
|
||||
|
||||
if slices.Contains(*enableMdx, true) && *shardByURL {
|
||||
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
|
||||
}
|
||||
|
||||
if *shardByURL {
|
||||
consistentHashNodes := make([]string, 0, len(urls))
|
||||
for i, url := range urls {
|
||||
@@ -698,18 +704,18 @@ func shardAmountRemoteWriteCtx(tssBlock []prompb.TimeSeries, shards [][]prompb.T
|
||||
|
||||
for _, ts := range tssBlock {
|
||||
hashLabels := ts.Labels
|
||||
if len(shardByURLLabelsMap) > 0 {
|
||||
if len(shardByURLLabelsFilter) > 0 {
|
||||
hashLabels = tmpLabels.Labels[:0]
|
||||
for _, label := range ts.Labels {
|
||||
if _, ok := shardByURLLabelsMap[label.Name]; ok {
|
||||
if slices.Contains(shardByURLLabelsFilter, label.Name) {
|
||||
hashLabels = append(hashLabels, label)
|
||||
}
|
||||
}
|
||||
tmpLabels.Labels = hashLabels
|
||||
} else if len(shardByURLIgnoreLabelsMap) > 0 {
|
||||
} else if len(shardByURLIgnoreLabelsFilter) > 0 {
|
||||
hashLabels = tmpLabels.Labels[:0]
|
||||
for _, label := range ts.Labels {
|
||||
if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok {
|
||||
if !slices.Contains(shardByURLIgnoreLabelsFilter, label.Name) {
|
||||
hashLabels = append(hashLabels, label)
|
||||
}
|
||||
}
|
||||
@@ -810,34 +816,26 @@ var (
|
||||
// it omits the '=' separator between label name and value for backward compatibility.
|
||||
// Changing it would re-shard all series across remoteWrite targets.
|
||||
func getLabelsHashForShard(labels []prompb.Label) uint64 {
|
||||
bb := labelsHashBufPool.Get()
|
||||
b := bb.B[:0]
|
||||
var d xxhash.Digest
|
||||
d.Reset()
|
||||
for _, label := range labels {
|
||||
b = append(b, label.Name...)
|
||||
b = append(b, label.Value...)
|
||||
_, _ = d.WriteString(label.Name)
|
||||
_, _ = d.WriteString(label.Value)
|
||||
}
|
||||
h := xxhash.Sum64(b)
|
||||
bb.B = b
|
||||
labelsHashBufPool.Put(bb)
|
||||
return h
|
||||
return d.Sum64()
|
||||
}
|
||||
|
||||
func getLabelsHash(labels []prompb.Label) uint64 {
|
||||
bb := labelsHashBufPool.Get()
|
||||
b := bb.B[:0]
|
||||
var d xxhash.Digest
|
||||
d.Reset()
|
||||
for _, label := range labels {
|
||||
b = append(b, label.Name...)
|
||||
b = append(b, '=')
|
||||
b = append(b, label.Value...)
|
||||
_, _ = d.WriteString(label.Name)
|
||||
_, _ = d.WriteString("=")
|
||||
_, _ = d.WriteString(label.Value)
|
||||
}
|
||||
h := xxhash.Sum64(b)
|
||||
bb.B = b
|
||||
labelsHashBufPool.Put(bb)
|
||||
return h
|
||||
return d.Sum64()
|
||||
}
|
||||
|
||||
var labelsHashBufPool bytesutil.ByteBufferPool
|
||||
|
||||
func logSkippedSeries(labels []prompb.Label, flagName string, flagValue int) {
|
||||
select {
|
||||
case <-logSkippedSeriesTicker.C:
|
||||
@@ -862,6 +860,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
sas atomic.Pointer[streamaggr.Aggregators]
|
||||
deduplicator *streamaggr.Deduplicator
|
||||
mdxFilter *mdx.Filter
|
||||
|
||||
streamAggrKeepInput bool
|
||||
streamAggrDropInput bool
|
||||
@@ -876,6 +875,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
mdxRowsPreserved *metrics.Counter
|
||||
|
||||
pushFailures *metrics.Counter
|
||||
metadataDroppedOnPushFailure *metrics.Counter
|
||||
@@ -972,7 +972,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
for i := range pss {
|
||||
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
|
||||
}
|
||||
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: argIdx,
|
||||
fq: fq,
|
||||
@@ -989,6 +988,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
}
|
||||
rwctx.initStreamAggrConfig()
|
||||
|
||||
if enableMdx.GetOptionalArg(argIdx) {
|
||||
mdxFilter := mdx.NewFilter()
|
||||
rwctx.mdxFilter = mdxFilter
|
||||
rwctx.mdxRowsPreserved = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_mdx_tracked_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(mdxFilter.VMInstancesCount())
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return rwctx
|
||||
}
|
||||
|
||||
@@ -1002,6 +1011,11 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
}
|
||||
if rwctx.mdxFilter != nil {
|
||||
rwctx.mdxFilter.MustStop()
|
||||
rwctx.mdxFilter = nil
|
||||
rwctx.mdxRowsPreserved = nil
|
||||
}
|
||||
|
||||
for _, ps := range rwctx.pss {
|
||||
ps.MustStop()
|
||||
@@ -1017,6 +1031,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
|
||||
}
|
||||
|
||||
// TryPushTimeSeries sends tss series to the configured remote write endpoint
|
||||
@@ -1024,16 +1039,41 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
// TryPushTimeSeries doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances.
|
||||
func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
||||
var rctx *relabelCtx
|
||||
var mctx *mdx.Ctx
|
||||
var v *[]prompb.TimeSeries
|
||||
defer func() {
|
||||
if rctx == nil {
|
||||
return
|
||||
if v != nil {
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
}
|
||||
if rctx != nil {
|
||||
putRelabelCtx(rctx)
|
||||
}
|
||||
if mctx != nil {
|
||||
mdx.PutContext(mctx)
|
||||
}
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
putRelabelCtx(rctx)
|
||||
}()
|
||||
|
||||
copyTimeSeriesIfNeeded := func() {
|
||||
if v == nil {
|
||||
v := tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
}
|
||||
}
|
||||
|
||||
if rwctx.mdxFilter != nil {
|
||||
mctx = mdx.GetContext()
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.mdx configs.
|
||||
copyTimeSeriesIfNeeded()
|
||||
tss = rwctx.mdxFilter.Filter(mctx, tss)
|
||||
if len(tss) == 0 {
|
||||
return true
|
||||
}
|
||||
rowsCount := getRowsCount(tss)
|
||||
rwctx.mdxRowsPreserved.Add(rowsCount)
|
||||
}
|
||||
|
||||
// Apply relabeling
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
@@ -1043,8 +1083,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||
tss = rctx.applyRelabeling(tss, pcs)
|
||||
rowsCountAfterRelabel := getRowsCount(tss)
|
||||
@@ -1062,8 +1101,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
}
|
||||
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
|
||||
} else if rwctx.streamAggrDropInput {
|
||||
@@ -1071,8 +1109,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
}
|
||||
tss = dropUnaggregatedSeries(tss, matchIdxs.B)
|
||||
}
|
||||
@@ -1191,15 +1228,6 @@ func getRowsCount(tss []prompb.TimeSeries) int {
|
||||
}
|
||||
return rowsCount
|
||||
}
|
||||
|
||||
func newMapFromStrings(a []string) map[string]struct{} {
|
||||
m := make(map[string]struct{}, len(a))
|
||||
for _, s := range a {
|
||||
m[s] = struct{}{}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func getMaxHourlySeries() int {
|
||||
limit := *maxHourlySeries
|
||||
if limit == -1 || limit > math.MaxInt32 {
|
||||
|
||||
@@ -145,10 +145,10 @@ func TestRuleValidate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGroupValidate_Failure(t *testing.T) {
|
||||
f := func(group *Group, validateExpressions bool, errStrExpected string) {
|
||||
f := func(data []byte, validateExpressions bool, errStrExpected string) {
|
||||
t.Helper()
|
||||
|
||||
err := group.Validate(nil, validateExpressions)
|
||||
_, err := parse(map[string][]byte{"test.yaml": data}, nil, validateExpressions)
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
@@ -158,275 +158,238 @@ func TestGroupValidate_Failure(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
f(&Group{}, false, "group name must be set")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: ""
|
||||
`), false, "group name must be set")
|
||||
|
||||
f(&Group{
|
||||
Name: "both record and alert are not set",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
{
|
||||
Expr: "sumSeries(time('foo.bar',10))",
|
||||
},
|
||||
},
|
||||
}, false, "invalid rule")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: both record and alert are not set
|
||||
rules:
|
||||
- expr: "sum(up == 0 ) by (host)"
|
||||
for: 10ms
|
||||
- expr: "sumSeries(time('foo.bar',10))"
|
||||
`), false, "invalid rule")
|
||||
|
||||
f(&Group{
|
||||
Name: "negative interval",
|
||||
Interval: promutil.NewDuration(-1),
|
||||
}, false, "interval shouldn't be lower than 0")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: negative interval
|
||||
interval: -1ms
|
||||
`), false, "interval shouldn't be lower than 0")
|
||||
|
||||
f(&Group{
|
||||
Name: "too big eval_offset",
|
||||
Interval: promutil.NewDuration(time.Minute),
|
||||
EvalOffset: promutil.NewDuration(2 * time.Minute),
|
||||
}, false, "eval_offset should be smaller than interval")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: too big eval_offset
|
||||
interval: 1m
|
||||
eval_offset: 2m
|
||||
`), false, "eval_offset should be smaller than interval")
|
||||
|
||||
f(&Group{
|
||||
Name: "too big negative eval_offset",
|
||||
Interval: promutil.NewDuration(time.Minute),
|
||||
EvalOffset: promutil.NewDuration(-2 * time.Minute),
|
||||
}, false, "eval_offset should be smaller than interval")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: too big negative eval_offset
|
||||
interval: 1m
|
||||
eval_offset: -2m
|
||||
`), false, "eval_offset should be smaller than interval")
|
||||
|
||||
limit := -1
|
||||
f(&Group{
|
||||
Name: "wrong limit",
|
||||
Limit: &limit,
|
||||
}, false, "invalid limit")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: wrong limit
|
||||
limit: -1
|
||||
`), false, "invalid limit")
|
||||
|
||||
f(&Group{
|
||||
Name: "wrong concurrency",
|
||||
Concurrency: -1,
|
||||
}, false, "invalid concurrency")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: wrong concurrency
|
||||
concurrency: -1
|
||||
`), false, "invalid concurrency")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
},
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Record: "record", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Record: "record", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- record: record
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
- record: record
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Record: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test thanos",
|
||||
Type: NewRawType("thanos"),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, true, "unknown datasource type")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test thanos
|
||||
type: thanos
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), true, "unknown datasource type")
|
||||
|
||||
// validate expressions
|
||||
f(&Group{
|
||||
Name: "test prometheus expr",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "record",
|
||||
Expr: "up | 0",
|
||||
},
|
||||
},
|
||||
}, true, "bad MetricsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus expr
|
||||
type: prometheus
|
||||
rules:
|
||||
- record: record
|
||||
expr: "up | 0"
|
||||
`), true, "bad MetricsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test graphite expr",
|
||||
Type: NewGraphiteType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "some-description",
|
||||
}},
|
||||
},
|
||||
}, true, "bad GraphiteQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test graphite expr
|
||||
type: graphite
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: some-description
|
||||
`), true, "bad GraphiteQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs expr",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "stats count(*) as requests"},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs expr
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: "stats count(*) as requests"
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs expr",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs expr multipart
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test graphite with prometheus expr",
|
||||
Type: NewGraphiteType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
ID: 1,
|
||||
Expr: "sumSeries(time('foo.bar',10))",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
{
|
||||
Record: "r2",
|
||||
ID: 2,
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
},
|
||||
},
|
||||
}, true, "bad GraphiteQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test graphite with prometheus expr
|
||||
type: graphite
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "sumSeries(time('foo.bar',10))"
|
||||
for: 10ms
|
||||
- record: r2
|
||||
expr: "sum(up == 0 ) by (host)"
|
||||
`), true, "bad GraphiteQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs with prometheus exp",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs with prometheus expr
|
||||
type: vlogs
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "sum(up == 0 ) by (host)"
|
||||
for: 10ms
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test prometheus with vlogs exp",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
Expr: "* | stats by (path) count()",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
}, true, "bad MetricsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus with vlogs expr
|
||||
type: prometheus
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "* | stats by (path) count()"
|
||||
for: 10ms
|
||||
`), true, "bad MetricsQL expr")
|
||||
}
|
||||
|
||||
func TestGroupValidate_Success(t *testing.T) {
|
||||
f := func(group *Group, validateAnnotations, validateExpressions bool) {
|
||||
f := func(data []byte, validateAnnotations, validateExpressions bool) {
|
||||
t.Helper()
|
||||
|
||||
var validateTplFn ValidateTplFn
|
||||
if validateAnnotations {
|
||||
validateTplFn = notifier.ValidateTemplates
|
||||
}
|
||||
err := group.Validate(validateTplFn, validateExpressions)
|
||||
_, err := parse(map[string][]byte{"test.yaml": data}, validateTplFn, validateExpressions)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "record",
|
||||
Expr: "up | 0",
|
||||
},
|
||||
},
|
||||
}, false, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- record: record
|
||||
expr: "up | 0"
|
||||
`), false, false)
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, false, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, false)
|
||||
|
||||
// validate annotations
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
Labels: map[string]string{
|
||||
"summary": `
|
||||
{{ with printf "node_memory_MemTotal{job='node',instance='%s'}" "localhost" | query }}
|
||||
{{ . | first | value | humanize1024 }}B
|
||||
{{ end }}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, true, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "\n{{ with printf \"node_memory_MemTotal{job='node',instance='%s'}\" \"localhost\" | query }}\n {{ . | first | value | humanize1024 }}B\n{{ end }}"
|
||||
`), true, false)
|
||||
|
||||
// validate expressions
|
||||
f(&Group{
|
||||
Name: "test prometheus",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, true)
|
||||
f(&Group{
|
||||
Name: "test victorialogs",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: " _time: 1m | stats count(*) as requests", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, true)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus
|
||||
type: prometheus
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), false, true)
|
||||
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test victorialogs
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: " _time: 1m | stats count(*) as requests"
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), false, true)
|
||||
}
|
||||
|
||||
func TestHashRule_NotEqual(t *testing.T) {
|
||||
|
||||
@@ -140,6 +140,18 @@ users:
|
||||
- "ProjectID: {{.MetricsProjectID}}"
|
||||
url_prefix: "http://vminsert:8480/insert/prometheus"
|
||||
|
||||
# JWT-based routing that relies solely on custom claims.
|
||||
# The `vm_access` claim is missing, default value will be used.
|
||||
# e.g. {"role": "admin"}.
|
||||
- name: jwt-custom-claims
|
||||
jwt:
|
||||
skip_verify: true
|
||||
vm_default_access_claim:
|
||||
metrics_account_id: 1
|
||||
match_claims:
|
||||
role: admin
|
||||
url_prefix: "http://vmselect-admin:8481/select/0/prometheus"
|
||||
|
||||
# Requests without Authorization header are proxied according to `unauthorized_user` section.
|
||||
# Requests are proxied in round-robin fashion between `url_prefix` backends.
|
||||
# The deny_partial_response query arg is added to all the proxied requests.
|
||||
|
||||
@@ -65,6 +65,8 @@ type JWTConfig struct {
|
||||
MatchClaims map[string]string `yaml:"match_claims,omitempty"`
|
||||
parsedMatchClaims []*jwt.Claim
|
||||
|
||||
DefaultVMAccessClaim *jwt.VMAccessClaim `yaml:"default_vm_access_claim,omitempty"`
|
||||
|
||||
// verifierPool is used to verify JWT tokens.
|
||||
// It is initialized from PublicKeys and/or PublicKeyFiles.
|
||||
// In this case, it is initialized once at config reload and never updated until next reload
|
||||
@@ -432,7 +434,6 @@ func validateJWTPlaceholdersForURL(up *URLPrefix, isAllowed bool) error {
|
||||
}
|
||||
if strings.Contains(p, placeholderPrefix) {
|
||||
return fmt.Errorf("invalid placeholder found in URL request path: %q, supported values are: %s", bu.Path, strings.Join(allPlaceholders, ", "))
|
||||
|
||||
}
|
||||
}
|
||||
for param, values := range bu.Query() {
|
||||
@@ -487,7 +488,6 @@ func hasAnyPlaceholders(u *url.URL) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -6,6 +6,7 @@ import (
|
||||
"flag"
|
||||
"fmt"
|
||||
"io"
|
||||
"math/rand/v2"
|
||||
"net"
|
||||
"net/http"
|
||||
"net/textproto"
|
||||
@@ -31,6 +32,7 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promauth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/pushmetrics"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -190,6 +192,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
if tkn == nil {
|
||||
logger.Panicf("BUG: unexpected nil jwt token for user %q", ui.name())
|
||||
}
|
||||
if !tkn.HasVMAccessClaim() && ui.JWT.DefaultVMAccessClaim == nil {
|
||||
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
||||
return true
|
||||
}
|
||||
defer putToken(tkn)
|
||||
processUserRequest(w, r, ui, tkn)
|
||||
return true
|
||||
@@ -202,6 +208,7 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
}
|
||||
|
||||
invalidAuthTokenRequests.Inc()
|
||||
slowdownUnauthorizedResponse(r)
|
||||
if *logInvalidAuthTokens {
|
||||
err := fmt.Errorf("cannot authorize request with auth tokens %q", ats)
|
||||
err = &httpserver.ErrorWithStatusCode{
|
||||
@@ -424,8 +431,12 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *j
|
||||
}
|
||||
targetURL := bu.url
|
||||
if tkn != nil {
|
||||
vmac := tkn.VMAccess()
|
||||
if !tkn.HasVMAccessClaim() {
|
||||
vmac = ui.JWT.DefaultVMAccessClaim
|
||||
}
|
||||
// for security reasons allow templating only for configured url values and headers
|
||||
targetURL, hc = replaceJWTPlaceholders(bu, hc, tkn.VMAccess())
|
||||
targetURL, hc = replaceJWTPlaceholders(bu, hc, vmac)
|
||||
}
|
||||
if isDefault {
|
||||
// Don't change path and add request_path query param for default route.
|
||||
@@ -881,3 +892,18 @@ func debugInfo(u *url.URL, r *http.Request) string {
|
||||
fmt.Fprint(s, ")")
|
||||
return s.String()
|
||||
}
|
||||
|
||||
// SlowdownUnauthorizedResponse adds a random delay in the [2..3] seconds range
|
||||
// before returning an unauthorized response.
|
||||
// This reduces the effectiveness of brute-force.
|
||||
func slowdownUnauthorizedResponse(r *http.Request) {
|
||||
|
||||
d := 2*time.Second + time.Duration(rand.IntN(1000))*time.Millisecond
|
||||
t := timerpool.Get(d)
|
||||
|
||||
select {
|
||||
case <-t.C:
|
||||
case <-r.Context().Done():
|
||||
}
|
||||
timerpool.Put(t)
|
||||
}
|
||||
|
||||
@@ -739,6 +739,12 @@ users:
|
||||
"vm_access": map[string]any{},
|
||||
}, false)
|
||||
|
||||
// token without vm_access claim, but with a custom claim usable for routing
|
||||
roleToken := genToken(t, map[string]any{
|
||||
"exp": time.Now().Add(10 * time.Minute).Unix(),
|
||||
"role": "admin",
|
||||
}, true)
|
||||
|
||||
fullToken := genToken(t, map[string]any{
|
||||
"exp": time.Now().Add(10 * time.Minute).Unix(),
|
||||
"vm_access": map[string]any{
|
||||
@@ -779,6 +785,26 @@ statusCode=401
|
||||
Unauthorized`
|
||||
f(simpleCfgStr, request, responseExpected)
|
||||
|
||||
// token without vm_access claim is accepted when it
|
||||
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
|
||||
request.Header.Set(`Authorization`, `Bearer `+roleToken)
|
||||
responseExpected = `
|
||||
statusCode=200
|
||||
path: /foo/abc
|
||||
query:
|
||||
headers:`
|
||||
f(fmt.Sprintf(`
|
||||
users:
|
||||
- jwt:
|
||||
public_keys:
|
||||
- %q
|
||||
default_vm_access_claim:
|
||||
metrics_account_id: 10
|
||||
metrics_project_id: 10
|
||||
match_claims:
|
||||
role: admin
|
||||
url_prefix: {BACKEND}/foo`, string(publicKeyPEM)), request, responseExpected)
|
||||
|
||||
// expired token
|
||||
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
|
||||
request.Header.Set(`Authorization`, `Bearer `+expiredToken)
|
||||
|
||||
@@ -956,6 +956,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
mayCache := !httputil.GetBool(r, "nocache")
|
||||
optimizeRepeatedBinaryOpSubexprs := httputil.GetBool(r, "optimize_repeated_binary_op_subexprs")
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -977,18 +978,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
}
|
||||
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
OptimizeRepeatedBinaryOpSubexprs: optimizeRepeatedBinaryOpSubexprs,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
|
||||
@@ -132,6 +132,9 @@ type EvalConfig struct {
|
||||
// Whether the response can be cached.
|
||||
MayCache bool
|
||||
|
||||
// Whether repeated cacheable binary op subexpressions can be optimized.
|
||||
OptimizeRepeatedBinaryOpSubexprs bool
|
||||
|
||||
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
|
||||
LookbackDelta int64
|
||||
|
||||
@@ -171,6 +174,7 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
||||
ec.MaxPointsPerSeries = src.MaxPointsPerSeries
|
||||
ec.Deadline = src.Deadline
|
||||
ec.MayCache = src.MayCache
|
||||
ec.OptimizeRepeatedBinaryOpSubexprs = src.OptimizeRepeatedBinaryOpSubexprs
|
||||
ec.LookbackDelta = src.LookbackDelta
|
||||
ec.RoundDigits = src.RoundDigits
|
||||
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
|
||||
@@ -467,7 +471,8 @@ func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
|
||||
}
|
||||
|
||||
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
|
||||
if !canPushdownCommonFilters(be) {
|
||||
canPushdown := canPushdownCommonFilters(be)
|
||||
if !canPushdown && !shouldOptimizeRepeatedBinaryOpSubexprs(ec, exprFirst, exprSecond) {
|
||||
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
|
||||
// from exprFirst to exprSecond.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
|
||||
@@ -500,6 +505,25 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
|
||||
}
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
if !canPushdown {
|
||||
qt = qt.NewChild("execute left and right sides of %q sequentially because repeated cacheable subexpression was found", be.Op)
|
||||
defer qt.Done()
|
||||
|
||||
qtFirst := qt.NewChild("expr1")
|
||||
tssFirst, err := evalExpr(qtFirst, ec, exprFirst)
|
||||
qtFirst.Done()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
qtSecond := qt.NewChild("expr2")
|
||||
tssSecond, err := evalExpr(qtSecond, ec, exprSecond)
|
||||
qtSecond.Done()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
|
||||
// Execute binary operation in the following way:
|
||||
//
|
||||
@@ -544,6 +568,78 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
|
||||
func shouldOptimizeRepeatedBinaryOpSubexprs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr) bool {
|
||||
if !ec.OptimizeRepeatedBinaryOpSubexprs {
|
||||
return false
|
||||
}
|
||||
if ec.Start == ec.End {
|
||||
return false
|
||||
}
|
||||
if !ec.mayCache() {
|
||||
return false
|
||||
}
|
||||
|
||||
candidatesFirst := make(map[string]struct{}, 1)
|
||||
var b []byte
|
||||
visitOptimizedAggrs(exprFirst, func(ae *metricsql.AggrFuncExpr) {
|
||||
if hasUnseededVolatileFunc(ae) {
|
||||
return
|
||||
}
|
||||
b = ae.AppendString(b[:0])
|
||||
candidatesFirst[string(b)] = struct{}{}
|
||||
})
|
||||
if len(candidatesFirst) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
repeated := false
|
||||
visitOptimizedAggrs(exprSecond, func(ae *metricsql.AggrFuncExpr) {
|
||||
if repeated {
|
||||
return
|
||||
}
|
||||
b = ae.AppendString(b[:0])
|
||||
_, repeated = candidatesFirst[string(b)]
|
||||
})
|
||||
return repeated
|
||||
}
|
||||
|
||||
func visitOptimizedAggrs(e metricsql.Expr, f func(ae *metricsql.AggrFuncExpr)) {
|
||||
metricsql.VisitAll(e, func(expr metricsql.Expr) {
|
||||
ae, ok := expr.(*metricsql.AggrFuncExpr)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if getIncrementalAggrFuncCallbacks(ae.Name) == nil {
|
||||
return
|
||||
}
|
||||
fe, _ := tryGetArgRollupFuncWithMetricExpr(ae)
|
||||
if fe == nil {
|
||||
return
|
||||
}
|
||||
f(ae)
|
||||
})
|
||||
}
|
||||
|
||||
func hasUnseededVolatileFunc(e metricsql.Expr) bool {
|
||||
found := false
|
||||
metricsql.VisitAll(e, func(expr metricsql.Expr) {
|
||||
if found {
|
||||
return
|
||||
}
|
||||
fe, ok := expr.(*metricsql.FuncExpr)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch strings.ToLower(fe.Name) {
|
||||
case "now":
|
||||
found = true
|
||||
case "rand", "rand_normal", "rand_exponential":
|
||||
found = len(fe.Args) == 0
|
||||
}
|
||||
})
|
||||
return found
|
||||
}
|
||||
|
||||
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
|
||||
if len(tss) == 0 {
|
||||
return nil
|
||||
|
||||
@@ -170,3 +170,87 @@ func TestGetSumInstantValues(t *testing.T) {
|
||||
[]*timeseries{ts("foo", 100, 1)},
|
||||
)
|
||||
}
|
||||
|
||||
func TestShouldOptimizeRepeatedBinaryOpSubexprsGate(t *testing.T) {
|
||||
e, err := metricsql.Parse(`count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in metricsql.Parse(): %s", err)
|
||||
}
|
||||
be, ok := e.(*metricsql.BinaryOpExpr)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected expr type; got %T; want *metricsql.BinaryOpExpr", e)
|
||||
}
|
||||
|
||||
f := func(name string, ec *EvalConfig, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for %q; got %v; want %v", name, result, resultExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("disabled optimization", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
}, false)
|
||||
f("disabled cache", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
f("instant query", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 1000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
f("repeated cacheable aggregate subexpression", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, true)
|
||||
f("unaligned range query", &EvalConfig{
|
||||
Start: 1001,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
}
|
||||
|
||||
func TestShouldOptimizeRepeatedBinaryOpSubexprsExpressions(t *testing.T) {
|
||||
f := func(name, q string, resultExpected bool) {
|
||||
t.Helper()
|
||||
e, err := metricsql.Parse(q)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in metricsql.Parse(%q) for %q: %s", q, name, err)
|
||||
}
|
||||
be, ok := e.(*metricsql.BinaryOpExpr)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected expr type for %q; got %T; want *metricsql.BinaryOpExpr", name, e)
|
||||
}
|
||||
ec := &EvalConfig{Start: 1000, End: 2000, Step: 1000, MayCache: true, OptimizeRepeatedBinaryOpSubexprs: true}
|
||||
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for %q; got %v; want %v; query: %q", name, result, resultExpected, q)
|
||||
}
|
||||
}
|
||||
|
||||
f("original issue query", `count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`, true)
|
||||
f("right side contains repeated count aggregate", `count(foo) by (job) / (count(foo) by (job) + 1)`, true)
|
||||
f("same sum aggregate", `sum(rate(foo[5m])) by (job) / sum(rate(foo[5m])) by (job)`, true)
|
||||
f("same inner rollup but different aggregates", `sum(rate(foo[5m])) by (job) / count(rate(foo[5m])) by (job)`, false)
|
||||
f("different count aggregates", `count(foo) by (job) / count(bar) by (job)`, false)
|
||||
f("bare metric selector", `foo / foo`, false)
|
||||
f("bare rollup function", `rate(a[5m]) / rate(a[5m])`, false)
|
||||
f("now at modifier", `sum(rate(foo[5m] @ now())) by (job) / sum(rate(foo[5m] @ now())) by (job)`, false)
|
||||
f("unseeded rand at modifier", `sum(rate(foo[5m] @ rand())) by (job) / sum(rate(foo[5m] @ rand())) by (job)`, false)
|
||||
f("unseeded rand_normal at modifier", `sum(rate(foo[5m] @ rand_normal())) by (job) / sum(rate(foo[5m] @ rand_normal())) by (job)`, false)
|
||||
f("unseeded rand_exponential at modifier", `sum(rate(foo[5m] @ rand_exponential())) by (job) / sum(rate(foo[5m] @ rand_exponential())) by (job)`, false)
|
||||
f("seeded rand at modifier", `sum(rate(foo[5m] @ rand(1))) by (job) / sum(rate(foo[5m] @ rand(1))) by (job)`, true)
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ COPY web/ /build/
|
||||
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o web-amd64 github.com/VictoriMetrics/vmui/ && \
|
||||
GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o web-windows github.com/VictoriMetrics/vmui/
|
||||
|
||||
FROM alpine:3.23.4
|
||||
FROM alpine:3.24.1
|
||||
USER root
|
||||
|
||||
COPY --from=build-web-stage /build/web-amd64 /app/web
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
DOCKER_REGISTRIES ?= docker.io quay.io
|
||||
DOCKER_NAMESPACE ?= victoriametrics
|
||||
|
||||
ROOT_IMAGE ?= alpine:3.23.4
|
||||
ROOT_IMAGE ?= alpine:3.24.1
|
||||
ROOT_IMAGE_SCRATCH ?= scratch
|
||||
CERTS_IMAGE := alpine:3.23.4
|
||||
CERTS_IMAGE := alpine:3.24.1
|
||||
|
||||
GO_BUILDER_IMAGE := golang:1.26.4
|
||||
|
||||
|
||||
@@ -17,11 +17,11 @@ Please find the changelog for VictoriaMetrics Anomaly Detection below.
|
||||
## v1.29.7
|
||||
Released: 2026-06-25
|
||||
|
||||
- UI: updated [vmanomaly UI](https://docs.victoriametrics.com/anomaly-detection/ui/) from [v1.7.1](https://docs.victoriametrics.com/anomaly-detection/ui/#v171) to [v1.7.2](https://docs.victoriametrics.com/anomaly-detection/ui/#v172), see respective [release notes](https://docs.victoriametrics.com/anomaly-detection/ui/#v172) for details.
|
||||
- UI: updated [vmanomaly UI](https://docs.victoriametrics.com/anomaly-detection/ui/) from [v1.7.1](https://docs.victoriametrics.com/anomaly-detection/ui/#v171) to [v1.7.2](https://docs.victoriametrics.com/anomaly-detection/ui/#v172), see respective [release notes](https://docs.victoriametrics.com/anomaly-detection/ui/#v172) for details. Notable mentions include `api/v1/server/model` endpoint for accessing production models config and queries from UI, manually or through [AI assistant](https://docs.victoriametrics.com/anomaly-detection/ui/#ai-assistance).
|
||||
|
||||
- IMPROVEMENT: Increased high-cardinality inference scaling by optionally scattering periodic infer jobs to reduce contention on shared resources (e.g. datasource, CPU, RAM) when `settings.n_workers > 1` and `scheduler.infer_every` is smaller than the total time to fetch and process all queries. This is controlled by new `scatter_infer_jobs` boolean argument of [Periodic Scheduler](https://docs.victoriametrics.com/anomaly-detection/components/scheduler/#parameters-1) (default: `false`).
|
||||
|
||||
- IMPROVEMENT: Optimized internal batching for reader post-fetch series processing, exposing reader processing queue depth, and clarifying inference skip logs after data fetch timeouts.
|
||||
- IMPROVEMENT: Optimized internal batching for reader post-fetch series processing, exposing reader processing queue depth (`vmanomaly_reader_processing_tasks_queued` [metric](https://docs.victoriametrics.com/anomaly-detection/components/monitoring/#reader-behaviour-metrics)), and clarifying inference skip logs after data fetch timeouts. See `series_processing_batch_size` argument of [VmReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#vm-reader) and [VLogsReader](https://docs.victoriametrics.com/anomaly-detection/components/reader/#victorialogs-reader) for details.
|
||||
|
||||
- IMPROVEMENT: Refined `VmReader` and `VLogsReader` logging after datasource request failures by suppressing the follow-up generic "No data" or "No unseen data" warning for failed fetches. Failed requests now keep the original datasource error while empty successful responses still emit the no-data warning.
|
||||
|
||||
|
||||
@@ -893,6 +893,19 @@ If a path to a CA bundle file (like `ca.crt`), it will verify the certificate us
|
||||
(Optional) Password for authentication. If set, it will be used to authenticate the request.
|
||||
</td>
|
||||
</tr>
|
||||
<tr>
|
||||
<td>
|
||||
|
||||
<span style="white-space: nowrap;">`series_processing_batch_size`</span>
|
||||
</td>
|
||||
<td>
|
||||
|
||||
`8`
|
||||
</td>
|
||||
<td>
|
||||
Optional argument {{% available_from "v1.29.7" anomaly %}}, allows specifying the number of time series to process together while preparing data for fit or infer stages. Defaults to `8`. Suggested values are 4-16 for high-cardinality queries.
|
||||
</td>
|
||||
</tr>
|
||||
</tbody>
|
||||
</table>
|
||||
|
||||
@@ -911,6 +924,7 @@ reader:
|
||||
# tenant_id: '0:0' # for cluster version only
|
||||
sampling_period: '1m'
|
||||
max_points_per_query: 10000
|
||||
series_processing_batch_size: 8
|
||||
data_range: [0, 'inf'] # reader-level
|
||||
offset: '0s' # reader-level
|
||||
timeout: '30s'
|
||||
|
||||
@@ -85,6 +85,21 @@ Pull requests requirements:
|
||||
|
||||
See a good example of a [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/6487).
|
||||
|
||||
## AI policy
|
||||
|
||||
You are free to use any AI tools when working on a contribution, on code,
|
||||
documentation, issues, or anything else. You do not need to disclose whether or
|
||||
how you used them.
|
||||
|
||||
With or without the help of AI, you are responsible for the changes you submit.
|
||||
Take the effort to understand the code base and every change in your pull request,
|
||||
and clean up any AI slop before sending it. Do not use AI to automate your
|
||||
responses to maintainers.
|
||||
|
||||
We review contributions on their quality, regardless of how they were produced. A
|
||||
pull request or issue that looks like unreviewed AI output, with low-quality or
|
||||
broken changes, may be closed without a detailed review or triage.
|
||||
|
||||
## Merging Pull Request
|
||||
|
||||
The person who merges the Pull Request is responsible for satisfying the requirements below:
|
||||
|
||||
@@ -245,7 +245,7 @@ The following steps must be performed during the upgrade / downgrade procedure:
|
||||
* Wait until the process stops. This can take a few seconds.
|
||||
* Start the upgraded VictoriaMetrics.
|
||||
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/whats-new-in-prometheus-2-8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
|
||||
> If you'd prefer not to manage upgrades yourself, [VictoriaMetrics Cloud](https://console.victoriametrics.cloud/signUp?utm_source=website&utm_campaign=docs_vm_single_upgrade)
|
||||
> performs version upgrades automatically during maintenance windows with no action required on your part.
|
||||
@@ -417,7 +417,7 @@ VictoriaMetrics is configured via command-line flags, so it must be restarted wh
|
||||
* Wait until the process stops. This can take a few seconds.
|
||||
* Start VictoriaMetrics with the new command-line flags.
|
||||
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/whats-new-in-prometheus-2-8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
|
||||
## How to scrape Prometheus exporters such as [node-exporter](https://github.com/prometheus/node_exporter)
|
||||
|
||||
@@ -478,6 +478,11 @@ and [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyco
|
||||
to the given number of digits after the decimal point.
|
||||
For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point.
|
||||
|
||||
VictoriaMetrics accepts `optimize_repeated_binary_op_subexprs=1` query arg for [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query)
|
||||
handler. It allows `vmselect` to execute left and right sides of binary operators sequentially when they contain the same
|
||||
optimized aggregate rollup result expression, so the second side may reuse the rollup result cache populated by the first side.
|
||||
The optimization is disabled by default and applies only when rollup result cache can be used for the request.
|
||||
|
||||
VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels)
|
||||
and [`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues) handlers for limiting the number of returned entries.
|
||||
For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels.
|
||||
@@ -1712,64 +1717,63 @@ The following versions of VictoriaMetrics receive regular security fixes:
|
||||
| [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/) | ✅ |
|
||||
| other releases | ❌ |
|
||||
|
||||
### Software Bill of Materials (SBOM)
|
||||
|
||||
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
|
||||
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
|
||||
|
||||
To inspect the SBOM for an image:
|
||||
|
||||
```sh
|
||||
docker buildx imagetools inspect \
|
||||
docker.io/victoriametrics/victoria-metrics:latest \
|
||||
--format "{{ json .SBOM }}"
|
||||
```
|
||||
|
||||
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
|
||||
|
||||
```sh
|
||||
trivy image --sbom-sources oci \
|
||||
docker.io/victoriametrics/victoria-metrics:latest
|
||||
```
|
||||
|
||||
### Reporting a Vulnerability
|
||||
|
||||
Please report any security issues to <security@victoriametrics.com>
|
||||
|
||||
### CVE handling policy
|
||||
|
||||
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
|
||||
All vulnerabilities must be fixed before the next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
**Docker images:** CVE findings in the [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
|
||||
When detected, only the Alpine base tag is updated.
|
||||
Releases proceed as planned even if upstream fixes are not yet available.
|
||||
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
|
||||
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
|
||||
|
||||
### General security recommendations:
|
||||
|
||||
* All the VictoriaMetrics components must run in protected private networks without direct access from untrusted networks such as Internet.
|
||||
* All VictoriaMetrics components must run in protected private networks without direct access from untrusted networks such as the Internet.
|
||||
The exception is [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/) and [vmgateway](https://docs.victoriametrics.com/victoriametrics/vmgateway/),
|
||||
which are intended for serving public requests and performing authorization with [TLS termination](https://en.wikipedia.org/wiki/TLS_termination_proxy).
|
||||
* All the requests from untrusted networks to VictoriaMetrics components must go through auth proxy such as [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/)
|
||||
* All the requests from untrusted networks to VictoriaMetrics components must go through an auth proxy, such as [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/)
|
||||
or [vmgateway](https://docs.victoriametrics.com/victoriametrics/vmgateway/). The proxy must be set up with proper authentication and authorization.
|
||||
* Prefer using lists of allowed API endpoints, while disallowing access to other endpoints when configuring [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/)
|
||||
in front of VictoriaMetrics components.
|
||||
* Set reasonable [`Strict-Transport-Security`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Strict-Transport-Security) header value to all the components to mitigate [MitM attacks](https://en.wikipedia.org/wiki/Man-in-the-middle_attack), for example: `max-age=31536000; includeSubDomains`. See `-http.header.hsts` flag.
|
||||
* Set a reasonable [`Strict-Transport-Security`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/Strict-Transport-Security) header value on all the components to mitigate [MitM attacks](https://en.wikipedia.org/wiki/Man-in-the-middle_attack), for example: `max-age=31536000; includeSubDomains`. See `-http.header.hsts` flag.
|
||||
* Set reasonable [`Content-Security-Policy`](https://developer.mozilla.org/en-US/docs/Web/HTTP/CSP) header value to mitigate [XSS attacks](https://en.wikipedia.org/wiki/Cross-site_scripting). See `-http.header.csp` flag.
|
||||
* Set reasonable [`X-Frame-Options`](https://developer.mozilla.org/en-US/docs/Web/HTTP/Headers/X-Frame-Options) header value to mitigate [clickjacking attacks](https://en.wikipedia.org/wiki/Clickjacking), for example `DENY`. See `-http.header.frameOptions` flag.
|
||||
|
||||
VictoriaMetrics provides the following security-related command-line flags:
|
||||
The following security-related command-line flags are available for all components with HTTP API:
|
||||
|
||||
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr` (TCP port 8428 is listened by default).
|
||||
* `-tls`, `-tlsCertFile` and `-tlsKeyFile` for switching from HTTP to HTTPS at `-httpListenAddr`.
|
||||
[Enterprise version of VictoriaMetrics](https://docs.victoriametrics.com/victoriametrics/enterprise/) supports automatic issuing of TLS certificates.
|
||||
See [these docs](#automatic-issuing-of-tls-certificates).
|
||||
* `-mtls` and `-mtlsCAFile` for enabling [mTLS](https://en.wikipedia.org/wiki/Mutual_authentication) for requests to `-httpListenAddr`. See [these docs](#mtls-protection).
|
||||
* `-httpAuth.username` and `-httpAuth.password` for protecting all the HTTP endpoints
|
||||
with [HTTP Basic Authentication](https://en.wikipedia.org/wiki/Basic_access_authentication).
|
||||
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
|
||||
and `X-Frame-Options` HTTP response headers.
|
||||
|
||||
### Protecting service endpoints
|
||||
|
||||
All VictoriaMetrics components expose internal metrics in Prometheus exposition format at the `/metrics` page for [#Monitoring](https://docs.victoriametrics.com/victoriametrics/#monitoring).
|
||||
Consider limiting access to the `/metrics` page to trusted networks only.
|
||||
|
||||
The following service endpoints may require protection:
|
||||
|
||||
* `-deleteAuthKey` for protecting the `/api/v1/admin/tsdb/delete_series` endpoint. See [how to delete time series](#how-to-delete-time-series).
|
||||
* `-snapshotAuthKey` for protecting the `/snapshot*` endpoints. See [how to work with snapshots](#how-to-work-with-snapshots).
|
||||
* `-forceFlushAuthKey` for protecting the `/internal/force_flush` endpoint. See [force flush docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#forced-flush).
|
||||
* `-forceMergeAuthKey` for protecting the `/internal/force_merge` endpoint. See [force merge docs](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#forced-merge).
|
||||
* `-search.resetCacheAuthKey` for protecting the `/internal/resetRollupResultCache` endpoint. See [backfilling](#backfilling) for more details.
|
||||
* `-reloadAuthKey` for protecting the `/-/reload` endpoint, which is used for force reloading of [`-promscrape.config`](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
||||
* `-reloadAuthKey` for protecting the `/-/reload` endpoint, which is used to force reload the [`-promscrape.config`](#how-to-scrape-prometheus-exporters-such-as-node-exporter).
|
||||
* `-configAuthKey` for protecting the `/config` endpoint, since it may contain sensitive information such as passwords.
|
||||
* `-flagsAuthKey` for protecting the `/flags` endpoint.
|
||||
* `-pprofAuthKey` for protecting the `/debug/pprof/*` endpoints, which can be used for [profiling](#profiling).
|
||||
* `-metricNamesStatsResetAuthKey` for protecting the `/api/v1/admin/status/metric_names_stats/reset` endpoint, used for [Metric Names Tracker](#track-ingested-metrics-usage).
|
||||
* `-denyQueryTracing` for disallowing [query tracing](#query-tracing).
|
||||
* `-http.header.hsts`, `-http.header.csp`, and `-http.header.frameOptions` for serving `Strict-Transport-Security`, `Content-Security-Policy`
|
||||
and `X-Frame-Options` HTTP response headers.
|
||||
|
||||
Explicitly set internal network interface for TCP and UDP ports for data ingestion with Graphite and OpenTSDB formats.
|
||||
For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<internal_iface_ip>:2003`. This protects from unexpected requests from untrusted network interfaces.
|
||||
@@ -1777,17 +1781,6 @@ For example, substitute `-graphiteListenAddr=:2003` with `-graphiteListenAddr=<i
|
||||
See also [security recommendation for VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#security)
|
||||
and [the general security page at VictoriaMetrics website](https://victoriametrics.com/security/).
|
||||
|
||||
### CVE handling policy
|
||||
|
||||
**Source code:** Go dependencies are scanned by [govulncheck](https://pkg.go.dev/golang.org/x/vuln/cmd/govulncheck) in CI.
|
||||
All vulnerabilities must be fixed before next scheduled release and backported to [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-releases/).
|
||||
|
||||
**Docker images:** CVE findings in [Alpine](https://security.alpinelinux.org/) base image pose minimal risk since VictoriaMetrics binaries are statically compiled with no OS dependencies.
|
||||
When detected, only the Alpine base tag is updated.
|
||||
Releases proceed as planned even if upstream fixes are not yet available.
|
||||
For maximum security, hardened [scratch](https://hub.docker.com/_/scratch)-based images are also provided.
|
||||
All images are continuously scanned by Docker Hub and verified before release using [grype](https://github.com/anchore/grype).
|
||||
|
||||
### mTLS protection
|
||||
|
||||
By default `VictoriaMetrics` accepts http requests at `8428` port (this port can be changed via `-httpListenAddr` command-line flags).
|
||||
@@ -1817,19 +1810,39 @@ This functionality can be evaluated for free according to [these docs](https://d
|
||||
|
||||
See also [security recommendations](#security).
|
||||
|
||||
### Software Bill of Materials (SBOM)
|
||||
|
||||
Every VictoriaMetrics container{{% available_from "v1.137.0" %}} image published to
|
||||
[Docker Hub](https://hub.docker.com/u/victoriametrics) and [Quay.io](https://quay.io/organization/victoriametrics) include an [SPDX](https://spdx.dev/) SBOM attestation generated automatically by BuildKit during `docker buildx build`.
|
||||
|
||||
To inspect the SBOM for an image:
|
||||
|
||||
```sh
|
||||
docker buildx imagetools inspect \
|
||||
docker.io/victoriametrics/victoria-metrics:latest \
|
||||
--format "{{ json .SBOM }}"
|
||||
```
|
||||
|
||||
To scan an image using its SBOM attestation with [Trivy](https://github.com/aquasecurity/trivy):
|
||||
|
||||
```sh
|
||||
trivy image --sbom-sources oci \
|
||||
docker.io/victoriametrics/victoria-metrics:latest
|
||||
```
|
||||
|
||||
## Tuning
|
||||
|
||||
* No need in tuning for VictoriaMetrics - it uses reasonable defaults for command-line flags,
|
||||
* No need to tune for VictoriaMetrics - it uses reasonable defaults for command-line flags,
|
||||
which are automatically adjusted for the available CPU and RAM resources.
|
||||
* No need in tuning for Operating System - VictoriaMetrics is optimized for default OS settings.
|
||||
* No need to tune for Operating System - VictoriaMetrics is optimized for default OS settings.
|
||||
The only option is increasing the limit on [the number of open files in the OS](https://medium.com/@muhammadtriwibowo/set-permanently-ulimit-n-open-files-in-ubuntu-4d61064429a).
|
||||
The recommendation is not specific for VictoriaMetrics only but also for any service which handles many HTTP connections and stores data on disk.
|
||||
* VictoriaMetrics is a write-heavy application and its performance depends on disk performance. So be careful with other
|
||||
The recommendation is not specific to VictoriaMetrics only, but also for any service that handles many HTTP connections and stores data on disk.
|
||||
* VictoriaMetrics is a write-heavy application, and its performance depends on disk performance. So be careful with other
|
||||
applications or utilities (like [fstrim](https://manpages.ubuntu.com/manpages/lunar/en/man8/fstrim.8.html))
|
||||
which could [exhaust disk resources](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/1521).
|
||||
* The recommended filesystem is `ext4`, the recommended persistent storage is [persistent HDD-based disk on GCP](https://cloud.google.com/compute/docs/disks/#pdspecs),
|
||||
since it is protected from hardware failures via internal replication and it can be [resized on the fly](https://cloud.google.com/compute/docs/disks/add-persistent-disk#resize_pd).
|
||||
If you plan to store more than 1TB of data on `ext4` partition, then the following options are recommended to pass to `mkfs.ext4`:
|
||||
since it is protected from hardware failures via internal replication, and it can be [resized on the fly](https://cloud.google.com/compute/docs/disks/add-persistent-disk#resize_pd).
|
||||
If you plan to store more than 1TB of data on an `ext4` partition, then the following options are recommended to pass to `mkfs.ext4`:
|
||||
|
||||
```sh
|
||||
mkfs.ext4 ... -O 64bit,huge_file,extent -T huge
|
||||
|
||||
@@ -26,23 +26,32 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* SECURITY: upgrade base docker image (Alpine) from 3.23.4 to 3.24.1. See [Alpine 3.24.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.24.1-released.html).
|
||||
|
||||
* BUGFIX: all VictoriaMetrics components: cancel in-flight HTTP requests shortly before `-http.maxGracefulShutdownDuration` elapses during graceful shutdown, so they can drain and the shutdown completes cleanly within that window instead of timing out and exiting via `logger.Fatalf` -> `os.Exit`. This prevents skipping the storage flush and losing in-memory data when long-lived requests are in flight (such as VictoriaLogs live tailing). See [#1502](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502).
|
||||
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fixes unexpected rare rerouting. See [#11162](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11162).
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): propagate cache reset operation to `selectNode` when `/internal/resetRollupResultCache` is called. Previously, the propagation only happened when the `delete_series` API was called. See [#11112](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11112).
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix possible unexpected increases in `rate_avg` and `rate_sum` if an out-of-order sample is ingested after the previous flush. See [#11140](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11140).
|
||||
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): add `default_vm_access_claim` field into `jwt` section of auth config. It could be used at [JWT claim placeholders](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating), if `JWT` token doesn't have `vm_access` claim. See [#11054](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11054).
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): prevent possible password brute-force attacks with an artificial 2-3 second delay as recommended by [OWASP](https://owasp.org/Top10/2025/A07_2025-Authentication_Failures). See [#11180](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11180).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): reduces CPU usage by 10% at [sharding among remote storages](https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages). See [#11113](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11113). Thanks to @bennf for contribution.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `optimize_repeated_binary_op_subexprs=1` query arg to [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query) for executing binary operator sides sequentially when they share the same optimized aggregate rollup result expression. This allows the second side to reuse rollup result cache populated by the first side. See [#10575](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10575).
|
||||
|
||||
## [v1.146.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.146.0)
|
||||
|
||||
Released at 2026-06-22
|
||||
|
||||
* FEATURE: all VictoriaMetrics components: add `-http.header.disableServerHostname` command-line flag for disabling the `X-Server-Hostname` HTTP response header. See [#11067](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11067). Thanks to @zasdaym for contribution.
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose `vm_streamaggr_dedup_dropped_samples_total` to allow tracking dropped old samples during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): use the aggregation rule interval as the default [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) instead of `2*interval`, to reduce spikes when there are gaps between received samples. See [#11102](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add new aggregation output [sum_samples_total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#sum_samples_total) for summing input delta values into a cumulative counter. See issues [#11002](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11002) and [#4843](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4843).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add a new flag `-remoteWrite.inmemoryQueues` to prioritize recently ingested data over historical data stored at file-based [persistent queue](https://docs.victoriametrics.com/victoriametrics/vmagent/#on-disk-persistence-and-data-processing-order). See [#8833](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8833)
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-promscrape.cluster.shardByLabels` command-line flag for selecting target labels used for sharding scrape targets among `vmagent` instances in cluster mode. See [#11044](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add `-promscrape.cluster.shardByLabels` command-line flag for selecting target labels used for sharding scrape targets among `vmagent` instances in cluster mode. See [#11044](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11044).
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add support for [Monitoring Data eXchange (MDX)](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange): the ability to route only metrics from VictoriaMetrics services to a specific `-remoteWrite.url`. MDX is useful for building monitoring-of-monitoring where one remote storage should receive the full metric stream and another should receive only VictoriaMetrics metrics. Enable per destination with `-remoteWrite.mdx.enable=true`. See [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600).
|
||||
|
||||
* BUGFIX: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly expose metric `vm_retention_filters_partitions_scheduled_rows`. See [#11138](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11138)
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
|
||||
@@ -57,6 +66,7 @@ Released at 2026-06-22
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `metricFamilyName` at metrics metadata response. See [#11129](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11129). Thanks for @fxrlv for the contribution.
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
|
||||
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
Released at 2026-06-08
|
||||
|
||||
@@ -624,13 +624,12 @@ command line flags. See how to [shard data across remote write destinations](htt
|
||||
The following requirements must be met for sharded aggregation to work correctly:
|
||||
- All sharding vmagents should have the same deterministic sharding configuration.
|
||||
- The sharding configuration must align with the `by` and `without` lists:
|
||||
- Labels listed in `by` setting should be a subset of shard's routing key `-remoteWrite.shardByURL.labels`.
|
||||
With `-remoteWrite.shardByURL.labels=env,job` aggregator's `by` should include `by: env`, `by: job` or both: `by: [env, job]`.
|
||||
This makes sure that all the samples for the same `env` and `job` are aggregated together and produce the complete output.
|
||||
- Labels listed in `without` setting should be a superset of shard's routing key `--remoteWrite.shardByURL.ignoreLabels`.
|
||||
With `-remoteWrite.shardByURL.ignoreLabels=env,job` aggegator's `without` should include at least both labels `without: [env,job]`.
|
||||
This makes sure that `requests_total{env=test, job=foo}` and `requests_total{env=prod, job=foo}` are routed to the same aggregator
|
||||
and are aggregated together. See also [this issue](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/5938#issuecomment-2018470324).
|
||||
- Labels configured in `-remoteWrite.shardByURL.labels` must be a subset of the labels listed in `by`.
|
||||
For example, if the aggregation config specifies `by: [env, job]`, then `-remoteWrite.shardByURL.labels` may include `env`, `job`, or both.
|
||||
This ensures that all samples contributing to the same aggregation result are routed to the same aggregator instance and aggregated together to produce a complete output.
|
||||
- Labels configured in `-remoteWrite.shardByURL.ignoreLabels` must be a superset of the labels listed in `without`.
|
||||
For example, if the aggregation config specifies `without: [env, pod]`, then `-remoteWrite.shardByURL.ignoreLabels` must include at least `env` and `pod`.
|
||||
This ensures that labels removed during aggregation are not used for shard routing.
|
||||
- Aggregating vmagents should not produce collisions: the aggregation output should be unique across all the sharded agents.
|
||||
For example, `requests_total:5m_without_env_pod_total` produced by both `vmagent-aggr-1` and `vmagent-aggr-2` will collide
|
||||
unless they have labels uniquely identifying them. These labels should be either preserved during sharding and aggregation config,
|
||||
|
||||
@@ -222,6 +222,7 @@ Below are aggregation functions that can be put in the `outputs` list at [stream
|
||||
* [stddev](#stddev)
|
||||
* [stdvar](#stdvar)
|
||||
* [sum_samples](#sum_samples)
|
||||
* [sum_samples_total](#sum_samples_total)
|
||||
* [total](#total)
|
||||
* [total_prometheus](#total_prometheus)
|
||||
* [unique_samples](#unique_samples)
|
||||
@@ -505,10 +506,11 @@ See also:
|
||||
|
||||
- [count_samples](#count_samples)
|
||||
- [count_series](#count_series)
|
||||
- [sum_samples_total](#sum_samples_total)
|
||||
|
||||
### `sum_samples_total`
|
||||
|
||||
`sum_samples_total` sums input delta values into a cumulative [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/index.html#counter) and outputs the result at the given `interval`.
|
||||
`sum_samples_total` {{% available_from "v1.146.0" %}}. sums input delta values into a cumulative [counter](https://docs.victoriametrics.com/victoriametrics/keyconcepts/index.html#counter) and outputs the result at the given `interval`.
|
||||
`sum_samples_total` makes sense only for aggregating delta values from clients such as [StatsD counter](https://github.com/statsd/statsd/blob/master/docs/metric_types.md#counting).
|
||||
|
||||
The results of `sum_samples_total` is roughly equal to the following [MetricsQL](https://docs.victoriametrics.com/victoriametrics/metricsql/) query:
|
||||
@@ -559,6 +561,7 @@ See also:
|
||||
- [total_prometheus](#total_prometheus)
|
||||
- [increase](#increase)
|
||||
- [increase_prometheus](#increase_prometheus)
|
||||
- [sum_samples_total](#sum_samples_total)
|
||||
- [rate_sum](#rate_sum)
|
||||
- [rate_avg](#rate_avg)
|
||||
|
||||
@@ -588,6 +591,7 @@ See also:
|
||||
- [total](#total)
|
||||
- [increase](#increase)
|
||||
- [increase_prometheus](#increase_prometheus)
|
||||
- [sum_samples_total](#sum_samples_total)
|
||||
- [rate_sum](#rate_sum)
|
||||
- [rate_avg](#rate_avg)
|
||||
|
||||
|
||||
@@ -268,6 +268,39 @@ for the collected samples. Examples:
|
||||
```sh
|
||||
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
|
||||
```
|
||||
|
||||
### Monitoring Data eXchange
|
||||
|
||||
The Monitoring Data eXchange (MDX){{% available_from "#" %}} feature allows `vmagent` to forward only VictoriaMetrics metrics to selected `-remoteWrite.url` destinations while dropping metrics from non-VictoriaMetrics services.
|
||||
|
||||
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=false \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true
|
||||
```
|
||||
When MDX is enabled for a `-remoteWrite.url`, `vmagent` forwards only metrics that:
|
||||
- come from the target that exposes the `vm_app_version` metric (emitted by all VictoriaMetrics components)
|
||||
- contain the `victoriametrics_app=true` label, which will be added automatically to the metrics if the instance was deployed via [VictoriaMetrics Operator](https://docs.victoriametrics.com/operator/).
|
||||
|
||||
`victoriametrics_app=true` label will be added to all metrics that are preserved by MDX if it's absent.
|
||||
|
||||
- contain the label specified via `-mdx.label`.
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true \
|
||||
-mdx.label="service=victoriametrics"
|
||||
```
|
||||
In this configuration, metrics with the label `service=victoriametrics` are preserved even if their scrape targets do not expose `vm_app_version` metric.
|
||||
|
||||
The number of VictoriaMetrics metrics preserved by MDX is exposed as `vmagent_remotewrite_mdx_rows_preserved_total`.
|
||||
|
||||
The scope of MDX is at the per-url level, so it works after global level mechanisms, such as stream aggregation, relabeling, complexity limiter, and cardinality limiter. See [Life of a sample](https://docs.victoriametrics.com/victoriametrics/vmagent/#life-of-a-sample).
|
||||
|
||||
### Life of a sample
|
||||
|
||||
@@ -285,18 +318,20 @@ flowchart TB
|
||||
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
|
||||
|
||||
%% Left branch
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
|
||||
|
||||
%% Right branch
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
|
||||
```
|
||||
|
||||
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:
|
||||
@@ -309,11 +344,11 @@ Scraping has additional settings that can be applied before samples are pushed t
|
||||
`vmagent` supports [the same set of push-based data ingestion protocols as VictoriaMetrics does](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#how-to-import-time-series-data)
|
||||
in addition to the pull-based Prometheus-compatible targets' scraping:
|
||||
|
||||
* DataDog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/datadog/).
|
||||
* Datadog "submit metrics" API. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/datadog/).
|
||||
* InfluxDB line protocol via `http://<vmagent>:8429/write`. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/influxdb/).
|
||||
* Graphite plaintext protocol if the `-graphiteListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/graphite/#ingesting).
|
||||
* OpenTelemetry HTTP API. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/).
|
||||
* NewRelic API. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/newrelic/#sending-data-from-agent).
|
||||
* OpenTelemetry HTTP API via `http://<vmagent>:8429/opentelemetry/v1/metrics`. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/opentelemetry/).
|
||||
* New Relic API. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/newrelic/#sending-data-from-agent).
|
||||
* OpenTSDB telnet and http protocols if `-opentsdbListenAddr` command-line flag is set. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/opentsdb/).
|
||||
* Zabbix Connector streaming protocol. See [these docs](https://docs.victoriametrics.com/victoriametrics/integrations/zabbixconnector/#send-data-from-zabbix-connector).
|
||||
* Prometheus remote write protocol via `http://<vmagent>:8429/api/v1/write`.
|
||||
@@ -481,29 +516,38 @@ by specifying `-remoteWrite.forcePromProto` command-line flag for the correspond
|
||||
## Multitenancy
|
||||
|
||||
By default, `vmagent` collects the data without [tenant](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy) identifiers
|
||||
and routes it to the remote storage specified via `-remoteWrite.url` command-line flag. The `-remoteWrite.url` can point to `/insert/<tenant_id>/prometheus/api/v1/write` path
|
||||
at `vminsert` according to [these docs](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format).
|
||||
and routes it to the remote storage specified via `-remoteWrite.url` command-line flag. Point `-remoteWrite.url` to vminsert's `/insert/<tenant_id>/prometheus/api/v1/write` path
|
||||
according to [these docs](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format).
|
||||
|
||||
> Note: the single-node version of VictoriaMetrics doesn't support multitenancy.
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
A["requests_total{instance=foo}"] --> |/api/v1/write| V[vmagent]
|
||||
B["requests_total{instance=bar}"] <--> |scrape| V
|
||||
V --> |"/insert/#60;tenant_id#62;/#60;suffix#62;"| C[vminsert]
|
||||
A["requests_total{instance=foo}"] --> |<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#how-to-push-data-to-vmagent">push</a>| V[vmagent]
|
||||
B["requests_total{instance=bar}"] <--> |<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#how-to-collect-metrics-in-prometheus-format">pull</a>| V
|
||||
V --> |"/insert/#60;tenant_id#62;/prometheus/api/v1/write"| C[vminsert]
|
||||
```
|
||||
|
||||
In this case, all the metrics written to `/insert/tenant_id/prometheus/api/v1/write` will belong to the specified `<tenant_id>` tenant.
|
||||
In this case, all the metrics written to `/insert/<tenant_id>/prometheus/api/v1/write` will belong to the specified `<tenant_id>` tenant.
|
||||
|
||||
### Multitenancy via labels
|
||||
|
||||
vmagent can write data to multiple distinct tenants if `-remoteWrite.url` points to [multitenant URL at VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels)
|
||||
and tenant is specified via [multitenancy labels](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels):
|
||||
vmagent can write data to **multiple distinct tenants** if `-remoteWrite.url` points to the [multitenant URL in the VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels)
|
||||
and the tenant is specified via [multitenancy labels](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels):
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
A["requests_total{instance=foo, vm_account_id=0}"] --> |/api/v1/write| V[vmagent]
|
||||
B["requests_total{instance=bar, vm_account_id=1}"] <--> |scrape| V
|
||||
V --> |"/insert/multitenant/#60;suffix#62;"| C[vminsert]
|
||||
A["requests_total{instance=foo, vm_account_id=0}"] --> |<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#how-to-push-data-to-vmagent">push</a>| V[vmagent]
|
||||
B["requests_total{instance=bar, vm_account_id=1}"] <--> |<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#how-to-collect-metrics-in-prometheus-format">pull</a>| V
|
||||
V --> |"/insert/multitenant/prometheus/api/v1/write"| C[vminsert]
|
||||
```
|
||||
`<tenant_id>` is extracted from the `vm_account_id` and `vm_project_id` labels.
|
||||
|
||||
> A single payload pulled from or pushed to vmagent may contain time series belonging to multiple tenants.
|
||||
|
||||
When vminsert receives the data on the `/insert/multitenant` path, it extracts `<tenant_id>` from the `vm_account_id` and `vm_project_id` labels for
|
||||
each distinct time series.
|
||||
|
||||
> If `vm_account_id` or `vm_project_id` labels are missing or invalid, then the corresponding accountID and projectID are set to 0.
|
||||
|
||||
The `vm_account_id` and `vm_project_id` labels can be specified via [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/) before sending the metrics to `-remoteWrite.url`.
|
||||
For example, the following relabeling rule instructs sending metrics to `<account_id>:0` tenant defined in the `prometheus.io/account_id` annotation of Kubernetes pod deployment:
|
||||
@@ -516,11 +560,12 @@ scrape_configs:
|
||||
target_label: vm_account_id
|
||||
```
|
||||
|
||||
vmagent can get tenant identifier from `__tenant_id__` label at target discovery phase.
|
||||
It implicitly converts `__tenant_id__` label into `vm_account_id` and `vm_project_id` labels and attaches
|
||||
it to the scraped metrics and metrics metadata.
|
||||
vmagent can get the tenant identifier from the `__tenant_id__` label during the target discovery phase.
|
||||
It implicitly converts the `__tenant_id__` label into `vm_account_id` and `vm_project_id` labels and attaches
|
||||
them to the scraped metrics and metrics metadata.
|
||||
|
||||
For example, the following relabeling rule instructs sending metrics to the `10:5` tenant defined in the `prometheus.io/tenant_id: 10:5` annotation of the Kubernetes pod deployment:
|
||||
For example, the following relabeling rule instructs sending metrics to the `10:5` tenant defined in the `prometheus.io/tenant_id: 10:5`
|
||||
annotation of the Kubernetes pod deployment:
|
||||
|
||||
```yaml
|
||||
scrape_configs:
|
||||
@@ -531,47 +576,54 @@ scrape_configs:
|
||||
target_label: __tenant_id__
|
||||
```
|
||||
|
||||
vmagent can [enforce adding labels](https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics) to all scraped
|
||||
or forwarded metrics.
|
||||
vmagent can also [enforce adding labels](https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics)
|
||||
on all scraped or forwarded metrics.
|
||||
|
||||
### Multitenancy via path
|
||||
|
||||
vmagent can write data to multiple distinct tenants if `-remoteWrite.url` points to [multitenant URL at VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels),
|
||||
tenant is specified in the [write path](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format), and `-enableMultitenantHandlers` command-line flag is set:
|
||||
vmagent can write data to multiple distinct tenants if:
|
||||
* its `-remoteWrite.url` points to the [VictoriaMetrics cluster multitenant URL](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels)
|
||||
* its `-enableMultitenantHandlers` command-line flag is set
|
||||
* clients ingest data into vmagent with the tenant specified in the [write path](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format)
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
A["requests_total{instance=foo}"] --> |/insert/0/#60;suffix#62;| V[vmagent]
|
||||
B["requests_total{instance=bar}"] --> |/insert/1/#60;suffix#62;| V
|
||||
V --> |"/insert/multitenant/#60;suffix#62;"| C[vminsert]
|
||||
A["requests_total{instance=foo}"] --> |<a href="https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format">/insert/0/#60;suffix#62;</a>| V[vmagent]
|
||||
B["requests_total{instance=bar}"] --> |<a href="https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format">/insert/1/#60;suffix#62;</a>| V
|
||||
V --> |"/insert/multitenant/prometheus/api/v1/write"| C[vminsert]
|
||||
```
|
||||
|
||||
In this configuration, vmagent accepts writes via the same multitenant endpoints (`/insert/<accountID>/<suffix>`) [as vminsert does](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format).
|
||||
For all received data, vmagent will automatically convert tenant identifiers from the URL to `vm_account_id` and `vm_project_id` labels and set tenant info in metadata.
|
||||
For all the received data, vmagent will automatically convert tenant identifiers in the URL path to `vm_account_id` and `vm_project_id` labels, and set tenant information in metadata.
|
||||
|
||||
These tenant labels are added before applying [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/) specified via `-remoteWrite.relabelConfig`
|
||||
and `-remoteWrite.urlRelabelConfig` command-line flags.
|
||||
These tenant labels are added before applying [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/)
|
||||
specified via `-remoteWrite.relabelConfig` and `-remoteWrite.urlRelabelConfig` command-line flags.
|
||||
|
||||
### Multitenancy via headers
|
||||
|
||||
vmagent can write data to multiple distinct tenants if `-remoteWrite.url` points to [multitenant URL at VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels),
|
||||
tenant is specified [via headers](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-headers) {{% available_from "v1.143.0" %}}, both `-enableMultitenantHandlers` and `-enableMultitenancyViaHeaders` command-line flags are set:
|
||||
vmagent can write data to multiple distinct tenants if:
|
||||
* its `-remoteWrite.url` points to the [VictoriaMetrics cluster multitenant URL](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-labels)
|
||||
* its `-enableMultitenantHandlers` and `-enableMultitenancyViaHeaders` command-line flags are both set
|
||||
* clients ingest data into vmagent with the tenants specified [via headers](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-headers) {{% available_from "v1.143.0" %}}
|
||||
|
||||
```mermaid
|
||||
flowchart LR
|
||||
A["requests_total{instance=foo}"] --> |/insert/#60;suffix#62; <br>--header AccountID: 0| V[vmagent]
|
||||
B["requests_total{instance=bar}"] --> |/insert/#60;suffix#62; <br>--header AccountID: 1| V
|
||||
V --> |"/insert/multitenant/#60;suffix#62;"| C[vminsert]
|
||||
A["requests_total{instance=foo}"] --> |<a href="https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-headers">/insert/#60;suffix#62;</a> <br>--header AccountID: 0| V[vmagent]
|
||||
B["requests_total{instance=bar}"] --> |<a href="https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#multitenancy-via-headers">/insert/#60;suffix#62;</a> <br>--header AccountID: 1| V
|
||||
V --> |"/insert/multitenant/prometheus/api/v1/write"| C[vminsert]
|
||||
```
|
||||
|
||||
In this configuration, vmagent accepts writes via the same simplified multitenant endpoints (`/insert/<suffix>`) [as vminsert does](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/#url-format).
|
||||
The tenant information is extracted from the `AccountID` and `ProjectID` HTTP headers, which are expected to be included in all incoming requests. If headers are missing, then the tenant is set to `0:0` as the default.
|
||||
The tenant information is extracted from the `AccountID` and `ProjectID` HTTP headers, which are expected to be included in all incoming requests.
|
||||
|
||||
For all received data, vmagent will automatically convert tenant identifiers from the headers to `vm_account_id` and `vm_project_id` labels and set tenant info in metadata.
|
||||
These tenant labels are added before applying [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/) specified via `-remoteWrite.relabelConfig`
|
||||
> If headers are missing, then the tenant is set to `0:0` by default.
|
||||
|
||||
For all the received data, vmagent will automatically convert tenant identifiers in the headers to `vm_account_id` and `vm_project_id` labels, and set tenant info in metadata.
|
||||
These tenant labels are added before applying [relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/) specified via the `-remoteWrite.relabelConfig`
|
||||
and `-remoteWrite.urlRelabelConfig` command-line flags.
|
||||
|
||||
vmauth can [enforce adding headers](https://docs.victoriametrics.com/victoriametrics/vmauth/#modifying-http-headers) to all
|
||||
forwarded requests via `headers` param in the config file.
|
||||
forwarded requests via the `headers` parameter in the config file.
|
||||
|
||||
## Adding labels to metrics
|
||||
|
||||
|
||||
@@ -270,7 +270,7 @@ users:
|
||||
url_prefix: "http://victoria-metrics:8428/"
|
||||
```
|
||||
|
||||
JWT tokens must contain a `"vm_access": {}` claim, more on that in [JWT claim-based request templating](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating)
|
||||
The `vm_access` claim is optional starting from {{% available_from "#" %}}: when present it is used for [request templating](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating), and when absent the default tenant `0:0` is assumed for any `vm_access`-based placeholders. Routing can rely solely on other token claims via [JWT claim matching](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-matching).
|
||||
|
||||
For testing, skip signature verification with `skip_verify: true` (not recommended for production).
|
||||
|
||||
@@ -520,7 +520,8 @@ for dynamic URL rewriting based on `vm_access` claim fields.
|
||||
|
||||
`vmauth` can dynamically rewrite{{% available_from "v1.137.0" %}} upstream URLs and request headers using values from the JWT `vm_access` claim.
|
||||
This enables routing different users to different backends or tenants based solely on the JWT token,
|
||||
without maintaining separate user configs per tenant.
|
||||
without maintaining separate user configs per tenant. In addition `vm_access` claim could be defined at `jwt` section with `default_vm_access_claim` {{% available_from "#" %}}.
|
||||
In this case, if JWT token doesn't have `vm_access` claim defined, value from `default_vm_access_claim` will be used for templaing.
|
||||
|
||||
Example: minimal valid JWT. If vm_access is empty, tenant `0:0` is assumed and no additional filters are applied.
|
||||
```json
|
||||
@@ -575,6 +576,28 @@ Placeholders are supported in the following locations:
|
||||
Placeholders are **not** supported in response headers.
|
||||
They are also only valid for JWT-authenticated users — using them in configs for `username`/`password` or `bearer_token` users causes a configuration error.
|
||||
|
||||
Example: default `vm_access` claim:
|
||||
|
||||
```yaml
|
||||
users:
|
||||
- jwt:
|
||||
default_vm_access_claim:
|
||||
metrics_account_id: 10
|
||||
metrics_project_id: 10
|
||||
metrics_extra_filters:
|
||||
- '{instance="sandbox"}'
|
||||
metrics_extra_labels:
|
||||
- team=dev
|
||||
- env=dev
|
||||
public_keys:
|
||||
- |
|
||||
-----BEGIN PUBLIC KEY-----
|
||||
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
|
||||
-----END PUBLIC KEY-----
|
||||
url_prefix: "http://vminsert:8480/insert/{{.MetricsAccountID}}:{{.MetricsProjectID}}/prometheus/?extra_filters={{.MetricsExtraFilters}}&extra_label={{.MetricsExtraLabels}}"
|
||||
```
|
||||
|
||||
|
||||
Example: route requests to the VictoriaMetrics single-node:
|
||||
|
||||
```yaml
|
||||
|
||||
584
docs/victoriametrics/vmestimator.md
Normal file
584
docs/victoriametrics/vmestimator.md
Normal file
@@ -0,0 +1,584 @@
|
||||
---
|
||||
weight: 12
|
||||
menu:
|
||||
docs:
|
||||
parent: victoriametrics
|
||||
weight: 12
|
||||
title: vmestimator
|
||||
tags:
|
||||
- metrics
|
||||
- cardinality
|
||||
aliases:
|
||||
- /vmestimator.html
|
||||
- /vmestimator/index.html
|
||||
- /vmestimator/
|
||||
---
|
||||
|
||||
`vmestimator` measures metrics cardinality across arbitrary label dimensions and exposes the results as metrics.
|
||||
|
||||
## Why measure?
|
||||
|
||||
Consider a setup where metrics are scraped from dozens of Prometheus targets.
|
||||
One day, a team deploys a new version of their service with a `trace_id` or `user_id` label.
|
||||
Overnight, that job's cardinality explodes from 500 to 500,000 time series.
|
||||
Suddenly, VictoriaMetrics consumes 100x more memory and disk.
|
||||
Ingestion slows down, storage struggles to keep up, and in the worst case becomes unavailable.
|
||||
|
||||
By the time someone gets paged, the damage is already done: indexes are bloated, caches are oversized, and observability across the entire system is affected.
|
||||
|
||||
`vmestimator` continuously tracks cardinality and exposes the estimation results as [metrics](https://github.com/VictoriaMetrics/vmestimator/blob/main/README.md#cardinality-metrics).
|
||||
This allows alerting on cardinality spikes within minutes and identifying the offending job directly from the alert.
|
||||
Instead of discovering the problem after it impacts the infrastructure, it becomes possible to react before it turns into an outage.
|
||||
|
||||
Per-job cardinality tracking is the most actionable use case, but it’s not the only one (see [use cases](https://github.com/VictoriaMetrics/vmestimator/#use-cases)).
|
||||
`vmestimator` can measure cardinality across arbitrary label dimensions,
|
||||
enabling use cases such as per-tenant usage analysis, long-term trend tracking, and capacity planning.
|
||||
|
||||
## Design
|
||||
|
||||
We recommend deploying `vmestimator` close to the metrics source, ideally alongside `vmagent` instances that scrape targets.
|
||||
Each `vmagent` mirrors all ingested metrics into the estimator.
|
||||
|
||||
To reduce overhead, persistent queueing and metadata ingestion can be disabled for the estimator remote write path.
|
||||
It is safe to send metrics from multiple independent `vmagent` instances into a single `vmestimator`.
|
||||
|
||||
Run vmestimator (see [configuration](https://github.com/VictoriaMetrics/vmestimator#configuration)):
|
||||
```bash
|
||||
/path/to/vmestimator -config=streams.yaml # -httpListenAddr=:8490
|
||||
```
|
||||
|
||||
Run vmagent:
|
||||
```bash
|
||||
/path/to/vmagent \
|
||||
-remoteWrite.url=http://127.0.0.1:8428/api/v1/write \
|
||||
-remoteWrite.url=http://127.0.0.1:8490/cardinality/api/v1/write \
|
||||
-remoteWrite.disableOnDiskQueue=false,true \
|
||||
-remoteWrite.disableMetadata=false,true
|
||||
```
|
||||
|
||||
The next step is to expose cardinality estimates as metrics.
|
||||
For this, `vmagent` should scrape the estimator `/metrics` endpoint and forward those metrics to a `vmsingle` instance (or another VictoriaMetrics storage).
|
||||
|
||||
<img style="min-width:0;width: 100%" src="https://github.com/user-attachments/assets/e52d9210-b6f9-457b-8d8f-1d6ff6ba1416" />
|
||||
|
||||
This setup is straightforward and introduces minimal overhead.
|
||||
The main drawback is that cardinality data shares the same storage with production metrics.
|
||||
If that storage becomes unavailable, the visibility into cardinality is lost precisely when it may be most needed.
|
||||
|
||||
To mitigate this, we recommend running a separate `vmsingle` instance dedicated to scraping and storing VictoriaMetrics-related monitoring signals only.
|
||||
This pattern is commonly referred to as a monitoring-of-monitoring (MoM) setup.
|
||||
In this architecture, `vmestimator` metrics are isolated from production observability storage,
|
||||
ensuring cardinality visibility remains available even during incidents affecting the primary monitoring system.
|
||||
|
||||
The resulting topology looks like this:
|
||||
<img style="min-width:0;width: 100%" src="https://github.com/user-attachments/assets/e2ca4a69-e931-47a1-9d91-99749382d4a9" />
|
||||
|
||||
## Install
|
||||
|
||||
Create a `streams.yaml` from [example config](https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml).
|
||||
Run the Docker image from [Docker Hub](https://hub.docker.com/r/victoriametrics/vmestimator) or [Quay](https://quay.io/repository/victoriametrics/vmestimator), mounting your config file:
|
||||
```bash
|
||||
docker run --rm \
|
||||
-p 8490:8490 \
|
||||
-v /path/to/streams.yaml:/streams.yaml \
|
||||
docker.io/victoriametrics/vmestimator:latest \
|
||||
-config=/streams.yaml
|
||||
```
|
||||
|
||||
See [Use Cases](https://github.com/VictoriaMetrics/vmestimator#use-cases) for more configuration examples and
|
||||
[Command-line flags](https://github.com/VictoriaMetrics/vmestimator#command-line-flags) for all available options.
|
||||
|
||||
To build from sources, see [How to build from sources](https://github.com/VictoriaMetrics/vmestimator#how-to-build-from-sources).
|
||||
|
||||
## Configuration
|
||||
|
||||
To run vmestimator a `streams.yaml` config has to be provided (see [example config](https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml)):
|
||||
|
||||
```bash
|
||||
/path/to/vmestimator -config=streams.yaml # -httpListenAddr=:8490
|
||||
```
|
||||
|
||||
Config reference:
|
||||
```yaml
|
||||
streams:
|
||||
-
|
||||
# The measurement window: how long unique series are retained before the HLL sketch resets.
|
||||
# Increases are always reflected immediately. Interval only controls how fast the estimate
|
||||
# drops after previously seen series disappear.
|
||||
#
|
||||
# Running two streams with different intervals (e.g. 5m and 1h) lets you derive churn rate
|
||||
# by comparing their estimates. See Use Cases -> Churn Rate
|
||||
#
|
||||
# default: 5m
|
||||
interval: 'golang duration'
|
||||
|
||||
# Label names used to split the cardinality estimate into per-combination groups.
|
||||
# Each distinct combination of values for these labels gets its own estimate metric.
|
||||
# Omit entirely for a single global estimate across all series.
|
||||
# Examples:
|
||||
# - ["job"]
|
||||
# - ["__name__"]
|
||||
# - ["vm_account_id","vm_project_id"]
|
||||
#
|
||||
# default: none (single global estimate)
|
||||
group_by: 'string array'
|
||||
|
||||
# Maximum number of distinct groups (HLL sketches) to track.
|
||||
# Once the limit is reached, excess groups are counted in a single shared "rejected" sketch
|
||||
# rather than getting their own entry. Acts as a memory cap and a safeguard against OOM
|
||||
# when the group_by label values grow unboundedly.
|
||||
# Memory upper bound per stream:
|
||||
# group_limit * 2^hll_precision bytes.
|
||||
#
|
||||
# default: 10000
|
||||
group_limit: 'integer'
|
||||
|
||||
# Number of shards used to reduce lock contention during parallel ingestion.
|
||||
# Slightly increases memory for global streams (no group_by); negligible otherwise.
|
||||
# Leave at the default unless you have profiled lock contention or have a specific reason to change it.
|
||||
#
|
||||
# default: min(64, 2*availableCPUs)
|
||||
buckets: 'integer'
|
||||
|
||||
# HyperLogLog precision p, in range [4..18].
|
||||
# Determines the number of registers m = 2^p and the relative error 1.04 / sqrt(m):
|
||||
# p=14 → m=16 384, error ~0.81%, memory ~16 KB per sketch (default, suits most cases)
|
||||
# p=18 → m=262 144, error ~0.20%, memory ~256 KB per sketch (billing-grade accuracy)
|
||||
# p=10 → m=1 024, error ~3.25%, memory ~1 KB per sketch (thousands of groups, memory-tight)
|
||||
# See more in https://research.google.com/pubs/archive/40671.pdf
|
||||
#
|
||||
# default: 14
|
||||
hll_precision: 'integer'
|
||||
|
||||
# Whether to use the sparse HyperLogLog representation for low-cardinality groups.
|
||||
# Sparse mode uses far less memory until a group's cardinality reaches ~2^(p-1),
|
||||
# at which point it automatically promotes to the dense representation.
|
||||
# See more in https://research.google.com/pubs/archive/40671.pdf
|
||||
#
|
||||
# default: true
|
||||
hll_sparse: 'boolean'
|
||||
|
||||
# Static labels attached to every output metric produced by this stream entry.
|
||||
# Useful when multiple vmestimator instances feed the same storage and you need
|
||||
# to distinguish their estimates in dashboards and alerts.
|
||||
labels: 'map key string: value string'
|
||||
```
|
||||
|
||||
## Cardinality Metrics
|
||||
|
||||
Cardinality estimates are exposed as the `cardinality_estimate` metric.
|
||||
All metrics include `interval`, `group_by_keys`, `group_by_values`, and any static labels defined in the stream config.
|
||||
|
||||
For global estimates (no `group_by` configured), `group_by_keys` is `__global__` and `group_by_values` is omitted:
|
||||
```
|
||||
cardinality_estimate{interval="1h0m0s",group_by_keys="__global__"} 142300
|
||||
```
|
||||
|
||||
For grouped estimates, one summary line shows the total number of distinct groups `group_by_keys="__group__"`, followed by one line per distinct label value combination.
|
||||
Each per-group line also includes individual `by_{key}="{val}"` labels:
|
||||
```
|
||||
cardinality_estimate{interval="5m0s",group_by_keys="__group__",group_by_values="instance,job"} 2
|
||||
cardinality_estimate{interval="5m0s",group_by_keys="instance,job",group_by_values="host1:9090,prometheus",by_instance="host1:9090",by_job="prometheus"} 312
|
||||
cardinality_estimate{interval="5m0s",group_by_keys="instance,job",group_by_values="host2:9100,node",by_instance="host2:9100",by_job="node"} 87
|
||||
```
|
||||
|
||||
Note: the total distinct group count in the summary line may exceed the number of per-group lines when `group_limit` is reached
|
||||
and excess groups are counted in a single shared "rejected" sketch rather than getting their own entry.
|
||||
|
||||
By default, cardinality estimates are merged with the estimator's operational metrics and exposed at `/metrics`.
|
||||
This is controlled by the `-cardinalityMetrics.exposeAt` flag:
|
||||
- `-cardinalityMetrics.exposeAt=/metrics` (default): cardinality metrics merged with operational metrics at `/metrics`
|
||||
- `-cardinalityMetrics.exposeAt=/cardinality/metrics`: cardinality metrics exposed at separate path
|
||||
- `-cardinalityMetrics.exposeAt=`: cardinality metrics not exposed via HTTP
|
||||
|
||||
Computing cardinality estimates is expensive, so results are cached.
|
||||
Cache duration is controlled by `-cardinalityMetrics.cacheTTL` (default: `30s`).
|
||||
Set to `0` to disable caching entirely.
|
||||
|
||||
## Use Cases
|
||||
|
||||
### Basic
|
||||
|
||||
Global cardinality:
|
||||
```yaml
|
||||
# streams.yaml
|
||||
|
||||
- interval: '5m'
|
||||
```
|
||||
|
||||
Per metric name cardinality:
|
||||
```yaml
|
||||
# streams.yaml
|
||||
|
||||
- interval: '5m'
|
||||
group_by: ['__name__']
|
||||
```
|
||||
|
||||
Per job label cardinality:
|
||||
```yaml
|
||||
# streams.yaml
|
||||
|
||||
- interval: '5m'
|
||||
group_by: ['job']
|
||||
```
|
||||
|
||||
Per tenant cardinality:
|
||||
```yaml
|
||||
# streams.yaml
|
||||
|
||||
- interval: '5m'
|
||||
group_by: ['vm_account_id', 'vm_project_id']
|
||||
```
|
||||
|
||||
### Churn calculation
|
||||
|
||||
[Churn rate](https://valyala.medium.com/prometheus-storage-technical-terms-for-humans-4ab4de6c3d48#churn-rate) measures how quickly time series are created and disappear.
|
||||
[High churn](https://docs.victoriametrics.com/victoriametrics/faq/#what-is-high-churn-rate) means many series appear briefly and are replaced by new ones.
|
||||
This puts pressure on storage, because each new series must be indexed regardless of how short its lifetime is.
|
||||
|
||||
To measure churn, configure two streams with the same `group_by` but different intervals. A short one (`15m`) and a long one (`30m`):
|
||||
```yaml
|
||||
# streams.yaml
|
||||
|
||||
- interval: '15m'
|
||||
group_by: ['job']
|
||||
|
||||
- interval: '30m'
|
||||
group_by: ['job']
|
||||
```
|
||||
|
||||
When churn is low, both estimates are roughly equal.
|
||||
When churn is high, the `30m` estimate grows significantly larger than the `15m` estimate, because the long window accumulates series that have already disappeared.
|
||||
|
||||
The following query computes the churn ratio per job:
|
||||
```
|
||||
(
|
||||
sum(
|
||||
max(cardinality_estimate{group_by_keys="job",interval="30m0s"}) without (instance)
|
||||
) by (group_by_keys,group_by_values)
|
||||
-
|
||||
sum(
|
||||
max(cardinality_estimate{group_by_keys="job",interval="15m0s"}) without (instance)
|
||||
) by (group_by_keys,group_by_values)
|
||||
)
|
||||
/
|
||||
sum(
|
||||
max(cardinality_estimate{group_by_keys="job",interval="30m0s"}) without (instance)
|
||||
) by (group_by_keys,group_by_values) * 100
|
||||
```
|
||||
|
||||
A result near `0` means the series set is stable. The same series were active throughout the entire hour.
|
||||
A result near `1` means complete churn. Entirely different series appeared each 5-minute window.
|
||||
Values in between indicate the fraction of maximum possible churn that is occurring.
|
||||
|
||||
This helps identify jobs that create the most indexing pressure on storage, even when their current active cardinality appears moderate.
|
||||
|
||||
### Alerting
|
||||
|
||||
Pre-built alert rules for cardinality monitoring are available in
|
||||
[deployment/docker/rules/alerts-cardinality.yml](https://github.com/VictoriaMetrics/vmestimator/blob/main/deployment/docker/rules/alerts-cardinality.yml).
|
||||
|
||||
They require two streams with the same `group_by` but different intervals to also support churn detection:
|
||||
```yaml
|
||||
# streams.yaml
|
||||
# or use example config:
|
||||
# https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml
|
||||
|
||||
- interval: '15m'
|
||||
group_by: ['job']
|
||||
|
||||
- interval: '30m'
|
||||
group_by: ['job']
|
||||
```
|
||||
|
||||
The included alerts are:
|
||||
|
||||
- **JobTooHighCardinality** — fires when any job exceeds 20,000 estimated active series over the last 30 minutes.
|
||||
The threshold is a starting point and should be calibrated to reflect the expected cardinality of your largest jobs.
|
||||
|
||||
- **JobTooHighChurnRate** — fires when more than 10% of a job's series churned between the 15m and 30m windows.
|
||||
Catches jobs that generate continuous indexing pressure even when their active series count looks moderate.
|
||||
|
||||
- **CardinalityGroupLimitNearlyReached** — fires when the number of tracked groups exceeds 80% of the configured `group_limit`.
|
||||
Acts as an early warning that some label value combinations may soon be dropped from individual tracking.
|
||||
|
||||
- **CardinalityGroupLimitReached** — fires when groups are actively rejected because `group_limit` is full.
|
||||
At this point, some label combinations are being counted in a shared "rejected" sketch rather than tracked individually.
|
||||
|
||||
All alerts link to the [Cardinality Explorer dashboard](https://play-grafana.victoriametrics.com/d/mktd5h8/).
|
||||
|
||||
## Alternative solutions
|
||||
|
||||
### PromQL
|
||||
|
||||
Cardinality can be estimated with PromQL.
|
||||
|
||||
Global cardinality:
|
||||
```
|
||||
count({__name__=~".*"})
|
||||
```
|
||||
|
||||
Top ten metric names by cardinality:
|
||||
```
|
||||
topk(10, count({__name__=~".*"}) by (__name__))
|
||||
```
|
||||
|
||||
Top ten jobs by cardinality:
|
||||
```
|
||||
topk(10, count({__name__=~".*"}) by (job))
|
||||
```
|
||||
|
||||
This approach works for small setups but does not scale well, because these queries scan the entire time series set.
|
||||
Most critically, if the storage is overloaded or unavailable, these queries could not be executed.
|
||||
|
||||
### Cardinality Explorer
|
||||
|
||||
VictoriaMetrics includes a built-in [cardinality explorer](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#cardinality-explorer).
|
||||
It provides per-metric detail beyond raw series counts: query frequency, last access time, day-over-day change, and share of total cardinality.
|
||||
It is well suited for in-depth, ad-hoc investigation.
|
||||
For example, finding metrics that are high-cardinality but rarely queried,
|
||||
so they can be [dropped via relabeling](https://docs.victoriametrics.com/victoriametrics/relabeling/#how-to-drop-metrics-during-scrape) or reduce cardinality with [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/).
|
||||
|
||||
Both tools serve different purposes and work well together.
|
||||
Use `vmestimator` for continuous monitoring, alerting, and cross-cluster cardinality tracking.
|
||||
Use the cardinality explorer when you need to drill into a specific metric or label and understand what is driving its cardinality.
|
||||
|
||||
## Cluster
|
||||
|
||||
`vmestimator` supports a clustered deployment for high availability or when CPU on a single instance becomes a limiting factor.
|
||||
|
||||
Instances are split into two roles: **storage nodes** accept Prometheus remote write and maintain local HyperLogLog sketches; **selector nodes** query all storage nodes, merge their sketches, and expose a unified cardinality estimate. Cardinality estimate results should be scraped from selector nodes.
|
||||
|
||||
<img style="min-width:0;width: 100%" src="https://github.com/user-attachments/assets/846e5f77-378a-44dc-a4c8-2a1c64eca9d8" />
|
||||
|
||||
**Storage nodes:**
|
||||
```
|
||||
vmestimator -config=streams.yaml -httpListenAddr=:8491 -cardinalityMetrics.exposeAt=/cardinality/metrics
|
||||
vmestimator -config=streams.yaml -httpListenAddr=:8492 -cardinalityMetrics.exposeAt=/cardinality/metrics
|
||||
vmestimator -config=streams.yaml -httpListenAddr=:8493 -cardinalityMetrics.exposeAt=/cardinality/metrics
|
||||
```
|
||||
|
||||
**Selector nodes:**
|
||||
```
|
||||
vmestimator -storageNode=http://vmestimator-storage-1:8491 \
|
||||
-storageNode=http://vmestimator-storage-2:8492 \
|
||||
-storageNode=http://vmestimator-storage-3:8493 \
|
||||
-httpListenAddr=:8490
|
||||
```
|
||||
|
||||
Setting `-cardinalityMetrics.exposeAt=/cardinality/metrics` on storage nodes keeps per-node estimates off the default `/metrics` path. The `/metrics` endpoint then returns only operational metrics, while `/cardinality/metrics` exposes the node's local estimate — useful for inspecting or debugging a specific node.
|
||||
|
||||
A selector with `-storageNode` flags and no `-config` runs without local estimators and only merges remote data.
|
||||
|
||||
When multiple selector nodes are scraped, each returns a fully merged estimate.
|
||||
Deduplicate at query time to avoid overcounting:
|
||||
```
|
||||
max(cardinality_estimate) without (instance)
|
||||
```
|
||||
|
||||
## Operational metrics
|
||||
|
||||
When grouping is enabled, vmestimator exposes per-bucket operational metrics at `/metrics`:
|
||||
|
||||
- `vmestimator_estimator_group_size{group_by_keys, bucket}` — number of active groups in this bucket after the last rotation
|
||||
- `vmestimator_estimator_group_rejected_size{group_by_keys}` — estimated number of distinct group values rejected since the last rotation because `group_limit` was reached
|
||||
- `vmestimator_estimator_group_limit{group_by_keys, bucket}` — configured `group_limit` for this bucket
|
||||
|
||||
|
||||
## Dashboards
|
||||
|
||||
Two Grafana dashboards are available in the [dashboards](https://github.com/VictoriaMetrics/vmestimator/tree/main/dashboards) directory:
|
||||
|
||||
- [VictoriaMetrics - vmestimator](https://play-grafana.victoriametrics.com/d/mkv22l4/victoriametrics-vmestimator) — application health: CPU, memory, ingestion rates, concurrent inserts, and group key saturation.
|
||||
<img width="1507" height="801" alt="Screenshot 2026-06-29 at 19 06 46" src="https://github.com/user-attachments/assets/cbfd979d-f403-4270-b098-2d2f0b392172" />
|
||||
|
||||
- [VictoriaMetrics - Cardinality Explorer](https://play-grafana.victoriametrics.com/d/mktd5h8/victoriametrics-cardinality-explorer) — cardinality analysis: global estimates, per-group-key series counts, and top-10 highest-cardinality label value combinations.
|
||||
<img width="1510" height="796" alt="Screenshot 2026-06-29 at 19 05 47" src="https://github.com/user-attachments/assets/a1aea6e1-8714-4d5a-a629-8bdee978f1c6" />
|
||||
|
||||
## How to build from sources
|
||||
|
||||
It is recommended to use the [docker images](https://hub.docker.com/r/victoriametrics/vmestimator).
|
||||
|
||||
Development build:
|
||||
1. [Install Go](https://golang.org/doc/install).
|
||||
1. Run `make vmestimator` from the root folder of [the repository](https://github.com/VictoriaMetrics/vmestimator).
|
||||
It builds `vmestimator` binary and places it into the `bin` folder.
|
||||
|
||||
Production build:
|
||||
1. [Install docker](https://docs.docker.com/install/).
|
||||
1. Run `make vmestimator-prod` from the root folder of [the repository](https://github.com/VictoriaMetrics/vmestimator).
|
||||
It builds `vmestimator-prod` binary and puts it into the `bin` folder.
|
||||
|
||||
Building docker images:
|
||||
|
||||
Run `make package-vmestimator`. It builds `victoriametrics/vmestimator:<PKG_TAG>` docker image locally.
|
||||
`<PKG_TAG>` is auto-generated image tag, which depends on source code in the repository.
|
||||
The `<PKG_TAG>` may be manually set via `PKG_TAG=foobar make package-vmestimator`.
|
||||
|
||||
The base docker image is [alpine](https://hub.docker.com/_/alpine) but it is possible to use any other base image by setting it via `<ROOT_IMAGE>` environment variable.
|
||||
For example, the following command builds the image on top of [scratch](https://hub.docker.com/_/scratch) image:
|
||||
|
||||
```sh
|
||||
ROOT_IMAGE=scratch make package-vmestimator
|
||||
```
|
||||
|
||||
You can build and publish to your own registry and namespace:
|
||||
```
|
||||
DOCKER_REGISTRIES=ghcr.io DOCKER_NAMESPACE=foo make publish-vmestimator
|
||||
```
|
||||
|
||||
## Command-line flags
|
||||
|
||||
Run `vmestimator -help` in order to see all the available options:
|
||||
|
||||
```
|
||||
Usage of ./bin/vmestimator:
|
||||
-cardinalityMetrics.cacheTTL duration
|
||||
Duration for caching cardinality metrics response (default 30s)
|
||||
-cardinalityMetrics.exposeAt string
|
||||
HTTP path for exposing cardinality metrics. If set to the default /metrics, cardinality metrics are merged with regular metrics and exposed together. If set to a different path, only cardinality metrics are exposed at that endpoint. If set to an empty value, cardinality metrics are not exposed via HTTP at all. (default "/metrics")
|
||||
-config string
|
||||
Path to YAML configuration file. Must be set unless -storageNode is specified. See https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml for config example
|
||||
-enableTCP6
|
||||
Whether to enable IPv6 for listening and dialing. By default, only IPv4 TCP and UDP are used
|
||||
-envflag.enable
|
||||
Whether to enable reading flags from environment variables in addition to the command line. Command line flag values have priority over values from environment vars. Flags are read only from the command line if this flag isn't set. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#environment-variables for more details
|
||||
-envflag.prefix string
|
||||
Prefix for environment variables if -envflag.enable is set
|
||||
-filestream.disableFadvise
|
||||
Whether to disable fadvise() syscall when reading large data files. The fadvise() syscall prevents from eviction of recently accessed data from OS page cache during background merges and backups. In some rare cases it is better to disable the syscall if it uses too much CPU
|
||||
-flagsAuthKey value
|
||||
Auth key for /flags endpoint. It must be passed via authKey query arg. It overrides -httpAuth.*
|
||||
Flag value can be read from the given file when using -flagsAuthKey=file:///abs/path/to/file or -flagsAuthKey=file://./relative/path/to/file.
|
||||
Flag value can be read from the given http/https url when using -flagsAuthKey=http://host/path or -flagsAuthKey=https://host/path
|
||||
-fs.maxConcurrency int
|
||||
The maximum number of concurrent goroutines to work with files; smaller values may help reducing Go scheduling latency on systems with small number of CPU cores; higher values may help reducing data ingestion latency on systems with high-latency storage such as NFS or Ceph (default 160)
|
||||
-http.connTimeout duration
|
||||
Incoming connections to -httpListenAddr are closed after the configured timeout. This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections (default 2m0s)
|
||||
-http.disableCORS
|
||||
Disable CORS for all origins (*)
|
||||
-http.disableKeepAlive
|
||||
Whether to disable HTTP keep-alive for incoming connections at -httpListenAddr
|
||||
-http.disableResponseCompression
|
||||
Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth
|
||||
-http.header.csp string
|
||||
Value for 'Content-Security-Policy' header, recommended: "default-src 'self'"
|
||||
-http.header.disableServerHostname
|
||||
Whether to disable 'X-Server-Hostname' header in HTTP responses
|
||||
-http.header.frameOptions string
|
||||
Value for 'X-Frame-Options' header
|
||||
-http.header.hsts string
|
||||
Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'
|
||||
-http.idleConnTimeout duration
|
||||
Timeout for incoming idle http connections (default 1m0s)
|
||||
-http.maxGracefulShutdownDuration duration
|
||||
The maximum duration for a graceful shutdown of the HTTP server. A highly loaded server may require increased value for a graceful shutdown (default 7s)
|
||||
-http.pathPrefix string
|
||||
An optional prefix to add to all the paths handled by http server. For example, if '-http.pathPrefix=/foo/bar' is set, then all the http requests will be handled on '/foo/bar/*' paths. This may be useful for proxied requests. See https://www.robustperception.io/using-external-urls-and-proxies-with-prometheus
|
||||
-http.shutdownDelay duration
|
||||
Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers
|
||||
-httpAuth.password value
|
||||
Password for HTTP server's Basic Auth. The authentication is disabled if -httpAuth.username is empty
|
||||
Flag value can be read from the given file when using -httpAuth.password=file:///abs/path/to/file or -httpAuth.password=file://./relative/path/to/file.
|
||||
Flag value can be read from the given http/https url when using -httpAuth.password=http://host/path or -httpAuth.password=https://host/path
|
||||
-httpAuth.username string
|
||||
Username for HTTP server's Basic Auth. The authentication is disabled if empty. See also -httpAuth.password
|
||||
-httpListenAddr array
|
||||
TCP address to listen for incoming HTTP requests
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-insert.maxQueueDuration duration
|
||||
The maximum duration to wait in the queue when -maxConcurrentInserts concurrent insert requests are executed (default 1m0s)
|
||||
-internStringCacheExpireDuration duration
|
||||
The expiry duration for caches for interned strings. See https://en.wikipedia.org/wiki/String_interning . See also -internStringMaxLen and -internStringDisableCache (default 6m0s)
|
||||
-internStringDisableCache
|
||||
Whether to disable caches for interned strings. This may reduce memory usage at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning . See also -internStringCacheExpireDuration and -internStringMaxLen
|
||||
-internStringMaxLen int
|
||||
The maximum length for strings to intern. A lower limit may save memory at the cost of higher CPU usage. See https://en.wikipedia.org/wiki/String_interning . See also -internStringDisableCache and -internStringCacheExpireDuration (default 500)
|
||||
-loggerDisableTimestamps
|
||||
Whether to disable writing timestamps in logs
|
||||
-loggerErrorsPerSecondLimit int
|
||||
Per-second limit on the number of ERROR messages. If more than the given number of errors are emitted per second, the remaining errors are suppressed. Zero values disable the rate limit
|
||||
-loggerFormat string
|
||||
Format for logs. Possible values: default, json (default "default")
|
||||
-loggerJSONFields string
|
||||
Allows renaming fields in JSON formatted logs. Example: "ts:timestamp,msg:message" renames "ts" to "timestamp" and "msg" to "message". Supported fields: ts, level, caller, msg
|
||||
-loggerLevel string
|
||||
Minimum level of errors to log. Possible values: INFO, WARN, ERROR, FATAL, PANIC (default "INFO")
|
||||
-loggerMaxArgLen int
|
||||
The maximum length of a single logged argument. Longer arguments are replaced with 'arg_start..arg_end', where 'arg_start' and 'arg_end' is prefix and suffix of the arg with the length not exceeding -loggerMaxArgLen / 2 (default 5000)
|
||||
-loggerOutput string
|
||||
Output for the logs. Supported values: stderr, stdout (default "stderr")
|
||||
-loggerTimezone string
|
||||
Timezone to use for timestamps in logs. Timezone must be a valid IANA Time Zone. For example: America/New_York, Europe/Berlin, Etc/GMT+3 or Local (default "UTC")
|
||||
-loggerWarnsPerSecondLimit int
|
||||
Per-second limit on the number of WARN messages. If more than the given number of warns are emitted per second, then the remaining warns are suppressed. Zero values disable the rate limit
|
||||
-maxConcurrentInserts int
|
||||
The maximum number of concurrent insert requests. Set higher value when clients send data over slow networks. Default value depends on the number of available CPU cores. It should work fine in most cases since it minimizes resource usage. See also -insert.maxQueueDuration (default 20)
|
||||
-maxInsertRequestSize size
|
||||
The maximum size in bytes of a single Prometheus remote_write API request
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 33554432)
|
||||
-memory.allowedBytes size
|
||||
Allowed size of system memory VictoriaMetrics caches may occupy. This option overrides -memory.allowedPercent if set to a non-zero value. Too low a value may increase the cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache resulting in higher disk IO usage. The process may behave unexpectedly if this flag is set too small (e.g., 1 byte).
|
||||
Supports the following optional suffixes for size values: KB, MB, GB, TB, KiB, MiB, GiB, TiB (default 0)
|
||||
-memory.allowedPercent float
|
||||
Allowed percent of system memory VictoriaMetrics caches may occupy. See also -memory.allowedBytes. Too low a value may increase cache miss rate usually resulting in higher CPU and disk IO usage. Too high a value may evict too much data from the OS page cache which will result in higher disk IO usage (default 60)
|
||||
-metrics.exposeMetadata
|
||||
Whether to expose TYPE and HELP metadata at the /metrics page, which is exposed at -httpListenAddr . The metadata may be needed when the /metrics page is consumed by systems, which require this information. For example, Managed Prometheus in Google Cloud - https://cloud.google.com/stackdriver/docs/managed-prometheus/troubleshooting#missing-metric-type
|
||||
-metricsAuthKey value
|
||||
Auth key for /metrics endpoint. It must be passed via authKey query arg. It overrides -httpAuth.*
|
||||
Flag value can be read from the given file when using -metricsAuthKey=file:///abs/path/to/file or -metricsAuthKey=file://./relative/path/to/file.
|
||||
Flag value can be read from the given http/https url when using -metricsAuthKey=http://host/path or -metricsAuthKey=https://host/path
|
||||
-pprofAuthKey value
|
||||
Auth key for /debug/pprof/* endpoints. It must be passed via authKey query arg. It overrides -httpAuth.*
|
||||
Flag value can be read from the given file when using -pprofAuthKey=file:///abs/path/to/file or -pprofAuthKey=file://./relative/path/to/file.
|
||||
Flag value can be read from the given http/https url when using -pprofAuthKey=http://host/path or -pprofAuthKey=https://host/path
|
||||
-pushmetrics.disableCompression
|
||||
Whether to disable request body compression when pushing metrics to every -pushmetrics.url
|
||||
-pushmetrics.extraLabel array
|
||||
Optional labels to add to metrics pushed to every -pushmetrics.url . For example, -pushmetrics.extraLabel='instance="foo"' adds instance="foo" label to all the metrics pushed to every -pushmetrics.url
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-pushmetrics.header array
|
||||
Optional HTTP request header to send to every -pushmetrics.url . For example, -pushmetrics.header='Authorization: Basic foobar' adds 'Authorization: Basic foobar' header to every request to every -pushmetrics.url
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-pushmetrics.interval duration
|
||||
Interval for pushing metrics to every -pushmetrics.url (default 10s)
|
||||
-pushmetrics.url array
|
||||
Optional URL to push metrics exposed at /metrics page. See https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#push-metrics . By default, metrics exposed at /metrics page aren't pushed to any remote storage
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-secret.flags array
|
||||
Comma-separated list of flag names with secret values. Values for these flags are hidden in logs and on /metrics page
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-storageNode array
|
||||
HTTP URLs of remote vmestimator nodes to query for cardinality snapshots, e.g. http://vmestimator-2:8490
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-tls array
|
||||
Whether to enable TLS for incoming HTTP requests at the given -httpListenAddr (aka https). -tlsCertFile and -tlsKeyFile must be set if -tls is set. See also -mtls
|
||||
Supports array of values separated by comma or specified via multiple flags.
|
||||
Empty values are set to false.
|
||||
-tlsCertFile array
|
||||
Path to file with TLS certificate for the corresponding -httpListenAddr if -tls is set. Prefer ECDSA certs instead of RSA certs as RSA certs are slower. The provided certificate file is automatically re-read every second, so it can be dynamically updated. See also -tlsAutocertHosts
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-tlsCipherSuites array
|
||||
Optional list of TLS cipher suites for incoming requests over HTTPS if -tls is set. See the list of supported cipher suites at https://pkg.go.dev/crypto/tls#pkg-constants
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-tlsKeyFile array
|
||||
Path to file with TLS key for the corresponding -httpListenAddr if -tls is set. The provided key file is automatically re-read every second, so it can be dynamically updated. See also -tlsAutocertHosts
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-tlsMinVersion array
|
||||
Optional minimum TLS version to use for the corresponding -httpListenAddr if -tls is set. Supported values: TLS10, TLS11, TLS12, TLS13
|
||||
Supports an array of values separated by comma or specified via multiple flags.
|
||||
Each array item can contain comma inside single-quoted or double-quoted string, {}, [] and () braces.
|
||||
-version
|
||||
Show VictoriaMetrics version
|
||||
```
|
||||
|
||||
|
||||
@@ -58,10 +58,13 @@ var (
|
||||
|
||||
disableKeepAlive = flag.Bool("http.disableKeepAlive", false, "Whether to disable HTTP keep-alive for incoming connections at -httpListenAddr")
|
||||
disableResponseCompression = flag.Bool("http.disableResponseCompression", false, "Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth")
|
||||
maxGracefulShutdownDuration = flag.Duration("http.maxGracefulShutdownDuration", 7*time.Second, `The maximum duration for a graceful shutdown of the HTTP server. A highly loaded server may require increased value for a graceful shutdown`)
|
||||
shutdownDelay = flag.Duration("http.shutdownDelay", 0, `Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers`)
|
||||
idleConnTimeout = flag.Duration("http.idleConnTimeout", time.Minute, "Timeout for incoming idle http connections")
|
||||
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
|
||||
maxGracefulShutdownDuration = flag.Duration("http.maxGracefulShutdownDuration", 7*time.Second, "The maximum duration for a graceful shutdown of the HTTP server. "+
|
||||
"During this period the server stops accepting new connections, but it will continue serving existing connections. "+
|
||||
"The remaining in-flight requests are canceled before the deadline, so the shutdown can finish within this duration. "+
|
||||
"A highly loaded server may require increased value for a graceful shutdown")
|
||||
shutdownDelay = flag.Duration("http.shutdownDelay", 0, `Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers`)
|
||||
idleConnTimeout = flag.Duration("http.idleConnTimeout", time.Minute, "Timeout for incoming idle http connections")
|
||||
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
|
||||
"This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections")
|
||||
|
||||
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
|
||||
@@ -80,6 +83,7 @@ var (
|
||||
type server struct {
|
||||
shutdownDelayDeadline atomic.Int64
|
||||
s *http.Server
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// RequestHandler must serve the given request r and write response to w.
|
||||
@@ -156,7 +160,11 @@ func serve(addr string, rh RequestHandler, idx int, opts ServeOptions) {
|
||||
func serveWithListener(addr string, ln net.Listener, rh RequestHandler, disableBuiltinRoutes bool) {
|
||||
var s server
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.s = &http.Server{
|
||||
BaseContext: func(l net.Listener) context.Context {
|
||||
return ctx
|
||||
},
|
||||
|
||||
// Disable http/2, since it doesn't give any advantages for VictoriaMetrics services.
|
||||
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
|
||||
@@ -170,6 +178,7 @@ func serveWithListener(addr string, ln net.Listener, rh RequestHandler, disableB
|
||||
ErrorLog: log.New(&tlsErrorSkipLogger{}, "", 0),
|
||||
}
|
||||
s.s.SetKeepAlivesEnabled(!*disableKeepAlive)
|
||||
s.cancel = cancel
|
||||
if *connTimeout > 0 {
|
||||
s.s.ConnContext = func(ctx context.Context, _ net.Conn) context.Context {
|
||||
timeoutSec := connTimeout.Seconds()
|
||||
@@ -265,8 +274,18 @@ func stop(addr string) error {
|
||||
logger.Infof("Starting shutdown for http server %q", addr)
|
||||
}
|
||||
|
||||
// Cancel in-flight requests shortly before the deadline, reserving up to 2s (or 20%
|
||||
// of the window, whichever is smaller) for them to unwind, so Shutdown returns cleanly
|
||||
// within -http.maxGracefulShutdownDuration instead of timing out and dying via
|
||||
// logger.Fatalf -> os.Exit, which skips the storage flush and loses data.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502
|
||||
cancelInflightAfter := *maxGracefulShutdownDuration - min(*maxGracefulShutdownDuration/5, 2*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), *maxGracefulShutdownDuration)
|
||||
defer cancel()
|
||||
|
||||
t := time.AfterFunc(cancelInflightAfter, s.cancel)
|
||||
defer t.Stop()
|
||||
|
||||
if err := s.s.Shutdown(ctx); err != nil {
|
||||
return fmt.Errorf("cannot gracefully shutdown http server at %q in %.3fs; "+
|
||||
"probably, `-http.maxGracefulShutdownDuration` command-line flag value must be increased; error: %s", addr, maxGracefulShutdownDuration.Seconds(), err)
|
||||
|
||||
@@ -105,6 +105,10 @@ type body struct {
|
||||
Scope string `json:"scope,omitempty"`
|
||||
vmAccessClaim VMAccessClaim
|
||||
|
||||
// hasVMAccess is set to true when the token body contains a `vm_access` claim.
|
||||
// Presence enforcement is left to the caller via Token.HasVMAccess.
|
||||
hasVMAccess bool
|
||||
|
||||
buf []byte
|
||||
p *fastjson.Parser
|
||||
|
||||
@@ -121,7 +125,6 @@ type body struct {
|
||||
}
|
||||
|
||||
func (b *body) parse(src string) error {
|
||||
|
||||
var err error
|
||||
b.buf, err = decodeB64(b.buf[:0], src)
|
||||
if err != nil {
|
||||
@@ -132,6 +135,9 @@ func (b *body) parse(src string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if jv.Type() != fastjson.TypeObject {
|
||||
return fmt.Errorf("unexpected non json object; type: %q", jv.Type())
|
||||
}
|
||||
if expObject := jv.Get("exp"); expObject != nil {
|
||||
b.Exp, err = expObject.Int64()
|
||||
if err != nil {
|
||||
@@ -153,30 +159,31 @@ func (b *body) parse(src string) error {
|
||||
}
|
||||
|
||||
vaObject := jv.Get("vm_access")
|
||||
if vaObject == nil {
|
||||
return ErrVMAccessFieldMissing
|
||||
}
|
||||
// some IDPs encode custom claims as a string
|
||||
// try parsing as an object and fallback to a string
|
||||
switch vaObject.Type() {
|
||||
case fastjson.TypeObject:
|
||||
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
|
||||
return err
|
||||
}
|
||||
case fastjson.TypeString:
|
||||
b.claimsParser = parserPool.Get()
|
||||
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
|
||||
}
|
||||
if err := b.vmAccessClaim.parseFrom(va); err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
|
||||
}
|
||||
b.vmAccessClaimObject = va
|
||||
case fastjson.TypeNull:
|
||||
return ErrVMAccessFieldMissing
|
||||
switch {
|
||||
case vaObject == nil || vaObject.Type() == fastjson.TypeNull:
|
||||
b.hasVMAccess = false
|
||||
default:
|
||||
return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
|
||||
// some IDPs encode custom claims as a string
|
||||
// try parsing as an object and fallback to a string
|
||||
switch vaObject.Type() {
|
||||
case fastjson.TypeObject:
|
||||
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
|
||||
return err
|
||||
}
|
||||
case fastjson.TypeString:
|
||||
b.claimsParser = parserPool.Get()
|
||||
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
|
||||
}
|
||||
if err := b.vmAccessClaim.parseFrom(va); err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
|
||||
}
|
||||
b.vmAccessClaimObject = va
|
||||
default:
|
||||
return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
|
||||
}
|
||||
b.hasVMAccess = true
|
||||
}
|
||||
b.Jti = bytesutil.ToUnsafeString(jv.GetStringBytes("jti"))
|
||||
|
||||
@@ -218,6 +225,7 @@ func (b *body) reset() {
|
||||
b.buf = b.buf[:0]
|
||||
b.allClaims = nil
|
||||
b.vmAccessClaim.reset()
|
||||
b.hasVMAccess = false
|
||||
if b.p != nil {
|
||||
parserPool.Put(b.p)
|
||||
b.p = nil
|
||||
@@ -229,11 +237,9 @@ func (b *body) reset() {
|
||||
if b.vmAccessClaimObject != nil {
|
||||
b.vmAccessClaimObject = nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Parse parses JWT token from given source string
|
||||
//
|
||||
// Token field is valid until src is reachable
|
||||
func (t *Token) Parse(src string, enforceAuthPrefix bool) error {
|
||||
if enforceAuthPrefix && (len(src) < len(prefix) || !strings.EqualFold(src[:len(prefix)], prefix)) {
|
||||
@@ -268,6 +274,11 @@ func (t *Token) Parse(src string, enforceAuthPrefix bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasVMAccessClaim reports whether the parsed token contains a `vm_access` claim.
|
||||
func (t *Token) HasVMAccessClaim() bool {
|
||||
return t.body.hasVMAccess
|
||||
}
|
||||
|
||||
// Issuer returns `iss` claim value from token body
|
||||
func (t *Token) Issuer() string {
|
||||
return t.body.Iss
|
||||
@@ -371,30 +382,30 @@ func (t *Token) Reset() {
|
||||
|
||||
// VMAccessClaim represent JWT claim object
|
||||
type VMAccessClaim struct {
|
||||
MetricsExtraFilters []string `json:"metrics_extra_filters,omitempty"`
|
||||
MetricsExtraLabels []string `json:"metrics_extra_labels,omitempty"`
|
||||
LogsExtraFilters []string `json:"logs_extra_filters,omitempty"`
|
||||
LogsExtraStreamFilters []string `json:"logs_extra_stream_filters,omitempty"`
|
||||
MetricsExtraFilters []string `json:"metrics_extra_filters,omitempty" yaml:"metrics_extra_filters,omitempty"`
|
||||
MetricsExtraLabels []string `json:"metrics_extra_labels,omitempty" yaml:"metrics_extra_labels,omitempty"`
|
||||
LogsExtraFilters []string `json:"logs_extra_filters,omitempty" yaml:"logs_extra_filters,omitempty"`
|
||||
LogsExtraStreamFilters []string `json:"logs_extra_stream_filters,omitempty" yaml:"logs_extra_stream_filters,omitempty"`
|
||||
|
||||
MetricsAccountID uint32 `json:"metrics_account_id,omitempty"`
|
||||
MetricsProjectID uint32 `json:"metrics_project_id,omitempty"`
|
||||
MetricsAccountID uint32 `json:"metrics_account_id,omitempty" yaml:"metrics_account_id,omitempty"`
|
||||
MetricsProjectID uint32 `json:"metrics_project_id,omitempty" yaml:"metrics_project_id,omitempty"`
|
||||
|
||||
LogsAccountID uint32 `json:"logs_account_id,omitempty"`
|
||||
LogsProjectID uint32 `json:"logs_project_id,omitempty"`
|
||||
LogsAccountID uint32 `json:"logs_account_id,omitempty" yaml:"logs_account_id,omitempty"`
|
||||
LogsProjectID uint32 `json:"logs_project_id,omitempty" yaml:"logs_project_id,omitempty"`
|
||||
|
||||
// Properties below are deprecated and retained only for compatibility with vmgateway, which is itself deprecated.
|
||||
|
||||
// promql filters applied to each select query
|
||||
// Deprecated
|
||||
ExtraFilters []string `json:"extra_filters,omitempty"`
|
||||
ExtraFilters []string `json:"extra_filters,omitempty" yaml:"-"`
|
||||
// Deprecated
|
||||
Tenant TenantID `json:"tenant_id"`
|
||||
Tenant TenantID `json:"tenant_id" yaml:"-"`
|
||||
// role can be denied as 1 = read, 2 = write, 3 = read and write
|
||||
// 0 = unconfigured - read and write
|
||||
// Deprecated
|
||||
Mode int `json:"mode,omitempty"`
|
||||
Mode int `json:"mode,omitempty" yaml:"-"`
|
||||
// Deprecated
|
||||
Labels []string `json:"extra_labels,omitempty"`
|
||||
Labels []string `json:"extra_labels,omitempty" yaml:"-"`
|
||||
// labelsBuf holds allocated memory for Labels
|
||||
// Deprecated
|
||||
labelsBuf []byte
|
||||
@@ -425,7 +436,6 @@ func (vac *VMAccessClaim) reset() {
|
||||
}
|
||||
|
||||
func (vac *VMAccessClaim) parseFrom(jv *fastjson.Value) error {
|
||||
|
||||
if err := vac.Tenant.parseFrom(jv); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -569,6 +579,9 @@ func NewToken(auth string, enforceAuthPrefix bool) (*Token, error) {
|
||||
if err := t.parse(jwt[0], jwt[1], jwt[2]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !t.body.hasVMAccess {
|
||||
return nil, ErrVMAccessFieldMissing
|
||||
}
|
||||
return &t, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -168,17 +168,10 @@ func TestParseJWTBody_Failure(t *testing.T) {
|
||||
true,
|
||||
)
|
||||
|
||||
// invalid body type json
|
||||
// non-object body type
|
||||
f(
|
||||
`[]`,
|
||||
"missing `vm_access` claim",
|
||||
true,
|
||||
)
|
||||
|
||||
// missing vm_access claim
|
||||
f(
|
||||
`{}`,
|
||||
"missing `vm_access` claim",
|
||||
`unexpected non json object; type: "array"`,
|
||||
true,
|
||||
)
|
||||
|
||||
@@ -189,13 +182,6 @@ func TestParseJWTBody_Failure(t *testing.T) {
|
||||
true,
|
||||
)
|
||||
|
||||
// vm_access claim null
|
||||
f(
|
||||
`{"vm_access": null}`,
|
||||
"missing `vm_access` claim",
|
||||
true,
|
||||
)
|
||||
|
||||
// invalid vm_access: account_id type mismatch
|
||||
f(
|
||||
`{"vm_access": {"tenant_id": {"account_id": "1", "project_id": 5}}}`,
|
||||
@@ -555,6 +541,33 @@ func TestParseJWTBody_Success(t *testing.T) {
|
||||
)
|
||||
}
|
||||
|
||||
func TestParseJWTBody_VMAccessPresence(t *testing.T) {
|
||||
f := func(data string, wantHasVMAccess bool) {
|
||||
t.Helper()
|
||||
|
||||
encodedLen := base64.RawURLEncoding.EncodedLen(len(data))
|
||||
encoded := make([]byte, encodedLen)
|
||||
base64.RawURLEncoding.Encode(encoded, []byte(data))
|
||||
|
||||
var b body
|
||||
if err := b.parse(string(encoded)); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if b.hasVMAccess != wantHasVMAccess {
|
||||
t.Fatalf("unexpected hasVMAccess; got %v; want %v", b.hasVMAccess, wantHasVMAccess)
|
||||
}
|
||||
}
|
||||
|
||||
// vm_access claim is present
|
||||
f(`{"vm_access": {}}`, true)
|
||||
f(`{"vm_access": {"metrics_account_id": 1}}`, true)
|
||||
|
||||
// vm_access claim is absent or null - parsing must succeed with hasVMAccess=false
|
||||
f(`{}`, false)
|
||||
f(`{"vm_access": null}`, false)
|
||||
f(`{"role": "admin"}`, false)
|
||||
}
|
||||
|
||||
func TestNewTokenFromRequest_Failure(t *testing.T) {
|
||||
f := func(r *http.Request) {
|
||||
t.Helper()
|
||||
@@ -866,7 +879,6 @@ func TestNewTokenFromRequest_Success(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTokenMatchClaims(t *testing.T) {
|
||||
|
||||
/*
|
||||
{
|
||||
"iss": "https://login.microsoftonline.com/-6691-4868-a77b-1b0f9bbe5f43/v2.0",
|
||||
|
||||
311
lib/mdx/filter.go
Normal file
311
lib/mdx/filter.go
Normal file
@@ -0,0 +1,311 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
var (
|
||||
vmLabel = flag.String("mdx.label", "", "Optional label value in the form 'name=value' used to identify VictoriaMetrics metrics for MDX. "+
|
||||
"Metrics containing the specified label are forwarded to `-remoteWrite.url` endpoints configured with `-remoteWrite.mdx.enable=true`.")
|
||||
)
|
||||
|
||||
const (
|
||||
vmAppLabelName = "victoriametrics_app"
|
||||
vmAppLabelValue = "true"
|
||||
vmAppVersionMetricName = "vm_app_version"
|
||||
)
|
||||
|
||||
// Ctx defines filtering context
|
||||
type Ctx struct {
|
||||
// labels hold modified timeseries labels
|
||||
// valid until PutContext call
|
||||
labels []prompb.Label
|
||||
|
||||
buf []byte
|
||||
hasVMAppLabel bool
|
||||
hasVMAppVersionLabel bool
|
||||
hasFilterLabelValue bool
|
||||
jobLabelValue string
|
||||
instanceLabelValue string
|
||||
}
|
||||
|
||||
func (ctx *Ctx) reset() {
|
||||
// do not reset labels intentionally
|
||||
// it must live until PutContext call
|
||||
|
||||
ctx.buf = ctx.buf[:0]
|
||||
ctx.hasVMAppLabel = false
|
||||
ctx.hasVMAppVersionLabel = false
|
||||
ctx.hasFilterLabelValue = false
|
||||
ctx.jobLabelValue = ""
|
||||
ctx.instanceLabelValue = ""
|
||||
}
|
||||
|
||||
var ctxPool = &sync.Pool{
|
||||
New: func() any {
|
||||
return &Ctx{}
|
||||
},
|
||||
}
|
||||
|
||||
// GetContext returns filtering context
|
||||
func GetContext() *Ctx {
|
||||
return ctxPool.Get().(*Ctx)
|
||||
}
|
||||
|
||||
// PutContext resets context
|
||||
func PutContext(ctx *Ctx) {
|
||||
clear(ctx.labels)
|
||||
ctx.labels = ctx.labels[:0]
|
||||
ctx.reset()
|
||||
ctxPool.Put(ctx)
|
||||
}
|
||||
|
||||
// Filter manages the list of VictoriaMetrics instances grouped by job:instance labels.
|
||||
// job and instance must present at timeseries.
|
||||
//
|
||||
// Filter keeps timeseries with any of the following conditions:
|
||||
// * vm_app_version present
|
||||
// * victoriametrics_app=true label present at timeseries
|
||||
// * if labels has label value defined with flag `-mdx.label`
|
||||
//
|
||||
// Filter track entries with TTL of 1 hour
|
||||
type Filter struct {
|
||||
tracker *instanceTracker
|
||||
filterByLabelName string
|
||||
label string
|
||||
}
|
||||
|
||||
// NewFilter returns new Filter instance
|
||||
func NewFilter() *Filter {
|
||||
filter := &Filter{
|
||||
tracker: newInstanceTracker(),
|
||||
}
|
||||
if len(*vmLabel) > 0 {
|
||||
n := strings.IndexByte(*vmLabel, '=')
|
||||
if n < 0 {
|
||||
logger.Fatalf("missing '=' in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
|
||||
}
|
||||
filter.filterByLabelName = (*vmLabel)[:n]
|
||||
filter.label = (*vmLabel)[n+1:]
|
||||
if len(filter.filterByLabelName) == 0 || len(filter.label) == 0 {
|
||||
logger.Fatalf("label name and value cannot be empty in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
|
||||
}
|
||||
}
|
||||
|
||||
return filter
|
||||
}
|
||||
|
||||
// VMInstancesCount returns amount of currently tracked instances
|
||||
func (filter *Filter) VMInstancesCount() int {
|
||||
return filter.tracker.len()
|
||||
}
|
||||
|
||||
// MustStop stops filter instance
|
||||
func (filter *Filter) MustStop() {
|
||||
filter.tracker.mustStop()
|
||||
}
|
||||
|
||||
// Filter filters provided timeseries with given context.
|
||||
//
|
||||
// Returned timeseries is valid as long as Ctx is valid
|
||||
func (filter *Filter) Filter(ctx *Ctx, tss []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
dstTss := tss[:0]
|
||||
for _, ts := range tss {
|
||||
ctx.prepare(ts.Labels, filter.filterByLabelName, filter.label)
|
||||
key := ctx.formatTimeSeriesKey()
|
||||
if len(key) == 0 {
|
||||
// metrics with empty job or instance labels must be always dropped
|
||||
// despite any other conditions
|
||||
continue
|
||||
}
|
||||
if ctx.hasVMAppLabel {
|
||||
filter.trackInstance(key)
|
||||
dstTss = append(dstTss, ts)
|
||||
continue
|
||||
}
|
||||
if ctx.hasFilterLabelValue || ctx.hasVMAppVersionLabel {
|
||||
ts.Labels = ctx.addVMAppLabel(ts.Labels)
|
||||
filter.trackInstance(key)
|
||||
dstTss = append(dstTss, ts)
|
||||
continue
|
||||
}
|
||||
ok := filter.tracker.has(key)
|
||||
if ok {
|
||||
ts.Labels = ctx.addVMAppLabel(ts.Labels)
|
||||
dstTss = append(dstTss, ts)
|
||||
}
|
||||
}
|
||||
return dstTss
|
||||
}
|
||||
|
||||
func (filter *Filter) trackInstance(key string) {
|
||||
if filter.tracker.has(key) {
|
||||
return
|
||||
}
|
||||
key = strings.Clone(key)
|
||||
filter.tracker.register(key)
|
||||
}
|
||||
|
||||
func (ctx *Ctx) prepare(labels []prompb.Label, filterByLabelName, label string) {
|
||||
ctx.reset()
|
||||
|
||||
// always use the last label=value pair
|
||||
// because in case of possible label duplicates,
|
||||
// the last added label must win
|
||||
for _, l := range labels {
|
||||
switch l.Name {
|
||||
case "job":
|
||||
ctx.jobLabelValue = l.Value
|
||||
case "instance":
|
||||
ctx.instanceLabelValue = l.Value
|
||||
case vmAppLabelName:
|
||||
if l.Value == vmAppLabelValue {
|
||||
ctx.hasVMAppLabel = true
|
||||
}
|
||||
case "__name__":
|
||||
if l.Value == vmAppVersionMetricName {
|
||||
ctx.hasVMAppVersionLabel = true
|
||||
}
|
||||
}
|
||||
if len(filterByLabelName) > 0 {
|
||||
if l.Name == filterByLabelName && l.Value == label {
|
||||
ctx.hasFilterLabelValue = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// formatTimeSeriesKey returns timeseries key after ctx.prepare call
|
||||
// if it catched job and instances labels
|
||||
//
|
||||
// returned string is valid until next ctx.prepare
|
||||
func (ctx *Ctx) formatTimeSeriesKey() string {
|
||||
if len(ctx.jobLabelValue) == 0 || len(ctx.instanceLabelValue) == 0 {
|
||||
return ""
|
||||
}
|
||||
buf := ctx.buf[:0]
|
||||
buf = strconv.AppendQuote(buf, ctx.jobLabelValue)
|
||||
buf = append(buf, ':')
|
||||
buf = strconv.AppendQuote(buf, ctx.instanceLabelValue)
|
||||
ctx.buf = buf
|
||||
return bytesutil.ToUnsafeString(buf)
|
||||
}
|
||||
|
||||
func (ctx *Ctx) addVMAppLabel(labels []prompb.Label) []prompb.Label {
|
||||
// unconditionally add vmAppLabelValue at the end of labels list
|
||||
// it will overwrite any exist vmAppLabelName labels with a value different to vmAppLabelValue
|
||||
// it's guaranteed by VictoriaMetrics ingestion contract
|
||||
poolLabels := ctx.labels
|
||||
poolLabelsLen := len(poolLabels)
|
||||
poolLabels = append(poolLabels, labels...)
|
||||
poolLabels = append(poolLabels, prompb.Label{Name: vmAppLabelName, Value: vmAppLabelValue})
|
||||
ctx.labels = poolLabels
|
||||
return poolLabels[poolLabelsLen:len(poolLabels):len(poolLabels)]
|
||||
}
|
||||
|
||||
type instanceTracker struct {
|
||||
mu sync.RWMutex
|
||||
lastAccessByKey map[string]*atomic.Uint64
|
||||
wg sync.WaitGroup
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func newInstanceTracker() *instanceTracker {
|
||||
c := &instanceTracker{
|
||||
lastAccessByKey: make(map[string]*atomic.Uint64),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
c.wg.Add(1)
|
||||
go c.startStaleWatcher()
|
||||
return c
|
||||
}
|
||||
|
||||
func (it *instanceTracker) len() int {
|
||||
it.mu.RLock()
|
||||
s := len(it.lastAccessByKey)
|
||||
it.mu.RUnlock()
|
||||
return s
|
||||
}
|
||||
|
||||
func (it *instanceTracker) has(key string) bool {
|
||||
it.mu.RLock()
|
||||
lat, ok := it.lastAccessByKey[key]
|
||||
it.mu.RUnlock()
|
||||
if ok {
|
||||
lat.Store(fasttime.UnixTimestamp())
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
func (it *instanceTracker) register(key string) {
|
||||
it.mu.Lock()
|
||||
// key could be registered by concurrent goroutine
|
||||
lat, ok := it.lastAccessByKey[key]
|
||||
if !ok {
|
||||
lat = &atomic.Uint64{}
|
||||
it.lastAccessByKey[key] = lat
|
||||
}
|
||||
it.mu.Unlock()
|
||||
lat.Store(fasttime.UnixTimestamp())
|
||||
}
|
||||
|
||||
func (it *instanceTracker) startStaleWatcher() {
|
||||
defer it.wg.Done()
|
||||
|
||||
t := time.NewTicker(time.Minute)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-it.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
it.cleanStale()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var entryTTLSeconds = uint64(time.Hour.Seconds())
|
||||
|
||||
func (it *instanceTracker) cleanStale() {
|
||||
ct := fasttime.UnixTimestamp()
|
||||
var toDelete map[string]*atomic.Uint64
|
||||
|
||||
it.mu.RLock()
|
||||
for key, lastAccessTime := range it.lastAccessByKey {
|
||||
accessedAt := lastAccessTime.Load()
|
||||
if ct > accessedAt+entryTTLSeconds {
|
||||
if toDelete == nil {
|
||||
toDelete = make(map[string]*atomic.Uint64)
|
||||
}
|
||||
toDelete[key] = lastAccessTime
|
||||
}
|
||||
}
|
||||
it.mu.RUnlock()
|
||||
|
||||
if len(toDelete) > 0 {
|
||||
it.mu.Lock()
|
||||
for key, lastAccessTime := range toDelete {
|
||||
accessedAt := lastAccessTime.Load()
|
||||
// concurrent goroutine may refresh lastAccessTime
|
||||
if ct > accessedAt+entryTTLSeconds {
|
||||
delete(it.lastAccessByKey, key)
|
||||
}
|
||||
}
|
||||
it.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (it *instanceTracker) mustStop() {
|
||||
close(it.stop)
|
||||
it.wg.Wait()
|
||||
}
|
||||
104
lib/mdx/filter_synctest_test.go
Normal file
104
lib/mdx/filter_synctest_test.go
Normal file
@@ -0,0 +1,104 @@
|
||||
//go:build synctest
|
||||
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
func TestMdxInstanceCleanup(t *testing.T) {
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
assertFilterLen := func(expectedLen int) {
|
||||
t.Helper()
|
||||
if filter.VMInstancesCount() != expectedLen {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", filter.VMInstancesCount(), expectedLen)
|
||||
}
|
||||
}
|
||||
|
||||
ctx := GetContext()
|
||||
filter.Filter(ctx, []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "scrape_targets_up"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
PutContext(ctx)
|
||||
|
||||
time.Sleep(1 * time.Minute)
|
||||
// the entries should not be cleaned.
|
||||
assertFilterLen(2)
|
||||
|
||||
time.Sleep(58 * time.Minute)
|
||||
// receive samples from victoria-metrics1:8428 after 59 minutes.
|
||||
// so the entry will be refreshed.
|
||||
ctx = GetContext()
|
||||
filter.Filter(ctx, []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
PutContext(ctx)
|
||||
assertFilterLen(2)
|
||||
|
||||
// entry for job:instance - test:vmagent1:8429 must be removed
|
||||
time.Sleep(4 * time.Minute)
|
||||
assertFilterLen(1)
|
||||
|
||||
// no samples from vmagent1:8429 in the last hour, so it should be removed from the mdx instance list.
|
||||
time.Sleep(2 * time.Hour)
|
||||
|
||||
assertFilterLen(0)
|
||||
})
|
||||
|
||||
}
|
||||
435
lib/mdx/filter_test.go
Normal file
435
lib/mdx/filter_test.go
Normal file
@@ -0,0 +1,435 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
func TestMdxInstanceFilter(t *testing.T) {
|
||||
originalVmLabel := *vmLabel
|
||||
*vmLabel = "service=victoriametrics"
|
||||
t.Cleanup(func() {
|
||||
*vmLabel = originalVmLabel
|
||||
})
|
||||
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries) {
|
||||
t.Helper()
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
|
||||
ctx := GetContext()
|
||||
defer PutContext(ctx)
|
||||
inputCopy := append([]prompb.TimeSeries{}, input...)
|
||||
output := filter.Filter(ctx, inputCopy)
|
||||
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
|
||||
t.Fatalf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
// make sure that result is the same over multiple calls
|
||||
inputCopy = append([]prompb.TimeSeries{}, input...)
|
||||
output = filter.Filter(ctx, inputCopy)
|
||||
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
|
||||
t.Fatalf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
}
|
||||
// metrics with vm_app_version and different order of labels.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics3:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics3:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}},
|
||||
)
|
||||
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
}},
|
||||
[]prompb.TimeSeries{
|
||||
// 2.
|
||||
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}})
|
||||
|
||||
// metrics with vm_app_version and service=victoriametrics should be preserved.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
// metrics without vm_app_version and `service=victoriametrics` but with `victoriametrics_app=true`, which should be preserved.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics6:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics6:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// metrics without vm_app_version and service=victoriametrics and `victoriametrics_app=true`, which should be filtered out.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{},
|
||||
)
|
||||
|
||||
// metrics with vm_app_version but job or instance is empty (or missing), they should be dropped.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: ""},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent2:8429"},
|
||||
{Name: "job", Value: ""},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent2:8429"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{})
|
||||
|
||||
// metrics without vm_app_version, but the instances were already registered with first timeseries
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_rows_inserted_total"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_rows_inserted_total"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// metrics without vm_app_version, `service=victoriametrics` and `victoriametrics_app=true`, and the instance wasn't already registered in the previous call, so it will be dropped.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics7:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}},
|
||||
)
|
||||
|
||||
// metrics with duplicate victoriametrics_app label
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "other_value"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "other_value"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// metrics with duplicate job and instance labels
|
||||
// last value wins
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}})
|
||||
|
||||
}
|
||||
|
||||
func TestMdxInstanceFilterConcurrent(t *testing.T) {
|
||||
originalVmLabel := *vmLabel
|
||||
*vmLabel = "service=victoriametrics"
|
||||
t.Cleanup(func() { *vmLabel = originalVmLabel })
|
||||
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
|
||||
const concurrency = 8
|
||||
const iterations = 200
|
||||
|
||||
generateSeries := func(g int) []prompb.TimeSeries {
|
||||
return []prompb.TimeSeries{
|
||||
{Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: fmt.Sprintf("vm-%d:8428", g)},
|
||||
}},
|
||||
// shared job:instance
|
||||
{Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "vmagent:8428"},
|
||||
}},
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for worker := range concurrency {
|
||||
wg.Go(func() {
|
||||
input := generateSeries(worker)
|
||||
var expectedOutput []prompb.TimeSeries
|
||||
for _, inputTs := range input {
|
||||
labels := append([]prompb.Label{}, inputTs.Labels...)
|
||||
labels = append(labels, prompb.Label{Name: vmAppLabelName, Value: vmAppLabelValue})
|
||||
expectedOutput = append(expectedOutput, prompb.TimeSeries{Labels: labels})
|
||||
}
|
||||
for range iterations {
|
||||
ctx := GetContext()
|
||||
inputCopy := append([]prompb.TimeSeries{}, input...)
|
||||
output := filter.Filter(ctx, inputCopy)
|
||||
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
|
||||
t.Errorf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
PutContext(ctx)
|
||||
|
||||
}
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// goroutines + 1 shared
|
||||
if got := filter.VMInstancesCount(); got != concurrency+1 {
|
||||
t.Errorf("unexpected instance count: got %d, want %d", got, concurrency+1)
|
||||
}
|
||||
}
|
||||
85
lib/mdx/filter_timing_test.go
Normal file
85
lib/mdx/filter_timing_test.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
func BenchmarkFilter(b *testing.B) {
|
||||
f := func(name string, input, want []prompb.TimeSeries) {
|
||||
b.Helper()
|
||||
|
||||
b.Run(name, func(b *testing.B) {
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
ctx := GetContext()
|
||||
localInput := append([]prompb.TimeSeries{}, input...)
|
||||
tss := filter.Filter(ctx, localInput)
|
||||
if len(tss) != len(want) {
|
||||
diff := cmp.Diff(want, tss)
|
||||
b.Fatalf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
PutContext(ctx)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
input := []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_errors_total"},
|
||||
},
|
||||
},
|
||||
}
|
||||
expected := []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_errors_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
}
|
||||
f("match vm_app_version", input, expected)
|
||||
}
|
||||
@@ -112,11 +112,19 @@ func (m *TimeSeries) size() (n int) {
|
||||
}
|
||||
for _, e := range m.Labels {
|
||||
l := e.size()
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
for _, e := range m.Samples {
|
||||
l := e.size()
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
@@ -126,10 +134,18 @@ func (m *Label) size() (n int) {
|
||||
return 0
|
||||
}
|
||||
if l := len(m.Name); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if l := len(m.Value); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
@@ -160,6 +176,11 @@ func (m *WriteRequest) marshalToSizedBuffer(dst []byte) (int, error) {
|
||||
}
|
||||
|
||||
func encodeVarint(dst []byte, offset int, v uint64) int {
|
||||
if v < 1<<7 {
|
||||
offset--
|
||||
dst[offset] = byte(v)
|
||||
return offset
|
||||
}
|
||||
offset -= sov(v)
|
||||
base := offset
|
||||
for v >= 1<<7 {
|
||||
@@ -180,7 +201,11 @@ func (m *WriteRequest) size() (n int) {
|
||||
}
|
||||
for _, e := range m.Metadata {
|
||||
l := e.size()
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
@@ -238,13 +263,25 @@ func (m *MetricMetadata) size() (n int) {
|
||||
n += 1 + sov(uint64(m.Type))
|
||||
}
|
||||
if l := len(m.MetricFamilyName); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if l := len(m.Help); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if l := len(m.Unit); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if m.AccountID != 0 {
|
||||
n += 1 + sov(uint64(m.AccountID))
|
||||
|
||||
Reference in New Issue
Block a user