mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-06-30 22:23:49 +03:00
Compare commits
12 Commits
polish-das
...
master
| Author | SHA1 | Date | |
|---|---|---|---|
|
|
c82127b6d4 | ||
|
|
06bc808ddc | ||
|
|
a6927c46be | ||
|
|
15a4c31e87 | ||
|
|
54f9cd6edd | ||
|
|
2e16874e95 | ||
|
|
81d330f297 | ||
|
|
3278ddd170 | ||
|
|
cc790c2ea1 | ||
|
|
950f38fd6a | ||
|
|
ab9db9152f | ||
|
|
5b89f52c72 |
@@ -35,6 +35,9 @@ func InsertHandler(at *auth.Token, req *http.Request) error {
|
||||
}
|
||||
|
||||
func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.MetricMetadata, extraLabels []prompb.Label) error {
|
||||
if len(extraLabels) == 0 && !prommetadata.IsEnabled() && at == nil {
|
||||
return insertRowsFast(at, timeseries)
|
||||
}
|
||||
ctx := common.GetPushCtx()
|
||||
defer common.PutPushCtx(ctx)
|
||||
|
||||
@@ -102,3 +105,17 @@ func insertRows(at *auth.Token, timeseries []prompb.TimeSeries, mms []prompb.Met
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
}
|
||||
|
||||
func insertRowsFast(at *auth.Token, timeseries []prompb.TimeSeries) error {
|
||||
rowsTotal := 0
|
||||
for i := range timeseries {
|
||||
rowsTotal += len(timeseries[i].Samples)
|
||||
}
|
||||
wr := &prompb.WriteRequest{Timeseries: timeseries}
|
||||
if !remotewrite.TryPush(at, wr) {
|
||||
return remotewrite.ErrQueueFullHTTPRetry
|
||||
}
|
||||
rowsInserted.Add(rowsTotal)
|
||||
rowsPerInsert.Update(float64(rowsTotal))
|
||||
return nil
|
||||
}
|
||||
|
||||
@@ -12,19 +12,18 @@ import (
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/cespare/xxhash/v2"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/auth"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bloomfilter"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/cgroup"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/consistenthash"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/flagutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fs"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/httpserver"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/mdx"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/memory"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/persistentqueue"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
|
||||
@@ -106,6 +105,9 @@ var (
|
||||
"cannot be pushed into the configured -remoteWrite.url systems in a timely manner. See https://docs.victoriametrics.com/victoriametrics/vmagent/#disabling-on-disk-persistence")
|
||||
disableMetadataPerURL = flagutil.NewArrayBool("remoteWrite.disableMetadata", "Whether to disable sending metadata to the corresponding -remoteWrite.url. "+
|
||||
"By default, metadata sending is controlled by the global -enableMetadata flag")
|
||||
|
||||
enableMdx = flagutil.NewArrayBool("remoteWrite.mdx.enable", "Whether to only retain metrics from VictoriaMetrics services before sending them to the corresponding -remoteWrite.url. "+
|
||||
"Please see https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange")
|
||||
)
|
||||
|
||||
var (
|
||||
@@ -162,8 +164,8 @@ func InitSecretFlags() {
|
||||
}
|
||||
|
||||
var (
|
||||
shardByURLLabelsMap map[string]struct{}
|
||||
shardByURLIgnoreLabelsMap map[string]struct{}
|
||||
shardByURLLabelsFilter []string
|
||||
shardByURLIgnoreLabelsFilter []string
|
||||
)
|
||||
|
||||
// Init initializes remotewrite.
|
||||
@@ -210,8 +212,8 @@ func Init() {
|
||||
logger.Fatalf("-remoteWrite.shardByURL.labels and -remoteWrite.shardByURL.ignoreLabels cannot be set simultaneously; " +
|
||||
"see https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages")
|
||||
}
|
||||
shardByURLLabelsMap = newMapFromStrings(*shardByURLLabels)
|
||||
shardByURLIgnoreLabelsMap = newMapFromStrings(*shardByURLIgnoreLabels)
|
||||
shardByURLLabelsFilter = slices.Clone(*shardByURLLabels)
|
||||
shardByURLIgnoreLabelsFilter = slices.Clone(*shardByURLIgnoreLabels)
|
||||
|
||||
initLabelsGlobal()
|
||||
|
||||
@@ -307,6 +309,10 @@ func initRemoteWriteCtxs(urls []string) {
|
||||
}
|
||||
fs.RegisterPathFsMetrics(*tmpDataPath)
|
||||
|
||||
if slices.Contains(*enableMdx, true) && *shardByURL {
|
||||
logger.Fatalf("-remoteWrite.mdx.enable and -remoteWrite.shardByURL cannot be set to true simultaneously.")
|
||||
}
|
||||
|
||||
if *shardByURL {
|
||||
consistentHashNodes := make([]string, 0, len(urls))
|
||||
for i, url := range urls {
|
||||
@@ -698,18 +704,18 @@ func shardAmountRemoteWriteCtx(tssBlock []prompb.TimeSeries, shards [][]prompb.T
|
||||
|
||||
for _, ts := range tssBlock {
|
||||
hashLabels := ts.Labels
|
||||
if len(shardByURLLabelsMap) > 0 {
|
||||
if len(shardByURLLabelsFilter) > 0 {
|
||||
hashLabels = tmpLabels.Labels[:0]
|
||||
for _, label := range ts.Labels {
|
||||
if _, ok := shardByURLLabelsMap[label.Name]; ok {
|
||||
if slices.Contains(shardByURLLabelsFilter, label.Name) {
|
||||
hashLabels = append(hashLabels, label)
|
||||
}
|
||||
}
|
||||
tmpLabels.Labels = hashLabels
|
||||
} else if len(shardByURLIgnoreLabelsMap) > 0 {
|
||||
} else if len(shardByURLIgnoreLabelsFilter) > 0 {
|
||||
hashLabels = tmpLabels.Labels[:0]
|
||||
for _, label := range ts.Labels {
|
||||
if _, ok := shardByURLIgnoreLabelsMap[label.Name]; !ok {
|
||||
if !slices.Contains(shardByURLIgnoreLabelsFilter, label.Name) {
|
||||
hashLabels = append(hashLabels, label)
|
||||
}
|
||||
}
|
||||
@@ -810,34 +816,26 @@ var (
|
||||
// it omits the '=' separator between label name and value for backward compatibility.
|
||||
// Changing it would re-shard all series across remoteWrite targets.
|
||||
func getLabelsHashForShard(labels []prompb.Label) uint64 {
|
||||
bb := labelsHashBufPool.Get()
|
||||
b := bb.B[:0]
|
||||
var d xxhash.Digest
|
||||
d.Reset()
|
||||
for _, label := range labels {
|
||||
b = append(b, label.Name...)
|
||||
b = append(b, label.Value...)
|
||||
_, _ = d.WriteString(label.Name)
|
||||
_, _ = d.WriteString(label.Value)
|
||||
}
|
||||
h := xxhash.Sum64(b)
|
||||
bb.B = b
|
||||
labelsHashBufPool.Put(bb)
|
||||
return h
|
||||
return d.Sum64()
|
||||
}
|
||||
|
||||
func getLabelsHash(labels []prompb.Label) uint64 {
|
||||
bb := labelsHashBufPool.Get()
|
||||
b := bb.B[:0]
|
||||
var d xxhash.Digest
|
||||
d.Reset()
|
||||
for _, label := range labels {
|
||||
b = append(b, label.Name...)
|
||||
b = append(b, '=')
|
||||
b = append(b, label.Value...)
|
||||
_, _ = d.WriteString(label.Name)
|
||||
_, _ = d.WriteString("=")
|
||||
_, _ = d.WriteString(label.Value)
|
||||
}
|
||||
h := xxhash.Sum64(b)
|
||||
bb.B = b
|
||||
labelsHashBufPool.Put(bb)
|
||||
return h
|
||||
return d.Sum64()
|
||||
}
|
||||
|
||||
var labelsHashBufPool bytesutil.ByteBufferPool
|
||||
|
||||
func logSkippedSeries(labels []prompb.Label, flagName string, flagValue int) {
|
||||
select {
|
||||
case <-logSkippedSeriesTicker.C:
|
||||
@@ -862,6 +860,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
sas atomic.Pointer[streamaggr.Aggregators]
|
||||
deduplicator *streamaggr.Deduplicator
|
||||
mdxFilter *mdx.Filter
|
||||
|
||||
streamAggrKeepInput bool
|
||||
streamAggrDropInput bool
|
||||
@@ -876,6 +875,7 @@ type remoteWriteCtx struct {
|
||||
|
||||
rowsPushedAfterRelabel *metrics.Counter
|
||||
rowsDroppedByRelabel *metrics.Counter
|
||||
mdxRowsPreserved *metrics.Counter
|
||||
|
||||
pushFailures *metrics.Counter
|
||||
metadataDroppedOnPushFailure *metrics.Counter
|
||||
@@ -972,7 +972,6 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
for i := range pss {
|
||||
pss[i] = newPendingSeries(fq, &c.useVMProto, sf, rd)
|
||||
}
|
||||
|
||||
rwctx := &remoteWriteCtx{
|
||||
idx: argIdx,
|
||||
fq: fq,
|
||||
@@ -989,6 +988,16 @@ func newRemoteWriteCtx(argIdx int, remoteWriteURL *url.URL, sanitizedURL string)
|
||||
}
|
||||
rwctx.initStreamAggrConfig()
|
||||
|
||||
if enableMdx.GetOptionalArg(argIdx) {
|
||||
mdxFilter := mdx.NewFilter()
|
||||
rwctx.mdxFilter = mdxFilter
|
||||
rwctx.mdxRowsPreserved = metrics.GetOrCreateCounter(fmt.Sprintf(`vmagent_remotewrite_mdx_rows_preserved_total{path=%q,url=%q}`, queuePath, sanitizedURL))
|
||||
_ = metrics.NewGauge(fmt.Sprintf(`vmagent_remotewrite_mdx_tracked_instances{path=%q,url=%q}`, queuePath, sanitizedURL), func() float64 {
|
||||
return float64(mdxFilter.VMInstancesCount())
|
||||
})
|
||||
|
||||
}
|
||||
|
||||
return rwctx
|
||||
}
|
||||
|
||||
@@ -1002,6 +1011,11 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
rwctx.deduplicator.MustStop()
|
||||
rwctx.deduplicator = nil
|
||||
}
|
||||
if rwctx.mdxFilter != nil {
|
||||
rwctx.mdxFilter.MustStop()
|
||||
rwctx.mdxFilter = nil
|
||||
rwctx.mdxRowsPreserved = nil
|
||||
}
|
||||
|
||||
for _, ps := range rwctx.pss {
|
||||
ps.MustStop()
|
||||
@@ -1017,6 +1031,7 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
|
||||
rwctx.rowsPushedAfterRelabel = nil
|
||||
rwctx.rowsDroppedByRelabel = nil
|
||||
|
||||
}
|
||||
|
||||
// TryPushTimeSeries sends tss series to the configured remote write endpoint
|
||||
@@ -1024,16 +1039,41 @@ func (rwctx *remoteWriteCtx) MustStop() {
|
||||
// TryPushTimeSeries doesn't modify tss, so tss can be passed concurrently to TryPush across distinct rwctx instances.
|
||||
func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDropSamplesOnFailure bool) bool {
|
||||
var rctx *relabelCtx
|
||||
var mctx *mdx.Ctx
|
||||
var v *[]prompb.TimeSeries
|
||||
defer func() {
|
||||
if rctx == nil {
|
||||
return
|
||||
if v != nil {
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
}
|
||||
if rctx != nil {
|
||||
putRelabelCtx(rctx)
|
||||
}
|
||||
if mctx != nil {
|
||||
mdx.PutContext(mctx)
|
||||
}
|
||||
*v = prompb.ResetTimeSeries(tss)
|
||||
tssPool.Put(v)
|
||||
putRelabelCtx(rctx)
|
||||
}()
|
||||
|
||||
copyTimeSeriesIfNeeded := func() {
|
||||
if v == nil {
|
||||
v := tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
}
|
||||
}
|
||||
|
||||
if rwctx.mdxFilter != nil {
|
||||
mctx = mdx.GetContext()
|
||||
// Make a copy of tss before applying relabeling in order to prevent
|
||||
// from affecting time series for other remoteWrite.mdx configs.
|
||||
copyTimeSeriesIfNeeded()
|
||||
tss = rwctx.mdxFilter.Filter(mctx, tss)
|
||||
if len(tss) == 0 {
|
||||
return true
|
||||
}
|
||||
rowsCount := getRowsCount(tss)
|
||||
rwctx.mdxRowsPreserved.Add(rowsCount)
|
||||
}
|
||||
|
||||
// Apply relabeling
|
||||
rcs := allRelabelConfigs.Load()
|
||||
pcs := rcs.perURL[rwctx.idx]
|
||||
@@ -1043,8 +1083,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
// from affecting time series for other remoteWrite.url configs.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/467
|
||||
// and https://github.com/VictoriaMetrics/VictoriaMetrics/issues/599
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
rowsCountBeforeRelabel := getRowsCount(tss)
|
||||
tss = rctx.applyRelabeling(tss, pcs)
|
||||
rowsCountAfterRelabel := getRowsCount(tss)
|
||||
@@ -1062,8 +1101,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
}
|
||||
tss = dropAggregatedSeries(tss, matchIdxs.B, rwctx.streamAggrDropInput)
|
||||
} else if rwctx.streamAggrDropInput {
|
||||
@@ -1071,8 +1109,7 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
|
||||
if rctx == nil {
|
||||
rctx = getRelabelCtx()
|
||||
// Make a copy of tss before dropping aggregated series
|
||||
v = tssPool.Get().(*[]prompb.TimeSeries)
|
||||
tss = append(*v, tss...)
|
||||
copyTimeSeriesIfNeeded()
|
||||
}
|
||||
tss = dropUnaggregatedSeries(tss, matchIdxs.B)
|
||||
}
|
||||
@@ -1191,15 +1228,6 @@ func getRowsCount(tss []prompb.TimeSeries) int {
|
||||
}
|
||||
return rowsCount
|
||||
}
|
||||
|
||||
func newMapFromStrings(a []string) map[string]struct{} {
|
||||
m := make(map[string]struct{}, len(a))
|
||||
for _, s := range a {
|
||||
m[s] = struct{}{}
|
||||
}
|
||||
return m
|
||||
}
|
||||
|
||||
func getMaxHourlySeries() int {
|
||||
limit := *maxHourlySeries
|
||||
if limit == -1 || limit > math.MaxInt32 {
|
||||
|
||||
@@ -145,10 +145,10 @@ func TestRuleValidate(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestGroupValidate_Failure(t *testing.T) {
|
||||
f := func(group *Group, validateExpressions bool, errStrExpected string) {
|
||||
f := func(data []byte, validateExpressions bool, errStrExpected string) {
|
||||
t.Helper()
|
||||
|
||||
err := group.Validate(nil, validateExpressions)
|
||||
_, err := parse(map[string][]byte{"test.yaml": data}, nil, validateExpressions)
|
||||
if err == nil {
|
||||
t.Fatalf("expecting non-nil error")
|
||||
}
|
||||
@@ -158,275 +158,238 @@ func TestGroupValidate_Failure(t *testing.T) {
|
||||
}
|
||||
}
|
||||
|
||||
f(&Group{}, false, "group name must be set")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: ""
|
||||
`), false, "group name must be set")
|
||||
|
||||
f(&Group{
|
||||
Name: "both record and alert are not set",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
{
|
||||
Expr: "sumSeries(time('foo.bar',10))",
|
||||
},
|
||||
},
|
||||
}, false, "invalid rule")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: both record and alert are not set
|
||||
rules:
|
||||
- expr: "sum(up == 0 ) by (host)"
|
||||
for: 10ms
|
||||
- expr: "sumSeries(time('foo.bar',10))"
|
||||
`), false, "invalid rule")
|
||||
|
||||
f(&Group{
|
||||
Name: "negative interval",
|
||||
Interval: promutil.NewDuration(-1),
|
||||
}, false, "interval shouldn't be lower than 0")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: negative interval
|
||||
interval: -1ms
|
||||
`), false, "interval shouldn't be lower than 0")
|
||||
|
||||
f(&Group{
|
||||
Name: "too big eval_offset",
|
||||
Interval: promutil.NewDuration(time.Minute),
|
||||
EvalOffset: promutil.NewDuration(2 * time.Minute),
|
||||
}, false, "eval_offset should be smaller than interval")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: too big eval_offset
|
||||
interval: 1m
|
||||
eval_offset: 2m
|
||||
`), false, "eval_offset should be smaller than interval")
|
||||
|
||||
f(&Group{
|
||||
Name: "too big negative eval_offset",
|
||||
Interval: promutil.NewDuration(time.Minute),
|
||||
EvalOffset: promutil.NewDuration(-2 * time.Minute),
|
||||
}, false, "eval_offset should be smaller than interval")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: too big negative eval_offset
|
||||
interval: 1m
|
||||
eval_offset: -2m
|
||||
`), false, "eval_offset should be smaller than interval")
|
||||
|
||||
limit := -1
|
||||
f(&Group{
|
||||
Name: "wrong limit",
|
||||
Limit: &limit,
|
||||
}, false, "invalid limit")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: wrong limit
|
||||
limit: -1
|
||||
`), false, "invalid limit")
|
||||
|
||||
f(&Group{
|
||||
Name: "wrong concurrency",
|
||||
Concurrency: -1,
|
||||
}, false, "invalid concurrency")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: wrong concurrency
|
||||
concurrency: -1
|
||||
`), false, "invalid concurrency")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
},
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Record: "record", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Record: "record", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- record: record
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
- record: record
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{Record: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, "duplicate")
|
||||
|
||||
f(&Group{
|
||||
Name: "test thanos",
|
||||
Type: NewRawType("thanos"),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, true, "unknown datasource type")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test thanos
|
||||
type: thanos
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), true, "unknown datasource type")
|
||||
|
||||
// validate expressions
|
||||
f(&Group{
|
||||
Name: "test prometheus expr",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "record",
|
||||
Expr: "up | 0",
|
||||
},
|
||||
},
|
||||
}, true, "bad MetricsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus expr
|
||||
type: prometheus
|
||||
rules:
|
||||
- record: record
|
||||
expr: "up | 0"
|
||||
`), true, "bad MetricsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test graphite expr",
|
||||
Type: NewGraphiteType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "some-description",
|
||||
}},
|
||||
},
|
||||
}, true, "bad GraphiteQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test graphite expr
|
||||
type: graphite
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: some-description
|
||||
`), true, "bad GraphiteQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs expr",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "stats count(*) as requests"},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs expr
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: "stats count(*) as requests"
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs expr",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs expr multipart
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: "_time: 1m | stats by (path, _time: 1m) count(*) as requests"
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test graphite with prometheus expr",
|
||||
Type: NewGraphiteType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
ID: 1,
|
||||
Expr: "sumSeries(time('foo.bar',10))",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
{
|
||||
Record: "r2",
|
||||
ID: 2,
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
},
|
||||
},
|
||||
}, true, "bad GraphiteQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test graphite with prometheus expr
|
||||
type: graphite
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "sumSeries(time('foo.bar',10))"
|
||||
for: 10ms
|
||||
- record: r2
|
||||
expr: "sum(up == 0 ) by (host)"
|
||||
`), true, "bad GraphiteQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test vlogs with prometheus exp",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
Expr: "sum(up == 0 ) by (host)",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
}, true, "bad LogsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test vlogs with prometheus expr
|
||||
type: vlogs
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "sum(up == 0 ) by (host)"
|
||||
for: 10ms
|
||||
`), true, "bad LogsQL expr")
|
||||
|
||||
f(&Group{
|
||||
Name: "test prometheus with vlogs exp",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "r1",
|
||||
Expr: "* | stats by (path) count()",
|
||||
For: promutil.NewDuration(10 * time.Millisecond),
|
||||
},
|
||||
},
|
||||
}, true, "bad MetricsQL expr")
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus with vlogs expr
|
||||
type: prometheus
|
||||
rules:
|
||||
- record: r1
|
||||
expr: "* | stats by (path) count()"
|
||||
for: 10ms
|
||||
`), true, "bad MetricsQL expr")
|
||||
}
|
||||
|
||||
func TestGroupValidate_Success(t *testing.T) {
|
||||
f := func(group *Group, validateAnnotations, validateExpressions bool) {
|
||||
f := func(data []byte, validateAnnotations, validateExpressions bool) {
|
||||
t.Helper()
|
||||
|
||||
var validateTplFn ValidateTplFn
|
||||
if validateAnnotations {
|
||||
validateTplFn = notifier.ValidateTemplates
|
||||
}
|
||||
err := group.Validate(validateTplFn, validateExpressions)
|
||||
_, err := parse(map[string][]byte{"test.yaml": data}, validateTplFn, validateExpressions)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
}
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Record: "record",
|
||||
Expr: "up | 0",
|
||||
},
|
||||
},
|
||||
}, false, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- record: record
|
||||
expr: "up | 0"
|
||||
`), false, false)
|
||||
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
Labels: map[string]string{
|
||||
"summary": "{{ value|query }}",
|
||||
},
|
||||
},
|
||||
},
|
||||
}, false, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "{{ value|query }}"
|
||||
`), false, false)
|
||||
|
||||
// validate annotations
|
||||
f(&Group{
|
||||
Name: "test",
|
||||
Rules: []Rule{
|
||||
{
|
||||
Alert: "alert",
|
||||
Expr: "up == 1",
|
||||
Labels: map[string]string{
|
||||
"summary": `
|
||||
{{ with printf "node_memory_MemTotal{job='node',instance='%s'}" "localhost" | query }}
|
||||
{{ . | first | value | humanize1024 }}B
|
||||
{{ end }}`,
|
||||
},
|
||||
},
|
||||
},
|
||||
}, true, false)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
summary: "\n{{ with printf \"node_memory_MemTotal{job='node',instance='%s'}\" \"localhost\" | query }}\n {{ . | first | value | humanize1024 }}B\n{{ end }}"
|
||||
`), true, false)
|
||||
|
||||
// validate expressions
|
||||
f(&Group{
|
||||
Name: "test prometheus",
|
||||
Type: NewPrometheusType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: "up == 1", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, true)
|
||||
f(&Group{
|
||||
Name: "test victorialogs",
|
||||
Type: NewVLogsType(),
|
||||
Rules: []Rule{
|
||||
{Alert: "alert", Expr: " _time: 1m | stats count(*) as requests", Labels: map[string]string{
|
||||
"description": "{{ value|query }}",
|
||||
}},
|
||||
},
|
||||
}, false, true)
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test prometheus
|
||||
type: prometheus
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: up == 1
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), false, true)
|
||||
|
||||
f([]byte(`
|
||||
groups:
|
||||
- name: test victorialogs
|
||||
type: vlogs
|
||||
rules:
|
||||
- alert: alert
|
||||
expr: " _time: 1m | stats count(*) as requests"
|
||||
labels:
|
||||
description: "{{ value|query }}"
|
||||
`), false, true)
|
||||
}
|
||||
|
||||
func TestHashRule_NotEqual(t *testing.T) {
|
||||
|
||||
@@ -140,6 +140,18 @@ users:
|
||||
- "ProjectID: {{.MetricsProjectID}}"
|
||||
url_prefix: "http://vminsert:8480/insert/prometheus"
|
||||
|
||||
# JWT-based routing that relies solely on custom claims.
|
||||
# The `vm_access` claim is missing, default value will be used.
|
||||
# e.g. {"role": "admin"}.
|
||||
- name: jwt-custom-claims
|
||||
jwt:
|
||||
skip_verify: true
|
||||
vm_default_access_claim:
|
||||
metrics_account_id: 1
|
||||
match_claims:
|
||||
role: admin
|
||||
url_prefix: "http://vmselect-admin:8481/select/0/prometheus"
|
||||
|
||||
# Requests without Authorization header are proxied according to `unauthorized_user` section.
|
||||
# Requests are proxied in round-robin fashion between `url_prefix` backends.
|
||||
# The deny_partial_response query arg is added to all the proxied requests.
|
||||
|
||||
@@ -65,6 +65,8 @@ type JWTConfig struct {
|
||||
MatchClaims map[string]string `yaml:"match_claims,omitempty"`
|
||||
parsedMatchClaims []*jwt.Claim
|
||||
|
||||
DefaultVMAccessClaim *jwt.VMAccessClaim `yaml:"default_vm_access_claim,omitempty"`
|
||||
|
||||
// verifierPool is used to verify JWT tokens.
|
||||
// It is initialized from PublicKeys and/or PublicKeyFiles.
|
||||
// In this case, it is initialized once at config reload and never updated until next reload
|
||||
@@ -432,7 +434,6 @@ func validateJWTPlaceholdersForURL(up *URLPrefix, isAllowed bool) error {
|
||||
}
|
||||
if strings.Contains(p, placeholderPrefix) {
|
||||
return fmt.Errorf("invalid placeholder found in URL request path: %q, supported values are: %s", bu.Path, strings.Join(allPlaceholders, ", "))
|
||||
|
||||
}
|
||||
}
|
||||
for param, values := range bu.Query() {
|
||||
@@ -487,7 +488,6 @@ func hasAnyPlaceholders(u *url.URL) bool {
|
||||
return true
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
return false
|
||||
}
|
||||
|
||||
@@ -190,6 +190,10 @@ func requestHandler(w http.ResponseWriter, r *http.Request) bool {
|
||||
if tkn == nil {
|
||||
logger.Panicf("BUG: unexpected nil jwt token for user %q", ui.name())
|
||||
}
|
||||
if !tkn.HasVMAccessClaim() && ui.JWT.DefaultVMAccessClaim == nil {
|
||||
http.Error(w, "Unauthorized", http.StatusUnauthorized)
|
||||
return true
|
||||
}
|
||||
defer putToken(tkn)
|
||||
processUserRequest(w, r, ui, tkn)
|
||||
return true
|
||||
@@ -424,8 +428,12 @@ func processRequest(w http.ResponseWriter, r *http.Request, ui *UserInfo, tkn *j
|
||||
}
|
||||
targetURL := bu.url
|
||||
if tkn != nil {
|
||||
vmac := tkn.VMAccess()
|
||||
if !tkn.HasVMAccessClaim() {
|
||||
vmac = ui.JWT.DefaultVMAccessClaim
|
||||
}
|
||||
// for security reasons allow templating only for configured url values and headers
|
||||
targetURL, hc = replaceJWTPlaceholders(bu, hc, tkn.VMAccess())
|
||||
targetURL, hc = replaceJWTPlaceholders(bu, hc, vmac)
|
||||
}
|
||||
if isDefault {
|
||||
// Don't change path and add request_path query param for default route.
|
||||
|
||||
@@ -739,6 +739,12 @@ users:
|
||||
"vm_access": map[string]any{},
|
||||
}, false)
|
||||
|
||||
// token without vm_access claim, but with a custom claim usable for routing
|
||||
roleToken := genToken(t, map[string]any{
|
||||
"exp": time.Now().Add(10 * time.Minute).Unix(),
|
||||
"role": "admin",
|
||||
}, true)
|
||||
|
||||
fullToken := genToken(t, map[string]any{
|
||||
"exp": time.Now().Add(10 * time.Minute).Unix(),
|
||||
"vm_access": map[string]any{
|
||||
@@ -779,6 +785,26 @@ statusCode=401
|
||||
Unauthorized`
|
||||
f(simpleCfgStr, request, responseExpected)
|
||||
|
||||
// token without vm_access claim is accepted when it
|
||||
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
|
||||
request.Header.Set(`Authorization`, `Bearer `+roleToken)
|
||||
responseExpected = `
|
||||
statusCode=200
|
||||
path: /foo/abc
|
||||
query:
|
||||
headers:`
|
||||
f(fmt.Sprintf(`
|
||||
users:
|
||||
- jwt:
|
||||
public_keys:
|
||||
- %q
|
||||
default_vm_access_claim:
|
||||
metrics_account_id: 10
|
||||
metrics_project_id: 10
|
||||
match_claims:
|
||||
role: admin
|
||||
url_prefix: {BACKEND}/foo`, string(publicKeyPEM)), request, responseExpected)
|
||||
|
||||
// expired token
|
||||
request = httptest.NewRequest(`GET`, "http://some-host.com/abc", nil)
|
||||
request.Header.Set(`Authorization`, `Bearer `+expiredToken)
|
||||
|
||||
@@ -956,6 +956,7 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
start, end, step int64, r *http.Request, ct int64, etfs [][]storage.TagFilter) error {
|
||||
deadline := searchutil.GetDeadlineForQuery(r, startTime)
|
||||
mayCache := !httputil.GetBool(r, "nocache")
|
||||
optimizeRepeatedBinaryOpSubexprs := httputil.GetBool(r, "optimize_repeated_binary_op_subexprs")
|
||||
lookbackDelta, err := getMaxLookback(r)
|
||||
if err != nil {
|
||||
return err
|
||||
@@ -977,18 +978,19 @@ func queryRangeHandler(qt *querytracer.Tracer, startTime time.Time, w http.Respo
|
||||
}
|
||||
|
||||
ec := &promql.EvalConfig{
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
Start: start,
|
||||
End: end,
|
||||
Step: step,
|
||||
MaxPointsPerSeries: *maxPointsPerTimeseries,
|
||||
MaxSeries: 0, // let vmstorage use maxUniqueTimeseries by default
|
||||
QuotedRemoteAddr: httpserver.GetQuotedRemoteAddr(r),
|
||||
Deadline: deadline,
|
||||
MayCache: mayCache,
|
||||
OptimizeRepeatedBinaryOpSubexprs: optimizeRepeatedBinaryOpSubexprs,
|
||||
LookbackDelta: lookbackDelta,
|
||||
RoundDigits: getRoundDigits(r),
|
||||
EnforcedTagFilterss: etfs,
|
||||
CacheTagFilters: etfs,
|
||||
GetRequestURI: func() string {
|
||||
return httpserver.GetRequestURI(r)
|
||||
},
|
||||
|
||||
@@ -132,6 +132,9 @@ type EvalConfig struct {
|
||||
// Whether the response can be cached.
|
||||
MayCache bool
|
||||
|
||||
// Whether repeated cacheable binary op subexpressions can be optimized.
|
||||
OptimizeRepeatedBinaryOpSubexprs bool
|
||||
|
||||
// LookbackDelta is analog to `-query.lookback-delta` from Prometheus.
|
||||
LookbackDelta int64
|
||||
|
||||
@@ -171,6 +174,7 @@ func copyEvalConfig(src *EvalConfig) *EvalConfig {
|
||||
ec.MaxPointsPerSeries = src.MaxPointsPerSeries
|
||||
ec.Deadline = src.Deadline
|
||||
ec.MayCache = src.MayCache
|
||||
ec.OptimizeRepeatedBinaryOpSubexprs = src.OptimizeRepeatedBinaryOpSubexprs
|
||||
ec.LookbackDelta = src.LookbackDelta
|
||||
ec.RoundDigits = src.RoundDigits
|
||||
ec.EnforcedTagFilterss = src.EnforcedTagFilterss
|
||||
@@ -467,7 +471,8 @@ func isAggrFuncWithoutGrouping(e metricsql.Expr) bool {
|
||||
}
|
||||
|
||||
func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSecond metricsql.Expr, be *metricsql.BinaryOpExpr) ([]*timeseries, []*timeseries, error) {
|
||||
if !canPushdownCommonFilters(be) {
|
||||
canPushdown := canPushdownCommonFilters(be)
|
||||
if !canPushdown && !shouldOptimizeRepeatedBinaryOpSubexprs(ec, exprFirst, exprSecond) {
|
||||
// Execute exprFirst and exprSecond in parallel, since it is impossible to pushdown common filters
|
||||
// from exprFirst to exprSecond.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/2886
|
||||
@@ -500,6 +505,25 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
|
||||
}
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
if !canPushdown {
|
||||
qt = qt.NewChild("execute left and right sides of %q sequentially because repeated cacheable subexpression was found", be.Op)
|
||||
defer qt.Done()
|
||||
|
||||
qtFirst := qt.NewChild("expr1")
|
||||
tssFirst, err := evalExpr(qtFirst, ec, exprFirst)
|
||||
qtFirst.Done()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
|
||||
qtSecond := qt.NewChild("expr2")
|
||||
tssSecond, err := evalExpr(qtSecond, ec, exprSecond)
|
||||
qtSecond.Done()
|
||||
if err != nil {
|
||||
return nil, nil, err
|
||||
}
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
|
||||
// Execute binary operation in the following way:
|
||||
//
|
||||
@@ -544,6 +568,78 @@ func execBinaryOpArgs(qt *querytracer.Tracer, ec *EvalConfig, exprFirst, exprSec
|
||||
return tssFirst, tssSecond, nil
|
||||
}
|
||||
|
||||
func shouldOptimizeRepeatedBinaryOpSubexprs(ec *EvalConfig, exprFirst, exprSecond metricsql.Expr) bool {
|
||||
if !ec.OptimizeRepeatedBinaryOpSubexprs {
|
||||
return false
|
||||
}
|
||||
if ec.Start == ec.End {
|
||||
return false
|
||||
}
|
||||
if !ec.mayCache() {
|
||||
return false
|
||||
}
|
||||
|
||||
candidatesFirst := make(map[string]struct{}, 1)
|
||||
var b []byte
|
||||
visitOptimizedAggrs(exprFirst, func(ae *metricsql.AggrFuncExpr) {
|
||||
if hasUnseededVolatileFunc(ae) {
|
||||
return
|
||||
}
|
||||
b = ae.AppendString(b[:0])
|
||||
candidatesFirst[string(b)] = struct{}{}
|
||||
})
|
||||
if len(candidatesFirst) == 0 {
|
||||
return false
|
||||
}
|
||||
|
||||
repeated := false
|
||||
visitOptimizedAggrs(exprSecond, func(ae *metricsql.AggrFuncExpr) {
|
||||
if repeated {
|
||||
return
|
||||
}
|
||||
b = ae.AppendString(b[:0])
|
||||
_, repeated = candidatesFirst[string(b)]
|
||||
})
|
||||
return repeated
|
||||
}
|
||||
|
||||
func visitOptimizedAggrs(e metricsql.Expr, f func(ae *metricsql.AggrFuncExpr)) {
|
||||
metricsql.VisitAll(e, func(expr metricsql.Expr) {
|
||||
ae, ok := expr.(*metricsql.AggrFuncExpr)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
if getIncrementalAggrFuncCallbacks(ae.Name) == nil {
|
||||
return
|
||||
}
|
||||
fe, _ := tryGetArgRollupFuncWithMetricExpr(ae)
|
||||
if fe == nil {
|
||||
return
|
||||
}
|
||||
f(ae)
|
||||
})
|
||||
}
|
||||
|
||||
func hasUnseededVolatileFunc(e metricsql.Expr) bool {
|
||||
found := false
|
||||
metricsql.VisitAll(e, func(expr metricsql.Expr) {
|
||||
if found {
|
||||
return
|
||||
}
|
||||
fe, ok := expr.(*metricsql.FuncExpr)
|
||||
if !ok {
|
||||
return
|
||||
}
|
||||
switch strings.ToLower(fe.Name) {
|
||||
case "now":
|
||||
found = true
|
||||
case "rand", "rand_normal", "rand_exponential":
|
||||
found = len(fe.Args) == 0
|
||||
}
|
||||
})
|
||||
return found
|
||||
}
|
||||
|
||||
func getCommonLabelFilters(tss []*timeseries) []metricsql.LabelFilter {
|
||||
if len(tss) == 0 {
|
||||
return nil
|
||||
|
||||
@@ -170,3 +170,87 @@ func TestGetSumInstantValues(t *testing.T) {
|
||||
[]*timeseries{ts("foo", 100, 1)},
|
||||
)
|
||||
}
|
||||
|
||||
func TestShouldOptimizeRepeatedBinaryOpSubexprsGate(t *testing.T) {
|
||||
e, err := metricsql.Parse(`count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in metricsql.Parse(): %s", err)
|
||||
}
|
||||
be, ok := e.(*metricsql.BinaryOpExpr)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected expr type; got %T; want *metricsql.BinaryOpExpr", e)
|
||||
}
|
||||
|
||||
f := func(name string, ec *EvalConfig, resultExpected bool) {
|
||||
t.Helper()
|
||||
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for %q; got %v; want %v", name, result, resultExpected)
|
||||
}
|
||||
}
|
||||
|
||||
f("disabled optimization", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
}, false)
|
||||
f("disabled cache", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
f("instant query", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 1000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
f("repeated cacheable aggregate subexpression", &EvalConfig{
|
||||
Start: 1000,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, true)
|
||||
f("unaligned range query", &EvalConfig{
|
||||
Start: 1001,
|
||||
End: 2000,
|
||||
Step: 1000,
|
||||
MayCache: true,
|
||||
OptimizeRepeatedBinaryOpSubexprs: true,
|
||||
}, false)
|
||||
}
|
||||
|
||||
func TestShouldOptimizeRepeatedBinaryOpSubexprsExpressions(t *testing.T) {
|
||||
f := func(name, q string, resultExpected bool) {
|
||||
t.Helper()
|
||||
e, err := metricsql.Parse(q)
|
||||
if err != nil {
|
||||
t.Fatalf("unexpected error in metricsql.Parse(%q) for %q: %s", q, name, err)
|
||||
}
|
||||
be, ok := e.(*metricsql.BinaryOpExpr)
|
||||
if !ok {
|
||||
t.Fatalf("unexpected expr type for %q; got %T; want *metricsql.BinaryOpExpr", name, e)
|
||||
}
|
||||
ec := &EvalConfig{Start: 1000, End: 2000, Step: 1000, MayCache: true, OptimizeRepeatedBinaryOpSubexprs: true}
|
||||
result := shouldOptimizeRepeatedBinaryOpSubexprs(ec, be.Left, be.Right)
|
||||
if result != resultExpected {
|
||||
t.Fatalf("unexpected result for %q; got %v; want %v; query: %q", name, result, resultExpected, q)
|
||||
}
|
||||
}
|
||||
|
||||
f("original issue query", `count(count(vm_requests_total) by (action,addr,cluster,endpoint)) by (action,addr,cluster) / count(count(vm_requests_total) by (action,addr,cluster,endpoint))`, true)
|
||||
f("right side contains repeated count aggregate", `count(foo) by (job) / (count(foo) by (job) + 1)`, true)
|
||||
f("same sum aggregate", `sum(rate(foo[5m])) by (job) / sum(rate(foo[5m])) by (job)`, true)
|
||||
f("same inner rollup but different aggregates", `sum(rate(foo[5m])) by (job) / count(rate(foo[5m])) by (job)`, false)
|
||||
f("different count aggregates", `count(foo) by (job) / count(bar) by (job)`, false)
|
||||
f("bare metric selector", `foo / foo`, false)
|
||||
f("bare rollup function", `rate(a[5m]) / rate(a[5m])`, false)
|
||||
f("now at modifier", `sum(rate(foo[5m] @ now())) by (job) / sum(rate(foo[5m] @ now())) by (job)`, false)
|
||||
f("unseeded rand at modifier", `sum(rate(foo[5m] @ rand())) by (job) / sum(rate(foo[5m] @ rand())) by (job)`, false)
|
||||
f("unseeded rand_normal at modifier", `sum(rate(foo[5m] @ rand_normal())) by (job) / sum(rate(foo[5m] @ rand_normal())) by (job)`, false)
|
||||
f("unseeded rand_exponential at modifier", `sum(rate(foo[5m] @ rand_exponential())) by (job) / sum(rate(foo[5m] @ rand_exponential())) by (job)`, false)
|
||||
f("seeded rand at modifier", `sum(rate(foo[5m] @ rand(1))) by (job) / sum(rate(foo[5m] @ rand(1))) by (job)`, true)
|
||||
}
|
||||
|
||||
@@ -1121,29 +1121,29 @@ func groupLeTimeseries(tss []*timeseries) map[string][]leTimeseries {
|
||||
|
||||
func fixBrokenBuckets(i int, xss []leTimeseries) {
|
||||
// Buckets are already sorted by le, so their values must be in ascending order,
|
||||
// since the upper bucket includes all the lower buckets.
|
||||
// If the upper bucket has lower value than the current bucket,
|
||||
// then the upper bucket must be substituted with the current bucket value.
|
||||
// since the next bucket includes all the previous buckets.
|
||||
// If the next bucket has lower value than the current bucket,
|
||||
// then the next bucket must be substituted with the current bucket value.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4580#issuecomment-2186659102
|
||||
if len(xss) < 2 {
|
||||
return
|
||||
}
|
||||
|
||||
vPrev := xss[0].ts.Values[i]
|
||||
vNext := xss[0].ts.Values[i]
|
||||
// Set the lowest bucket to 0 if its value is NaN, so it can be properly
|
||||
// compared with upper buckets in the loop below.
|
||||
if math.IsNaN(vPrev) {
|
||||
vPrev = 0
|
||||
xss[0].ts.Values[i] = vPrev
|
||||
if math.IsNaN(vNext) {
|
||||
vNext = 0
|
||||
xss[0].ts.Values[i] = vNext
|
||||
}
|
||||
// Substitute upper bucket values with lower bucket values if the upper values are NaN
|
||||
// or are smaller than the lower bucket values.
|
||||
// or are bigger than the lower bucket values.
|
||||
for j := 1; j < len(xss); j++ {
|
||||
v := xss[j].ts.Values[i]
|
||||
if math.IsNaN(v) || vPrev > v {
|
||||
xss[j].ts.Values[i] = vPrev
|
||||
if math.IsNaN(v) || vNext > v {
|
||||
xss[j].ts.Values[i] = vNext
|
||||
} else {
|
||||
vPrev = v
|
||||
vNext = v
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -6,7 +6,7 @@ COPY web/ /build/
|
||||
RUN GOOS=linux GOARCH=amd64 CGO_ENABLED=0 go build -o web-amd64 github.com/VictoriMetrics/vmui/ && \
|
||||
GOOS=windows GOARCH=amd64 CGO_ENABLED=0 go build -o web-windows github.com/VictoriMetrics/vmui/
|
||||
|
||||
FROM alpine:3.23.4
|
||||
FROM alpine:3.24.1
|
||||
USER root
|
||||
|
||||
COPY --from=build-web-stage /build/web-amd64 /app/web
|
||||
|
||||
@@ -896,7 +896,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(min_over_time(up{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by (job)",
|
||||
"expr": "sum(min_over_time(vm_app_version{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by (job)",
|
||||
"format": "time_series",
|
||||
"instant": false,
|
||||
"legendFormat": "{{job}}",
|
||||
|
||||
@@ -897,7 +897,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(min_over_time(up{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by (job)",
|
||||
"expr": "sum(min_over_time(vm_app_version{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by (job)",
|
||||
"format": "time_series",
|
||||
"instant": false,
|
||||
"legendFormat": "{{job}}",
|
||||
|
||||
@@ -892,7 +892,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(min_over_time(up{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by (job)",
|
||||
"expr": "sum(up{job=~\"$job\", instance=~\"$instance\"}) by (job)",
|
||||
"format": "time_series",
|
||||
"instant": false,
|
||||
"interval": "",
|
||||
|
||||
@@ -891,7 +891,7 @@
|
||||
"uid": "$ds"
|
||||
},
|
||||
"editorMode": "code",
|
||||
"expr": "sum(min_over_time(up{job=~\"$job\", instance=~\"$instance\"}[$__rate_interval])) by (job)",
|
||||
"expr": "sum(up{job=~\"$job\", instance=~\"$instance\"}) by (job)",
|
||||
"format": "time_series",
|
||||
"instant": false,
|
||||
"interval": "",
|
||||
|
||||
@@ -3,9 +3,9 @@
|
||||
DOCKER_REGISTRIES ?= docker.io quay.io
|
||||
DOCKER_NAMESPACE ?= victoriametrics
|
||||
|
||||
ROOT_IMAGE ?= alpine:3.23.4
|
||||
ROOT_IMAGE ?= alpine:3.24.1
|
||||
ROOT_IMAGE_SCRATCH ?= scratch
|
||||
CERTS_IMAGE := alpine:3.23.4
|
||||
CERTS_IMAGE := alpine:3.24.1
|
||||
|
||||
GO_BUILDER_IMAGE := golang:1.26.4
|
||||
|
||||
|
||||
@@ -245,7 +245,7 @@ The following steps must be performed during the upgrade / downgrade procedure:
|
||||
* Wait until the process stops. This can take a few seconds.
|
||||
* Start the upgraded VictoriaMetrics.
|
||||
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/whats-new-in-prometheus-2-8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
|
||||
> If you'd prefer not to manage upgrades yourself, [VictoriaMetrics Cloud](https://console.victoriametrics.cloud/signUp?utm_source=website&utm_campaign=docs_vm_single_upgrade)
|
||||
> performs version upgrades automatically during maintenance windows with no action required on your part.
|
||||
@@ -417,7 +417,7 @@ VictoriaMetrics is configured via command-line flags, so it must be restarted wh
|
||||
* Wait until the process stops. This can take a few seconds.
|
||||
* Start VictoriaMetrics with the new command-line flags.
|
||||
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/2019/03/25/whats-new-in-prometheus-2.8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
Prometheus doesn't drop data during VictoriaMetrics restart. See [this article](https://grafana.com/blog/whats-new-in-prometheus-2-8-wal-based-remote-write/) for details. The same applies also to [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/).
|
||||
|
||||
## How to scrape Prometheus exporters such as [node-exporter](https://github.com/prometheus/node_exporter)
|
||||
|
||||
@@ -478,6 +478,11 @@ and [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyco
|
||||
to the given number of digits after the decimal point.
|
||||
For example, `/api/v1/query?query=avg_over_time(temperature[1h])&round_digits=2` would round response values to up to two digits after the decimal point.
|
||||
|
||||
VictoriaMetrics accepts `optimize_repeated_binary_op_subexprs=1` query arg for [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query)
|
||||
handler. It allows `vmselect` to execute left and right sides of binary operators sequentially when they contain the same
|
||||
optimized aggregate rollup result expression, so the second side may reuse the rollup result cache populated by the first side.
|
||||
The optimization is disabled by default and applies only when rollup result cache can be used for the request.
|
||||
|
||||
VictoriaMetrics accepts `limit` query arg for [/api/v1/labels](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labels)
|
||||
and [`/api/v1/label/<labelName>/values`](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1labelvalues) handlers for limiting the number of returned entries.
|
||||
For example, the query to `/api/v1/labels?limit=5` returns a sample of up to 5 unique labels, while ignoring the rest of labels.
|
||||
|
||||
@@ -26,16 +26,22 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
|
||||
## tip
|
||||
|
||||
* SECURITY: upgrade base docker image (Alpine) from 3.23.4 to 3.24.1. See [Alpine 3.24.1 release notes](https://www.alpinelinux.org/posts/Alpine-3.24.1-released.html).
|
||||
|
||||
* BUGFIX: all VictoriaMetrics components: cancel in-flight HTTP requests shortly before `-http.maxGracefulShutdownDuration` elapses during graceful shutdown, so they can drain and the shutdown completes cleanly within that window instead of timing out and exiting via `logger.Fatalf` -> `os.Exit`. This prevents skipping the storage flush and losing in-memory data when long-lived requests are in flight (such as VictoriaLogs live tailing). See [#1502](https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502).
|
||||
* BUGFIX: `vminsert` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): fixes unexpected rare rerouting. See [#11162](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11162).
|
||||
* BUGFIX: `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): propagate cache reset operation to `selectNode` when `/internal/resetRollupResultCache` is called. Previously, the propagation only happened when the `delete_series` API was called. See [#11112](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11112).
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix possible unexpected increases in `rate_avg` and `rate_sum` if an out-of-order sample is ingested after the previous flush. See [#11140](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11140).
|
||||
|
||||
* FEATURE: [vmauth](https://docs.victoriametrics.com/victoriametrics/vmauth/): add `default_vm_access_claim` field into `jwt` section of auth config. It could be used at [JWT claim placeholders](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating), if `JWT` token doesn't have `vm_access` claim. See [#11054](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11054).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): reduces CPU usage by 10% at [sharding among remote storages](https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages). See [#11113](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11113). Thanks to @bennf for contribution.
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): add `optimize_repeated_binary_op_subexprs=1` query arg to [/api/v1/query_range](https://docs.victoriametrics.com/victoriametrics/keyconcepts/#range-query) for executing binary operator sides sequentially when they share the same optimized aggregate rollup result expression. This allows the second side to reuse rollup result cache populated by the first side. See [#10575](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10575).
|
||||
|
||||
## [v1.146.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.146.0)
|
||||
|
||||
Released at 2026-06-22
|
||||
|
||||
* FEATURE: all VictoriaMetrics components: add `-http.header.disableServerHostname` command-line flag for disabling the `X-Server-Hostname` HTTP response header. See [#11067](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11067). Thanks to @zasdaym for contribution.
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): expose `vm_streamaggr_dedup_dropped_samples_total` to allow tracking dropped old samples during [deduplication](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#deduplication).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): use the aggregation rule interval as the default [staleness_interval](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#staleness) instead of `2*interval`, to reduce spikes when there are gaps between received samples. See [#11102](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11102).
|
||||
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add new aggregation output [sum_samples_total](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/configuration/#sum_samples_total) for summing input delta values into a cumulative counter. See issues [#11002](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11002) and [#4843](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/4843).
|
||||
@@ -44,6 +50,7 @@ Released at 2026-06-22
|
||||
* FEATURE: [vmctl](https://docs.victoriametrics.com/victoriametrics/vmctl/): add `-vm-headers` and `-vm-bearer-token` flags for authenticating requests to the VictoriaMetrics import destination. The flags are available in `opentsdb`, `influx`, `remote-read`, `prometheus`, `mimir`, and `thanos` vmctl sub-commands. See [#8897](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/8897).
|
||||
* FEATURE: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): log calls to [/api/v1/admin/tsdb/delete_series](https://docs.victoriametrics.com/victoriametrics/url-examples/#apiv1admintsdbdelete_series) API handler. This should help to identify events of metrics deletion from the database. See [#11104](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11104).
|
||||
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): add the `last` value to graph legend statistics. See [#10759](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10759).
|
||||
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): add support for [Monitoring Data eXchange (MDX)](https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange): the ability to route only metrics from VictoriaMetrics services to a specific `-remoteWrite.url`. MDX is useful for building monitoring-of-monitoring where one remote storage should receive the full metric stream and another should receive only VictoriaMetrics metrics. Enable per destination with `-remoteWrite.mdx.enable=true`. See [#10600](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10600).
|
||||
|
||||
* BUGFIX: [enterprise](https://docs.victoriametrics.com/enterprise/) [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly expose metric `vm_retention_filters_partitions_scheduled_rows`. See [#11138](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11138)
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix issue with producing aggregated samples with identical timestamps between flushes. See [#10808](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10808).
|
||||
@@ -58,6 +65,7 @@ Released at 2026-06-22
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmselect` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): properly escape `metricFamilyName` at metrics metadata response. See [#11129](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/11129). Thanks for @fxrlv for the contribution.
|
||||
* BUGFIX: [vmsingle](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/) and `vmstorage` in [VictoriaMetrics cluster](https://docs.victoriametrics.com/victoriametrics/cluster-victoriametrics/): prevent more cases of panic during directory deletion on `NFS`-based mounts. See [#11060](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/11060).
|
||||
|
||||
|
||||
## [v1.145.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.145.0)
|
||||
|
||||
Released at 2026-06-08
|
||||
|
||||
@@ -268,6 +268,39 @@ for the collected samples. Examples:
|
||||
```sh
|
||||
./vmagent -remoteWrite.url=http://remote-storage/api/v1/write -streamAggr.dropInputLabels=replica -streamAggr.dedupInterval=60s
|
||||
```
|
||||
|
||||
### Monitoring Data eXchange
|
||||
|
||||
The Monitoring Data eXchange (MDX){{% available_from "#" %}} feature allows `vmagent` to forward only VictoriaMetrics metrics to selected `-remoteWrite.url` destinations while dropping metrics from non-VictoriaMetrics services.
|
||||
|
||||
To enable MDX, set `-remoteWrite.mdx.enable=true` for the target URL and `-remoteWrite.mdx.enable=false` for other URLs:
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-all-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=false \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true
|
||||
```
|
||||
When MDX is enabled for a `-remoteWrite.url`, `vmagent` forwards only metrics that:
|
||||
- come from the target that exposes the `vm_app_version` metric (emitted by all VictoriaMetrics components)
|
||||
- contain the `victoriametrics_app=true` label, which will be added automatically to the metrics if the instance was deployed via [VictoriaMetrics Operator](https://docs.victoriametrics.com/operator/).
|
||||
|
||||
`victoriametrics_app=true` label will be added to all metrics that are preserved by MDX if it's absent.
|
||||
|
||||
- contain the label specified via `-mdx.label`.
|
||||
|
||||
```sh
|
||||
./vmagent \
|
||||
-remoteWrite.url=http://service-to-keep-only-vm-metrics:8428/api/v1/write \
|
||||
-remoteWrite.mdx.enable=true \
|
||||
-mdx.label="service=victoriametrics"
|
||||
```
|
||||
In this configuration, metrics with the label `service=victoriametrics` are preserved even if their scrape targets do not expose `vm_app_version` metric.
|
||||
|
||||
The number of VictoriaMetrics metrics preserved by MDX is exposed as `vmagent_remotewrite_mdx_rows_preserved_total`.
|
||||
|
||||
The scope of MDX is at the per-url level, so it works after global level mechanisms, such as stream aggregation, relabeling, complexity limiter, and cardinality limiter. See [Life of a sample](https://docs.victoriametrics.com/victoriametrics/vmagent/#life-of-a-sample).
|
||||
|
||||
### Life of a sample
|
||||
|
||||
@@ -285,18 +318,20 @@ flowchart TB
|
||||
F --> G[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#replication-and-high-availability">replicate</a> to each <b>-remoteWrite.url</b><br/>or <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#sharding-among-remote-storages">shard</a> if <b>-remoteWrite.shardByURL</b> is set]
|
||||
|
||||
%% Left branch
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H2 --> H3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H3 --> H4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H4 --> H5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> H1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange/">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
H1 --> H2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
H2 --> H3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
H3 --> H4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
H4 --> H5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
H5 --> H6[[push to <b>-remoteWrite.url</b>]]
|
||||
|
||||
%% Right branch
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R2 --> R3["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R3 --> R4[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R4 --> R5[[push to <b>-remoteWrite.url</b>]]
|
||||
G --> R1[per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#monitoring-data-exchange">mdx filter</a><br><b>-remoteWrite.mdx.enable</b>]
|
||||
R1 --> R2[per-url <a href="https://docs.victoriametrics.com/victoriametrics/relabeling/">relabeling</a><br><b>-remoteWrite.urlRelabelConfig</b>]
|
||||
R2 --> R3[per-url <a href="https://docs.victoriametrics.com/victoriametrics/stream-aggregation">aggregation</a><br><b>-remoteWrite.streamAggr.config</b><br><b>-remoteWrite.streamAggr.dedupInterval</b>]
|
||||
R3 --> R4["per-url <a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#calculating-disk-space-for-persistence-queue">queue</a> (default: enabled)<br><b>-remoteWrite.disableOnDiskQueue</b>"]
|
||||
R4 --> R5[<a href="https://docs.victoriametrics.com/victoriametrics/vmagent/#adding-labels-to-metrics">add extra labels</a><br><b>-remoteWrite.label</b>]
|
||||
R5 --> R6[[push to <b>-remoteWrite.url</b>]]
|
||||
```
|
||||
|
||||
Scraping has additional settings that can be applied before samples are pushed to the processing pipeline above:
|
||||
|
||||
@@ -270,7 +270,7 @@ users:
|
||||
url_prefix: "http://victoria-metrics:8428/"
|
||||
```
|
||||
|
||||
JWT tokens must contain a `"vm_access": {}` claim, more on that in [JWT claim-based request templating](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating)
|
||||
The `vm_access` claim is optional starting from {{% available_from "#" %}}: when present it is used for [request templating](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-based-request-templating), and when absent the default tenant `0:0` is assumed for any `vm_access`-based placeholders. Routing can rely solely on other token claims via [JWT claim matching](https://docs.victoriametrics.com/victoriametrics/vmauth/#jwt-claim-matching).
|
||||
|
||||
For testing, skip signature verification with `skip_verify: true` (not recommended for production).
|
||||
|
||||
@@ -520,7 +520,8 @@ for dynamic URL rewriting based on `vm_access` claim fields.
|
||||
|
||||
`vmauth` can dynamically rewrite{{% available_from "v1.137.0" %}} upstream URLs and request headers using values from the JWT `vm_access` claim.
|
||||
This enables routing different users to different backends or tenants based solely on the JWT token,
|
||||
without maintaining separate user configs per tenant.
|
||||
without maintaining separate user configs per tenant. In addition `vm_access` claim could be defined at `jwt` section with `default_vm_access_claim` {{% available_from "#" %}}.
|
||||
In this case, if JWT token doesn't have `vm_access` claim defined, value from `default_vm_access_claim` will be used for templaing.
|
||||
|
||||
Example: minimal valid JWT. If vm_access is empty, tenant `0:0` is assumed and no additional filters are applied.
|
||||
```json
|
||||
@@ -575,6 +576,28 @@ Placeholders are supported in the following locations:
|
||||
Placeholders are **not** supported in response headers.
|
||||
They are also only valid for JWT-authenticated users — using them in configs for `username`/`password` or `bearer_token` users causes a configuration error.
|
||||
|
||||
Example: default `vm_access` claim:
|
||||
|
||||
```yaml
|
||||
users:
|
||||
- jwt:
|
||||
default_vm_access_claim:
|
||||
metrics_account_id: 10
|
||||
metrics_project_id: 10
|
||||
metrics_extra_filters:
|
||||
- '{instance="sandbox"}'
|
||||
metrics_extra_labels:
|
||||
- team=dev
|
||||
- env=dev
|
||||
public_keys:
|
||||
- |
|
||||
-----BEGIN PUBLIC KEY-----
|
||||
MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA...
|
||||
-----END PUBLIC KEY-----
|
||||
url_prefix: "http://vminsert:8480/insert/{{.MetricsAccountID}}:{{.MetricsProjectID}}/prometheus/?extra_filters={{.MetricsExtraFilters}}&extra_label={{.MetricsExtraLabels}}"
|
||||
```
|
||||
|
||||
|
||||
Example: route requests to the VictoriaMetrics single-node:
|
||||
|
||||
```yaml
|
||||
|
||||
@@ -16,7 +16,7 @@ aliases:
|
||||
|
||||
`vmestimator` measures metrics cardinality across arbitrary label dimensions and exposes the results as metrics.
|
||||
|
||||
## Why measure ?
|
||||
## Why measure?
|
||||
|
||||
Consider a setup where metrics are scraped from dozens of Prometheus targets.
|
||||
One day, a team deploys a new version of their service with a `trace_id` or `user_id` label.
|
||||
@@ -75,7 +75,7 @@ The resulting topology looks like this:
|
||||
|
||||
## Install
|
||||
|
||||
Create a `streams.yaml` from [example config](https://github.com/VictoriaMetrics/cestimator/blob/main/streams.yaml).
|
||||
Create a `streams.yaml` from [example config](https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml).
|
||||
Run the Docker image from [Docker Hub](https://hub.docker.com/r/victoriametrics/vmestimator) or [Quay](https://quay.io/repository/victoriametrics/vmestimator), mounting your config file:
|
||||
```bash
|
||||
docker run --rm \
|
||||
@@ -92,7 +92,7 @@ To build from sources, see [How to build from sources](https://github.com/Victor
|
||||
|
||||
## Configuration
|
||||
|
||||
To run vmestimator a `streams.yaml` config has to be provided (see [example config](https://github.com/VictoriaMetrics/cestimator/blob/main/streams.yaml):
|
||||
To run vmestimator a `streams.yaml` config has to be provided (see [example config](https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml)):
|
||||
|
||||
```bash
|
||||
/path/to/vmestimator -config=streams.yaml # -httpListenAddr=:8490
|
||||
@@ -153,7 +153,7 @@ streams:
|
||||
# Whether to use the sparse HyperLogLog representation for low-cardinality groups.
|
||||
# Sparse mode uses far less memory until a group's cardinality reaches ~2^(p-1),
|
||||
# at which point it automatically promotes to the dense representation.
|
||||
# See more in # See more in https://research.google.com/pubs/archive/40671.pdf
|
||||
# See more in https://research.google.com/pubs/archive/40671.pdf
|
||||
#
|
||||
# default: true
|
||||
hll_sparse: 'boolean'
|
||||
@@ -419,17 +419,17 @@ The base docker image is [alpine](https://hub.docker.com/_/alpine) but it is pos
|
||||
For example, the following command builds the image on top of [scratch](https://hub.docker.com/_/scratch) image:
|
||||
|
||||
```sh
|
||||
ROOT_IMAGE=scratch make package-vmrestore
|
||||
ROOT_IMAGE=scratch make package-vmestimator
|
||||
```
|
||||
|
||||
You can build and publish to your own registry and namespace:
|
||||
```
|
||||
DOCKER_REGISTRIES=ghcr.io DOCKER_NAMESPACE=foo make publish-vmagent
|
||||
DOCKER_REGISTRIES=ghcr.io DOCKER_NAMESPACE=foo make publish-vmestimator
|
||||
```
|
||||
|
||||
## Command-line flags
|
||||
|
||||
Run `vmestimate -help` in order to see all the available options:
|
||||
Run `vmestimator -help` in order to see all the available options:
|
||||
|
||||
```
|
||||
Usage of ./bin/vmestimator:
|
||||
@@ -438,7 +438,7 @@ Usage of ./bin/vmestimator:
|
||||
-cardinalityMetrics.exposeAt string
|
||||
HTTP path for exposing cardinality metrics. If set to the default /metrics, cardinality metrics are merged with regular metrics and exposed together. If set to a different path, only cardinality metrics are exposed at that endpoint. If set to an empty value, cardinality metrics are not exposed via HTTP at all. (default "/metrics")
|
||||
-config string
|
||||
Path to YAML configuration file. Must be set unless -storageNode is specified. See https://github.com/VictoriaMetrics/cestimator/blob/main/streams.yaml for config example
|
||||
Path to YAML configuration file. Must be set unless -storageNode is specified. See https://github.com/VictoriaMetrics/vmestimator/blob/main/streams.yaml for config example
|
||||
-enableTCP6
|
||||
Whether to enable IPv6 for listening and dialing. By default, only IPv4 TCP and UDP are used
|
||||
-envflag.enable
|
||||
|
||||
@@ -58,10 +58,13 @@ var (
|
||||
|
||||
disableKeepAlive = flag.Bool("http.disableKeepAlive", false, "Whether to disable HTTP keep-alive for incoming connections at -httpListenAddr")
|
||||
disableResponseCompression = flag.Bool("http.disableResponseCompression", false, "Disable compression of HTTP responses to save CPU resources. By default, compression is enabled to save network bandwidth")
|
||||
maxGracefulShutdownDuration = flag.Duration("http.maxGracefulShutdownDuration", 7*time.Second, `The maximum duration for a graceful shutdown of the HTTP server. A highly loaded server may require increased value for a graceful shutdown`)
|
||||
shutdownDelay = flag.Duration("http.shutdownDelay", 0, `Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers`)
|
||||
idleConnTimeout = flag.Duration("http.idleConnTimeout", time.Minute, "Timeout for incoming idle http connections")
|
||||
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
|
||||
maxGracefulShutdownDuration = flag.Duration("http.maxGracefulShutdownDuration", 7*time.Second, "The maximum duration for a graceful shutdown of the HTTP server. "+
|
||||
"During this period the server stops accepting new connections, but it will continue serving existing connections. "+
|
||||
"The remaining in-flight requests are canceled before the deadline, so the shutdown can finish within this duration. "+
|
||||
"A highly loaded server may require increased value for a graceful shutdown")
|
||||
shutdownDelay = flag.Duration("http.shutdownDelay", 0, `Optional delay before http server shutdown. During this delay, the server returns non-OK responses from /health page, so load balancers can route new requests to other servers`)
|
||||
idleConnTimeout = flag.Duration("http.idleConnTimeout", time.Minute, "Timeout for incoming idle http connections")
|
||||
connTimeout = flag.Duration("http.connTimeout", 2*time.Minute, "Incoming connections to -httpListenAddr are closed after the configured timeout. "+
|
||||
"This may help evenly spreading load among a cluster of services behind TCP-level load balancer. Zero value disables closing of incoming connections")
|
||||
|
||||
headerHSTS = flag.String("http.header.hsts", "", "Value for 'Strict-Transport-Security' header, recommended: 'max-age=31536000; includeSubDomains'")
|
||||
@@ -80,6 +83,7 @@ var (
|
||||
type server struct {
|
||||
shutdownDelayDeadline atomic.Int64
|
||||
s *http.Server
|
||||
cancel context.CancelFunc
|
||||
}
|
||||
|
||||
// RequestHandler must serve the given request r and write response to w.
|
||||
@@ -156,7 +160,11 @@ func serve(addr string, rh RequestHandler, idx int, opts ServeOptions) {
|
||||
func serveWithListener(addr string, ln net.Listener, rh RequestHandler, disableBuiltinRoutes bool) {
|
||||
var s server
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
s.s = &http.Server{
|
||||
BaseContext: func(l net.Listener) context.Context {
|
||||
return ctx
|
||||
},
|
||||
|
||||
// Disable http/2, since it doesn't give any advantages for VictoriaMetrics services.
|
||||
TLSNextProto: make(map[string]func(*http.Server, *tls.Conn, http.Handler)),
|
||||
@@ -170,6 +178,7 @@ func serveWithListener(addr string, ln net.Listener, rh RequestHandler, disableB
|
||||
ErrorLog: log.New(&tlsErrorSkipLogger{}, "", 0),
|
||||
}
|
||||
s.s.SetKeepAlivesEnabled(!*disableKeepAlive)
|
||||
s.cancel = cancel
|
||||
if *connTimeout > 0 {
|
||||
s.s.ConnContext = func(ctx context.Context, _ net.Conn) context.Context {
|
||||
timeoutSec := connTimeout.Seconds()
|
||||
@@ -265,8 +274,18 @@ func stop(addr string) error {
|
||||
logger.Infof("Starting shutdown for http server %q", addr)
|
||||
}
|
||||
|
||||
// Cancel in-flight requests shortly before the deadline, reserving up to 2s (or 20%
|
||||
// of the window, whichever is smaller) for them to unwind, so Shutdown returns cleanly
|
||||
// within -http.maxGracefulShutdownDuration instead of timing out and dying via
|
||||
// logger.Fatalf -> os.Exit, which skips the storage flush and loses data.
|
||||
// See https://github.com/VictoriaMetrics/VictoriaLogs/issues/1502
|
||||
cancelInflightAfter := *maxGracefulShutdownDuration - min(*maxGracefulShutdownDuration/5, 2*time.Second)
|
||||
ctx, cancel := context.WithTimeout(context.Background(), *maxGracefulShutdownDuration)
|
||||
defer cancel()
|
||||
|
||||
t := time.AfterFunc(cancelInflightAfter, s.cancel)
|
||||
defer t.Stop()
|
||||
|
||||
if err := s.s.Shutdown(ctx); err != nil {
|
||||
return fmt.Errorf("cannot gracefully shutdown http server at %q in %.3fs; "+
|
||||
"probably, `-http.maxGracefulShutdownDuration` command-line flag value must be increased; error: %s", addr, maxGracefulShutdownDuration.Seconds(), err)
|
||||
|
||||
@@ -105,6 +105,10 @@ type body struct {
|
||||
Scope string `json:"scope,omitempty"`
|
||||
vmAccessClaim VMAccessClaim
|
||||
|
||||
// hasVMAccess is set to true when the token body contains a `vm_access` claim.
|
||||
// Presence enforcement is left to the caller via Token.HasVMAccess.
|
||||
hasVMAccess bool
|
||||
|
||||
buf []byte
|
||||
p *fastjson.Parser
|
||||
|
||||
@@ -121,7 +125,6 @@ type body struct {
|
||||
}
|
||||
|
||||
func (b *body) parse(src string) error {
|
||||
|
||||
var err error
|
||||
b.buf, err = decodeB64(b.buf[:0], src)
|
||||
if err != nil {
|
||||
@@ -132,6 +135,9 @@ func (b *body) parse(src string) error {
|
||||
if err != nil {
|
||||
return err
|
||||
}
|
||||
if jv.Type() != fastjson.TypeObject {
|
||||
return fmt.Errorf("unexpected non json object; type: %q", jv.Type())
|
||||
}
|
||||
if expObject := jv.Get("exp"); expObject != nil {
|
||||
b.Exp, err = expObject.Int64()
|
||||
if err != nil {
|
||||
@@ -153,30 +159,31 @@ func (b *body) parse(src string) error {
|
||||
}
|
||||
|
||||
vaObject := jv.Get("vm_access")
|
||||
if vaObject == nil {
|
||||
return ErrVMAccessFieldMissing
|
||||
}
|
||||
// some IDPs encode custom claims as a string
|
||||
// try parsing as an object and fallback to a string
|
||||
switch vaObject.Type() {
|
||||
case fastjson.TypeObject:
|
||||
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
|
||||
return err
|
||||
}
|
||||
case fastjson.TypeString:
|
||||
b.claimsParser = parserPool.Get()
|
||||
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
|
||||
}
|
||||
if err := b.vmAccessClaim.parseFrom(va); err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
|
||||
}
|
||||
b.vmAccessClaimObject = va
|
||||
case fastjson.TypeNull:
|
||||
return ErrVMAccessFieldMissing
|
||||
switch {
|
||||
case vaObject == nil || vaObject.Type() == fastjson.TypeNull:
|
||||
b.hasVMAccess = false
|
||||
default:
|
||||
return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
|
||||
// some IDPs encode custom claims as a string
|
||||
// try parsing as an object and fallback to a string
|
||||
switch vaObject.Type() {
|
||||
case fastjson.TypeObject:
|
||||
if err := b.vmAccessClaim.parseFrom(vaObject); err != nil {
|
||||
return err
|
||||
}
|
||||
case fastjson.TypeString:
|
||||
b.claimsParser = parserPool.Get()
|
||||
va, err := b.claimsParser.ParseBytes(vaObject.GetStringBytes())
|
||||
if err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` string json: %w", err)
|
||||
}
|
||||
if err := b.vmAccessClaim.parseFrom(va); err != nil {
|
||||
return fmt.Errorf("cannot parse `vm_access` values from string json: %w", err)
|
||||
}
|
||||
b.vmAccessClaimObject = va
|
||||
default:
|
||||
return fmt.Errorf("unexpected type for `vm_access` field; got: %q, want object {}", vaObject.Type())
|
||||
}
|
||||
b.hasVMAccess = true
|
||||
}
|
||||
b.Jti = bytesutil.ToUnsafeString(jv.GetStringBytes("jti"))
|
||||
|
||||
@@ -218,6 +225,7 @@ func (b *body) reset() {
|
||||
b.buf = b.buf[:0]
|
||||
b.allClaims = nil
|
||||
b.vmAccessClaim.reset()
|
||||
b.hasVMAccess = false
|
||||
if b.p != nil {
|
||||
parserPool.Put(b.p)
|
||||
b.p = nil
|
||||
@@ -229,11 +237,9 @@ func (b *body) reset() {
|
||||
if b.vmAccessClaimObject != nil {
|
||||
b.vmAccessClaimObject = nil
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
// Parse parses JWT token from given source string
|
||||
//
|
||||
// Token field is valid until src is reachable
|
||||
func (t *Token) Parse(src string, enforceAuthPrefix bool) error {
|
||||
if enforceAuthPrefix && (len(src) < len(prefix) || !strings.EqualFold(src[:len(prefix)], prefix)) {
|
||||
@@ -268,6 +274,11 @@ func (t *Token) Parse(src string, enforceAuthPrefix bool) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// HasVMAccessClaim reports whether the parsed token contains a `vm_access` claim.
|
||||
func (t *Token) HasVMAccessClaim() bool {
|
||||
return t.body.hasVMAccess
|
||||
}
|
||||
|
||||
// Issuer returns `iss` claim value from token body
|
||||
func (t *Token) Issuer() string {
|
||||
return t.body.Iss
|
||||
@@ -371,30 +382,30 @@ func (t *Token) Reset() {
|
||||
|
||||
// VMAccessClaim represent JWT claim object
|
||||
type VMAccessClaim struct {
|
||||
MetricsExtraFilters []string `json:"metrics_extra_filters,omitempty"`
|
||||
MetricsExtraLabels []string `json:"metrics_extra_labels,omitempty"`
|
||||
LogsExtraFilters []string `json:"logs_extra_filters,omitempty"`
|
||||
LogsExtraStreamFilters []string `json:"logs_extra_stream_filters,omitempty"`
|
||||
MetricsExtraFilters []string `json:"metrics_extra_filters,omitempty" yaml:"metrics_extra_filters,omitempty"`
|
||||
MetricsExtraLabels []string `json:"metrics_extra_labels,omitempty" yaml:"metrics_extra_labels,omitempty"`
|
||||
LogsExtraFilters []string `json:"logs_extra_filters,omitempty" yaml:"logs_extra_filters,omitempty"`
|
||||
LogsExtraStreamFilters []string `json:"logs_extra_stream_filters,omitempty" yaml:"logs_extra_stream_filters,omitempty"`
|
||||
|
||||
MetricsAccountID uint32 `json:"metrics_account_id,omitempty"`
|
||||
MetricsProjectID uint32 `json:"metrics_project_id,omitempty"`
|
||||
MetricsAccountID uint32 `json:"metrics_account_id,omitempty" yaml:"metrics_account_id,omitempty"`
|
||||
MetricsProjectID uint32 `json:"metrics_project_id,omitempty" yaml:"metrics_project_id,omitempty"`
|
||||
|
||||
LogsAccountID uint32 `json:"logs_account_id,omitempty"`
|
||||
LogsProjectID uint32 `json:"logs_project_id,omitempty"`
|
||||
LogsAccountID uint32 `json:"logs_account_id,omitempty" yaml:"logs_account_id,omitempty"`
|
||||
LogsProjectID uint32 `json:"logs_project_id,omitempty" yaml:"logs_project_id,omitempty"`
|
||||
|
||||
// Properties below are deprecated and retained only for compatibility with vmgateway, which is itself deprecated.
|
||||
|
||||
// promql filters applied to each select query
|
||||
// Deprecated
|
||||
ExtraFilters []string `json:"extra_filters,omitempty"`
|
||||
ExtraFilters []string `json:"extra_filters,omitempty" yaml:"-"`
|
||||
// Deprecated
|
||||
Tenant TenantID `json:"tenant_id"`
|
||||
Tenant TenantID `json:"tenant_id" yaml:"-"`
|
||||
// role can be denied as 1 = read, 2 = write, 3 = read and write
|
||||
// 0 = unconfigured - read and write
|
||||
// Deprecated
|
||||
Mode int `json:"mode,omitempty"`
|
||||
Mode int `json:"mode,omitempty" yaml:"-"`
|
||||
// Deprecated
|
||||
Labels []string `json:"extra_labels,omitempty"`
|
||||
Labels []string `json:"extra_labels,omitempty" yaml:"-"`
|
||||
// labelsBuf holds allocated memory for Labels
|
||||
// Deprecated
|
||||
labelsBuf []byte
|
||||
@@ -425,7 +436,6 @@ func (vac *VMAccessClaim) reset() {
|
||||
}
|
||||
|
||||
func (vac *VMAccessClaim) parseFrom(jv *fastjson.Value) error {
|
||||
|
||||
if err := vac.Tenant.parseFrom(jv); err != nil {
|
||||
return err
|
||||
}
|
||||
@@ -569,6 +579,9 @@ func NewToken(auth string, enforceAuthPrefix bool) (*Token, error) {
|
||||
if err := t.parse(jwt[0], jwt[1], jwt[2]); err != nil {
|
||||
return nil, err
|
||||
}
|
||||
if !t.body.hasVMAccess {
|
||||
return nil, ErrVMAccessFieldMissing
|
||||
}
|
||||
return &t, nil
|
||||
}
|
||||
|
||||
|
||||
@@ -168,17 +168,10 @@ func TestParseJWTBody_Failure(t *testing.T) {
|
||||
true,
|
||||
)
|
||||
|
||||
// invalid body type json
|
||||
// non-object body type
|
||||
f(
|
||||
`[]`,
|
||||
"missing `vm_access` claim",
|
||||
true,
|
||||
)
|
||||
|
||||
// missing vm_access claim
|
||||
f(
|
||||
`{}`,
|
||||
"missing `vm_access` claim",
|
||||
`unexpected non json object; type: "array"`,
|
||||
true,
|
||||
)
|
||||
|
||||
@@ -189,13 +182,6 @@ func TestParseJWTBody_Failure(t *testing.T) {
|
||||
true,
|
||||
)
|
||||
|
||||
// vm_access claim null
|
||||
f(
|
||||
`{"vm_access": null}`,
|
||||
"missing `vm_access` claim",
|
||||
true,
|
||||
)
|
||||
|
||||
// invalid vm_access: account_id type mismatch
|
||||
f(
|
||||
`{"vm_access": {"tenant_id": {"account_id": "1", "project_id": 5}}}`,
|
||||
@@ -555,6 +541,33 @@ func TestParseJWTBody_Success(t *testing.T) {
|
||||
)
|
||||
}
|
||||
|
||||
func TestParseJWTBody_VMAccessPresence(t *testing.T) {
|
||||
f := func(data string, wantHasVMAccess bool) {
|
||||
t.Helper()
|
||||
|
||||
encodedLen := base64.RawURLEncoding.EncodedLen(len(data))
|
||||
encoded := make([]byte, encodedLen)
|
||||
base64.RawURLEncoding.Encode(encoded, []byte(data))
|
||||
|
||||
var b body
|
||||
if err := b.parse(string(encoded)); err != nil {
|
||||
t.Fatalf("unexpected error: %s", err)
|
||||
}
|
||||
if b.hasVMAccess != wantHasVMAccess {
|
||||
t.Fatalf("unexpected hasVMAccess; got %v; want %v", b.hasVMAccess, wantHasVMAccess)
|
||||
}
|
||||
}
|
||||
|
||||
// vm_access claim is present
|
||||
f(`{"vm_access": {}}`, true)
|
||||
f(`{"vm_access": {"metrics_account_id": 1}}`, true)
|
||||
|
||||
// vm_access claim is absent or null - parsing must succeed with hasVMAccess=false
|
||||
f(`{}`, false)
|
||||
f(`{"vm_access": null}`, false)
|
||||
f(`{"role": "admin"}`, false)
|
||||
}
|
||||
|
||||
func TestNewTokenFromRequest_Failure(t *testing.T) {
|
||||
f := func(r *http.Request) {
|
||||
t.Helper()
|
||||
@@ -866,7 +879,6 @@ func TestNewTokenFromRequest_Success(t *testing.T) {
|
||||
}
|
||||
|
||||
func TestTokenMatchClaims(t *testing.T) {
|
||||
|
||||
/*
|
||||
{
|
||||
"iss": "https://login.microsoftonline.com/-6691-4868-a77b-1b0f9bbe5f43/v2.0",
|
||||
|
||||
311
lib/mdx/filter.go
Normal file
311
lib/mdx/filter.go
Normal file
@@ -0,0 +1,311 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"flag"
|
||||
"strconv"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
var (
|
||||
vmLabel = flag.String("mdx.label", "", "Optional label value in the form 'name=value' used to identify VictoriaMetrics metrics for MDX. "+
|
||||
"Metrics containing the specified label are forwarded to `-remoteWrite.url` endpoints configured with `-remoteWrite.mdx.enable=true`.")
|
||||
)
|
||||
|
||||
const (
|
||||
vmAppLabelName = "victoriametrics_app"
|
||||
vmAppLabelValue = "true"
|
||||
vmAppVersionMetricName = "vm_app_version"
|
||||
)
|
||||
|
||||
// Ctx defines filtering context
|
||||
type Ctx struct {
|
||||
// labels hold modified timeseries labels
|
||||
// valid until PutContext call
|
||||
labels []prompb.Label
|
||||
|
||||
buf []byte
|
||||
hasVMAppLabel bool
|
||||
hasVMAppVersionLabel bool
|
||||
hasFilterLabelValue bool
|
||||
jobLabelValue string
|
||||
instanceLabelValue string
|
||||
}
|
||||
|
||||
func (ctx *Ctx) reset() {
|
||||
// do not reset labels intentionally
|
||||
// it must live until PutContext call
|
||||
|
||||
ctx.buf = ctx.buf[:0]
|
||||
ctx.hasVMAppLabel = false
|
||||
ctx.hasVMAppVersionLabel = false
|
||||
ctx.hasFilterLabelValue = false
|
||||
ctx.jobLabelValue = ""
|
||||
ctx.instanceLabelValue = ""
|
||||
}
|
||||
|
||||
var ctxPool = &sync.Pool{
|
||||
New: func() any {
|
||||
return &Ctx{}
|
||||
},
|
||||
}
|
||||
|
||||
// GetContext returns filtering context
|
||||
func GetContext() *Ctx {
|
||||
return ctxPool.Get().(*Ctx)
|
||||
}
|
||||
|
||||
// PutContext resets context
|
||||
func PutContext(ctx *Ctx) {
|
||||
clear(ctx.labels)
|
||||
ctx.labels = ctx.labels[:0]
|
||||
ctx.reset()
|
||||
ctxPool.Put(ctx)
|
||||
}
|
||||
|
||||
// Filter manages the list of VictoriaMetrics instances grouped by job:instance labels.
|
||||
// job and instance must present at timeseries.
|
||||
//
|
||||
// Filter keeps timeseries with any of the following conditions:
|
||||
// * vm_app_version present
|
||||
// * victoriametrics_app=true label present at timeseries
|
||||
// * if labels has label value defined with flag `-mdx.label`
|
||||
//
|
||||
// Filter track entries with TTL of 1 hour
|
||||
type Filter struct {
|
||||
tracker *instanceTracker
|
||||
filterByLabelName string
|
||||
label string
|
||||
}
|
||||
|
||||
// NewFilter returns new Filter instance
|
||||
func NewFilter() *Filter {
|
||||
filter := &Filter{
|
||||
tracker: newInstanceTracker(),
|
||||
}
|
||||
if len(*vmLabel) > 0 {
|
||||
n := strings.IndexByte(*vmLabel, '=')
|
||||
if n < 0 {
|
||||
logger.Fatalf("missing '=' in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
|
||||
}
|
||||
filter.filterByLabelName = (*vmLabel)[:n]
|
||||
filter.label = (*vmLabel)[n+1:]
|
||||
if len(filter.filterByLabelName) == 0 || len(filter.label) == 0 {
|
||||
logger.Fatalf("label name and value cannot be empty in `-mdx.label`. It must contain label in the form `name=value`; got %q", *vmLabel)
|
||||
}
|
||||
}
|
||||
|
||||
return filter
|
||||
}
|
||||
|
||||
// VMInstancesCount returns amount of currently tracked instances
|
||||
func (filter *Filter) VMInstancesCount() int {
|
||||
return filter.tracker.len()
|
||||
}
|
||||
|
||||
// MustStop stops filter instance
|
||||
func (filter *Filter) MustStop() {
|
||||
filter.tracker.mustStop()
|
||||
}
|
||||
|
||||
// Filter filters provided timeseries with given context.
|
||||
//
|
||||
// Returned timeseries is valid as long as Ctx is valid
|
||||
func (filter *Filter) Filter(ctx *Ctx, tss []prompb.TimeSeries) []prompb.TimeSeries {
|
||||
dstTss := tss[:0]
|
||||
for _, ts := range tss {
|
||||
ctx.prepare(ts.Labels, filter.filterByLabelName, filter.label)
|
||||
key := ctx.formatTimeSeriesKey()
|
||||
if len(key) == 0 {
|
||||
// metrics with empty job or instance labels must be always dropped
|
||||
// despite any other conditions
|
||||
continue
|
||||
}
|
||||
if ctx.hasVMAppLabel {
|
||||
filter.trackInstance(key)
|
||||
dstTss = append(dstTss, ts)
|
||||
continue
|
||||
}
|
||||
if ctx.hasFilterLabelValue || ctx.hasVMAppVersionLabel {
|
||||
ts.Labels = ctx.addVMAppLabel(ts.Labels)
|
||||
filter.trackInstance(key)
|
||||
dstTss = append(dstTss, ts)
|
||||
continue
|
||||
}
|
||||
ok := filter.tracker.has(key)
|
||||
if ok {
|
||||
ts.Labels = ctx.addVMAppLabel(ts.Labels)
|
||||
dstTss = append(dstTss, ts)
|
||||
}
|
||||
}
|
||||
return dstTss
|
||||
}
|
||||
|
||||
func (filter *Filter) trackInstance(key string) {
|
||||
if filter.tracker.has(key) {
|
||||
return
|
||||
}
|
||||
key = strings.Clone(key)
|
||||
filter.tracker.register(key)
|
||||
}
|
||||
|
||||
func (ctx *Ctx) prepare(labels []prompb.Label, filterByLabelName, label string) {
|
||||
ctx.reset()
|
||||
|
||||
// always use the last label=value pair
|
||||
// because in case of possible label duplicates,
|
||||
// the last added label must win
|
||||
for _, l := range labels {
|
||||
switch l.Name {
|
||||
case "job":
|
||||
ctx.jobLabelValue = l.Value
|
||||
case "instance":
|
||||
ctx.instanceLabelValue = l.Value
|
||||
case vmAppLabelName:
|
||||
if l.Value == vmAppLabelValue {
|
||||
ctx.hasVMAppLabel = true
|
||||
}
|
||||
case "__name__":
|
||||
if l.Value == vmAppVersionMetricName {
|
||||
ctx.hasVMAppVersionLabel = true
|
||||
}
|
||||
}
|
||||
if len(filterByLabelName) > 0 {
|
||||
if l.Name == filterByLabelName && l.Value == label {
|
||||
ctx.hasFilterLabelValue = true
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// formatTimeSeriesKey returns timeseries key after ctx.prepare call
|
||||
// if it catched job and instances labels
|
||||
//
|
||||
// returned string is valid until next ctx.prepare
|
||||
func (ctx *Ctx) formatTimeSeriesKey() string {
|
||||
if len(ctx.jobLabelValue) == 0 || len(ctx.instanceLabelValue) == 0 {
|
||||
return ""
|
||||
}
|
||||
buf := ctx.buf[:0]
|
||||
buf = strconv.AppendQuote(buf, ctx.jobLabelValue)
|
||||
buf = append(buf, ':')
|
||||
buf = strconv.AppendQuote(buf, ctx.instanceLabelValue)
|
||||
ctx.buf = buf
|
||||
return bytesutil.ToUnsafeString(buf)
|
||||
}
|
||||
|
||||
func (ctx *Ctx) addVMAppLabel(labels []prompb.Label) []prompb.Label {
|
||||
// unconditionally add vmAppLabelValue at the end of labels list
|
||||
// it will overwrite any exist vmAppLabelName labels with a value different to vmAppLabelValue
|
||||
// it's guaranteed by VictoriaMetrics ingestion contract
|
||||
poolLabels := ctx.labels
|
||||
poolLabelsLen := len(poolLabels)
|
||||
poolLabels = append(poolLabels, labels...)
|
||||
poolLabels = append(poolLabels, prompb.Label{Name: vmAppLabelName, Value: vmAppLabelValue})
|
||||
ctx.labels = poolLabels
|
||||
return poolLabels[poolLabelsLen:len(poolLabels):len(poolLabels)]
|
||||
}
|
||||
|
||||
type instanceTracker struct {
|
||||
mu sync.RWMutex
|
||||
lastAccessByKey map[string]*atomic.Uint64
|
||||
wg sync.WaitGroup
|
||||
stop chan struct{}
|
||||
}
|
||||
|
||||
func newInstanceTracker() *instanceTracker {
|
||||
c := &instanceTracker{
|
||||
lastAccessByKey: make(map[string]*atomic.Uint64),
|
||||
stop: make(chan struct{}),
|
||||
}
|
||||
c.wg.Add(1)
|
||||
go c.startStaleWatcher()
|
||||
return c
|
||||
}
|
||||
|
||||
func (it *instanceTracker) len() int {
|
||||
it.mu.RLock()
|
||||
s := len(it.lastAccessByKey)
|
||||
it.mu.RUnlock()
|
||||
return s
|
||||
}
|
||||
|
||||
func (it *instanceTracker) has(key string) bool {
|
||||
it.mu.RLock()
|
||||
lat, ok := it.lastAccessByKey[key]
|
||||
it.mu.RUnlock()
|
||||
if ok {
|
||||
lat.Store(fasttime.UnixTimestamp())
|
||||
}
|
||||
return ok
|
||||
}
|
||||
|
||||
func (it *instanceTracker) register(key string) {
|
||||
it.mu.Lock()
|
||||
// key could be registered by concurrent goroutine
|
||||
lat, ok := it.lastAccessByKey[key]
|
||||
if !ok {
|
||||
lat = &atomic.Uint64{}
|
||||
it.lastAccessByKey[key] = lat
|
||||
}
|
||||
it.mu.Unlock()
|
||||
lat.Store(fasttime.UnixTimestamp())
|
||||
}
|
||||
|
||||
func (it *instanceTracker) startStaleWatcher() {
|
||||
defer it.wg.Done()
|
||||
|
||||
t := time.NewTicker(time.Minute)
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
case <-it.stop:
|
||||
return
|
||||
case <-t.C:
|
||||
it.cleanStale()
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
var entryTTLSeconds = uint64(time.Hour.Seconds())
|
||||
|
||||
func (it *instanceTracker) cleanStale() {
|
||||
ct := fasttime.UnixTimestamp()
|
||||
var toDelete map[string]*atomic.Uint64
|
||||
|
||||
it.mu.RLock()
|
||||
for key, lastAccessTime := range it.lastAccessByKey {
|
||||
accessedAt := lastAccessTime.Load()
|
||||
if ct > accessedAt+entryTTLSeconds {
|
||||
if toDelete == nil {
|
||||
toDelete = make(map[string]*atomic.Uint64)
|
||||
}
|
||||
toDelete[key] = lastAccessTime
|
||||
}
|
||||
}
|
||||
it.mu.RUnlock()
|
||||
|
||||
if len(toDelete) > 0 {
|
||||
it.mu.Lock()
|
||||
for key, lastAccessTime := range toDelete {
|
||||
accessedAt := lastAccessTime.Load()
|
||||
// concurrent goroutine may refresh lastAccessTime
|
||||
if ct > accessedAt+entryTTLSeconds {
|
||||
delete(it.lastAccessByKey, key)
|
||||
}
|
||||
}
|
||||
it.mu.Unlock()
|
||||
}
|
||||
}
|
||||
|
||||
func (it *instanceTracker) mustStop() {
|
||||
close(it.stop)
|
||||
it.wg.Wait()
|
||||
}
|
||||
104
lib/mdx/filter_synctest_test.go
Normal file
104
lib/mdx/filter_synctest_test.go
Normal file
@@ -0,0 +1,104 @@
|
||||
//go:build synctest
|
||||
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"testing"
|
||||
"testing/synctest"
|
||||
"time"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
func TestMdxInstanceCleanup(t *testing.T) {
|
||||
|
||||
synctest.Test(t, func(t *testing.T) {
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
assertFilterLen := func(expectedLen int) {
|
||||
t.Helper()
|
||||
if filter.VMInstancesCount() != expectedLen {
|
||||
t.Fatalf("unexpected instance map length; got %d; want %d", filter.VMInstancesCount(), expectedLen)
|
||||
}
|
||||
}
|
||||
|
||||
ctx := GetContext()
|
||||
filter.Filter(ctx, []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "scrape_targets_up"},
|
||||
{Name: "instance", Value: "vmagent1:8429"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
PutContext(ctx)
|
||||
|
||||
time.Sleep(1 * time.Minute)
|
||||
// the entries should not be cleaned.
|
||||
assertFilterLen(2)
|
||||
|
||||
time.Sleep(58 * time.Minute)
|
||||
// receive samples from victoria-metrics1:8428 after 59 minutes.
|
||||
// so the entry will be refreshed.
|
||||
ctx = GetContext()
|
||||
filter.Filter(ctx, []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_up"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
PutContext(ctx)
|
||||
assertFilterLen(2)
|
||||
|
||||
// entry for job:instance - test:vmagent1:8429 must be removed
|
||||
time.Sleep(4 * time.Minute)
|
||||
assertFilterLen(1)
|
||||
|
||||
// no samples from vmagent1:8429 in the last hour, so it should be removed from the mdx instance list.
|
||||
time.Sleep(2 * time.Hour)
|
||||
|
||||
assertFilterLen(0)
|
||||
})
|
||||
|
||||
}
|
||||
435
lib/mdx/filter_test.go
Normal file
435
lib/mdx/filter_test.go
Normal file
@@ -0,0 +1,435 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
func TestMdxInstanceFilter(t *testing.T) {
|
||||
originalVmLabel := *vmLabel
|
||||
*vmLabel = "service=victoriametrics"
|
||||
t.Cleanup(func() {
|
||||
*vmLabel = originalVmLabel
|
||||
})
|
||||
f := func(input []prompb.TimeSeries, expectedOutput []prompb.TimeSeries) {
|
||||
t.Helper()
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
|
||||
ctx := GetContext()
|
||||
defer PutContext(ctx)
|
||||
inputCopy := append([]prompb.TimeSeries{}, input...)
|
||||
output := filter.Filter(ctx, inputCopy)
|
||||
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
|
||||
t.Fatalf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
// make sure that result is the same over multiple calls
|
||||
inputCopy = append([]prompb.TimeSeries{}, input...)
|
||||
output = filter.Filter(ctx, inputCopy)
|
||||
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
|
||||
t.Fatalf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
|
||||
}
|
||||
// metrics with vm_app_version and different order of labels.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics3:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics2:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics3:8428"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}},
|
||||
)
|
||||
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
}},
|
||||
[]prompb.TimeSeries{
|
||||
// 2.
|
||||
// metrics without vm_app_version but with service=victoriametrics that is specified in `-mdx.label`.
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}})
|
||||
|
||||
// metrics with vm_app_version and service=victoriametrics should be preserved.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
},
|
||||
)
|
||||
// metrics without vm_app_version and `service=victoriametrics` but with `victoriametrics_app=true`, which should be preserved.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics6:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics6:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "vm_slow_queries_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// metrics without vm_app_version and service=victoriametrics and `victoriametrics_app=true`, which should be filtered out.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "go_gc_duration_seconds"},
|
||||
{Name: "instance", Value: "node-exporter1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_request_duration_seconds"},
|
||||
{Name: "instance", Value: "service1"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{},
|
||||
)
|
||||
|
||||
// metrics with vm_app_version but job or instance is empty (or missing), they should be dropped.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: ""},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent2:8429"},
|
||||
{Name: "job", Value: ""},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "vmagent2:8429"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{})
|
||||
|
||||
// metrics without vm_app_version, but the instances were already registered with first timeseries
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_rows_inserted_total"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_rows_inserted_total"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// metrics without vm_app_version, `service=victoriametrics` and `victoriametrics_app=true`, and the instance wasn't already registered in the previous call, so it will be dropped.
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vminsert_request_duration_seconds_bucket"},
|
||||
{Name: "instance", Value: "victoria-metrics7:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}},
|
||||
)
|
||||
|
||||
// metrics with duplicate victoriametrics_app label
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "other_value"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "other_value"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
})
|
||||
|
||||
// metrics with duplicate job and instance labels
|
||||
// last value wins
|
||||
f([]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
},
|
||||
},
|
||||
},
|
||||
[]prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics4:8428"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
{Name: "job", Value: "test2"},
|
||||
{Name: "instance", Value: "victoria-metrics5:8428"},
|
||||
{Name: "service", Value: "victoriametrics"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
}})
|
||||
|
||||
}
|
||||
|
||||
func TestMdxInstanceFilterConcurrent(t *testing.T) {
|
||||
originalVmLabel := *vmLabel
|
||||
*vmLabel = "service=victoriametrics"
|
||||
t.Cleanup(func() { *vmLabel = originalVmLabel })
|
||||
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
|
||||
const concurrency = 8
|
||||
const iterations = 200
|
||||
|
||||
generateSeries := func(g int) []prompb.TimeSeries {
|
||||
return []prompb.TimeSeries{
|
||||
{Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: fmt.Sprintf("vm-%d:8428", g)},
|
||||
}},
|
||||
// shared job:instance
|
||||
{Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "instance", Value: "vmagent:8428"},
|
||||
}},
|
||||
}
|
||||
}
|
||||
|
||||
var wg sync.WaitGroup
|
||||
for worker := range concurrency {
|
||||
wg.Go(func() {
|
||||
input := generateSeries(worker)
|
||||
var expectedOutput []prompb.TimeSeries
|
||||
for _, inputTs := range input {
|
||||
labels := append([]prompb.Label{}, inputTs.Labels...)
|
||||
labels = append(labels, prompb.Label{Name: vmAppLabelName, Value: vmAppLabelValue})
|
||||
expectedOutput = append(expectedOutput, prompb.TimeSeries{Labels: labels})
|
||||
}
|
||||
for range iterations {
|
||||
ctx := GetContext()
|
||||
inputCopy := append([]prompb.TimeSeries{}, input...)
|
||||
output := filter.Filter(ctx, inputCopy)
|
||||
if diff := cmp.Diff(expectedOutput, output); len(diff) > 0 {
|
||||
t.Errorf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
PutContext(ctx)
|
||||
|
||||
}
|
||||
})
|
||||
}
|
||||
wg.Wait()
|
||||
|
||||
// goroutines + 1 shared
|
||||
if got := filter.VMInstancesCount(); got != concurrency+1 {
|
||||
t.Errorf("unexpected instance count: got %d, want %d", got, concurrency+1)
|
||||
}
|
||||
}
|
||||
85
lib/mdx/filter_timing_test.go
Normal file
85
lib/mdx/filter_timing_test.go
Normal file
@@ -0,0 +1,85 @@
|
||||
package mdx
|
||||
|
||||
import (
|
||||
"testing"
|
||||
|
||||
"github.com/google/go-cmp/cmp"
|
||||
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
)
|
||||
|
||||
func BenchmarkFilter(b *testing.B) {
|
||||
f := func(name string, input, want []prompb.TimeSeries) {
|
||||
b.Helper()
|
||||
|
||||
b.Run(name, func(b *testing.B) {
|
||||
filter := NewFilter()
|
||||
defer filter.MustStop()
|
||||
b.ReportAllocs()
|
||||
b.ResetTimer()
|
||||
b.RunParallel(func(pb *testing.PB) {
|
||||
for pb.Next() {
|
||||
ctx := GetContext()
|
||||
localInput := append([]prompb.TimeSeries{}, input...)
|
||||
tss := filter.Filter(ctx, localInput)
|
||||
if len(tss) != len(want) {
|
||||
diff := cmp.Diff(want, tss)
|
||||
b.Fatalf("unexpected result (-want, +got):\n%s", diff)
|
||||
}
|
||||
PutContext(ctx)
|
||||
}
|
||||
})
|
||||
})
|
||||
}
|
||||
|
||||
input := []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_errors_total"},
|
||||
},
|
||||
},
|
||||
}
|
||||
expected := []prompb.TimeSeries{
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "__name__", Value: "vm_app_version"},
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
{
|
||||
Labels: []prompb.Label{
|
||||
{Name: "instance", Value: "victoria-metrics1:8428"},
|
||||
{Name: "job", Value: "test"},
|
||||
{Name: "__name__", Value: "http_requests_errors_total"},
|
||||
{Name: "victoriametrics_app", Value: "true"},
|
||||
},
|
||||
},
|
||||
}
|
||||
f("match vm_app_version", input, expected)
|
||||
}
|
||||
@@ -112,11 +112,19 @@ func (m *TimeSeries) size() (n int) {
|
||||
}
|
||||
for _, e := range m.Labels {
|
||||
l := e.size()
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
for _, e := range m.Samples {
|
||||
l := e.size()
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
@@ -126,10 +134,18 @@ func (m *Label) size() (n int) {
|
||||
return 0
|
||||
}
|
||||
if l := len(m.Name); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if l := len(m.Value); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
@@ -160,6 +176,11 @@ func (m *WriteRequest) marshalToSizedBuffer(dst []byte) (int, error) {
|
||||
}
|
||||
|
||||
func encodeVarint(dst []byte, offset int, v uint64) int {
|
||||
if v < 1<<7 {
|
||||
offset--
|
||||
dst[offset] = byte(v)
|
||||
return offset
|
||||
}
|
||||
offset -= sov(v)
|
||||
base := offset
|
||||
for v >= 1<<7 {
|
||||
@@ -180,7 +201,11 @@ func (m *WriteRequest) size() (n int) {
|
||||
}
|
||||
for _, e := range m.Metadata {
|
||||
l := e.size()
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
return n
|
||||
}
|
||||
@@ -238,13 +263,25 @@ func (m *MetricMetadata) size() (n int) {
|
||||
n += 1 + sov(uint64(m.Type))
|
||||
}
|
||||
if l := len(m.MetricFamilyName); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if l := len(m.Help); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if l := len(m.Unit); l > 0 {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
if l < 128 {
|
||||
n += 2 + l
|
||||
} else {
|
||||
n += 1 + l + sov(uint64(l))
|
||||
}
|
||||
}
|
||||
if m.AccountID != 0 {
|
||||
n += 1 + sov(uint64(m.AccountID))
|
||||
|
||||
Reference in New Issue
Block a user