diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index a3ac58675d..92d3552754 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -27,6 +27,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits" "github.com/VictoriaMetrics/metrics" @@ -1011,9 +1012,9 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro return false } -var matchIdxsPool bytesutil.ByteBufferPool +var matchIdxsPool slicesutil.BufferPool[uint32] -func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []byte, dropInput bool) []prompb.TimeSeries { +func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []uint32, dropInput bool) []prompb.TimeSeries { dst := src[:0] if !dropInput { for i, match := range matchIdxs { diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 239f3600ba..a7367ffb5e 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -221,7 +221,7 @@ func (ctx *InsertCtx) FlushBufs() error { } } -func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) { +func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []uint32) { dst := ctx.mrs[:0] src := ctx.mrs if !*streamAggrDropInput { @@ -239,4 +239,4 @@ func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) { ctx.mrs = dst } -var matchIdxsPool bytesutil.ByteBufferPool +var matchIdxsPool slicesutil.BufferPool[uint32] diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 5c837ab717..62bc4bed96 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -13,6 
+13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/metrics" @@ -189,7 +190,7 @@ func (ctx *streamAggrCtx) Reset() { ctx.buf = ctx.buf[:0] } -func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte { +func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []uint32) []uint32 { mn := &ctx.mn tss := ctx.tss labels := ctx.labels @@ -248,7 +249,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte if sas.IsEnabled() { matchIdxs = sas.Push(tss, matchIdxs) } else if deduplicator != nil { - matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) + matchIdxs = slicesutil.SetLength(matchIdxs, len(tss)) for i := range matchIdxs { matchIdxs[i] = 1 } diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index b6bdae1ec0..3a7b45cd61 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -41,6 +41,7 @@ It disables `Discovered targets` debug IU by default. * FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `-secret.flags` command-line flag to configure flags to be hidden in logs and on `/metrics`. This is useful for protecting sensitive flag values (for example `-remoteWrite.headers`) from being exposed in logs or metrics. See [#6938](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6938). 
Thank you @truepele for the issue and PR * FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): change default value of the flag `-promscrape.dropOriginalLabels` from `false` to `true`. This helps reducing CPU and Memory usage by dropping targets original labels during [service discovery](https://docs.victoriametrics.com/victoriametrics/sd_configs/), but disables [relabel debugging UI](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging). This change gives constant improvement on resource usage, and relabel debugging can be turned on on demand. See this issue [#9665](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9665) for details. +* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): improve performance of the [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/) when multiple aggregation rules are configured. See this issue [#9878](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878) for details. * FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): on the [Cardinality Explorer](https://docs.victoriametrics.com/victoriametrics/#cardinality-explorer) page, rename `Reset` to `Reset filters` and disable the button when no filters are modified. See [#9609](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9609). * FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): reflect query visibility state in URL on `Query` or `Raw Query` pages. Now, when sharing URL with a query, its visibility state will be preserved. See [#9826](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9826) for more details. 
diff --git a/lib/slicesutil/buffer.go b/lib/slicesutil/buffer.go new file mode 100644 index 0000000000..d13c8210f7 --- /dev/null +++ b/lib/slicesutil/buffer.go @@ -0,0 +1,34 @@ +package slicesutil + +import "sync" + +// Buffer implements a simple buffer for T. +type Buffer[T any] struct { + // B is the underlying T slice. + B []T +} + +// Reset resets b. +func (b *Buffer[T]) Reset() { + b.B = b.B[:0] +} + +// BufferPool is a pool of T Buffers. +type BufferPool[T any] struct { + p sync.Pool +} + +// Get obtains a Buffer from bp. +func (bp *BufferPool[T]) Get() *Buffer[T] { + bbv := bp.p.Get() + if bbv == nil { + return &Buffer[T]{} + } + return bbv.(*Buffer[T]) +} + +// Put puts b into bp. +func (bp *BufferPool[T]) Put(b *Buffer[T]) { + b.Reset() + bp.p.Put(b) +} diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 41aaf71d6c..8dd129f3d8 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -21,6 +21,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" "github.com/valyala/histogram" @@ -359,8 +360,8 @@ func (a *Aggregators) Equal(b *Aggregators) bool { // Push returns matchIdxs with len equal to len(tss). // It reuses the matchIdxs if it has enough capacity to hold len(tss) items. // Otherwise it allocates new matchIdxs. 
-func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []byte) []byte { - matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) +func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32 { + matchIdxs = slicesutil.SetLength(matchIdxs, len(tss)) for i := range matchIdxs { matchIdxs[i] = 0 } @@ -368,10 +369,23 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []byte) []byte { return matchIdxs } + // Push tss to every configured aggregator concurrently, limiting the number of concurrently running pushes to the number of available CPU cores. + // See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878 + var wg sync.WaitGroup + concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs()) + + wg.Add(len(a.as)) for _, aggr := range a.as { - aggr.Push(tss, matchIdxs) + concurrencyChan <- struct{}{} + go func(aggr *aggregator) { + aggr.Push(tss, matchIdxs) + wg.Done() + <-concurrencyChan + }(aggr) } + wg.Wait() + return matchIdxs } @@ -951,7 +965,7 @@ func (a *aggregator) MustStop() { } // Push pushes tss to a. 
-func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []byte) { +func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) { ctx := getPushCtx() defer putPushCtx(ctx) @@ -975,7 +989,7 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []byte) { if !a.match.Match(ts.Labels) { continue } - matchIdxs[idx] = 1 + atomic.StoreUint32(&matchIdxs[idx], 1) if len(dropLabels) > 0 { labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels) diff --git a/lib/streamaggr/streamaggr_synctest_test.go b/lib/streamaggr/streamaggr_synctest_test.go index ac94e78b70..d04622dabf 100644 --- a/lib/streamaggr/streamaggr_synctest_test.go +++ b/lib/streamaggr/streamaggr_synctest_test.go @@ -17,7 +17,7 @@ func TestAggregatorsSuccess(t *testing.T) { f := func(inputMetrics []string, interval time.Duration, outputMetricsExpected, config, matchIdxsStrExpected string) { t.Helper() synctest.Run(func() { - var matchIdxs []byte + var matchIdxs []uint32 var tssOutput []prompb.TimeSeries var tssOutputLock sync.Mutex diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 01a4f54233..c4f4340a11 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -51,7 +51,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { b.ReportAllocs() b.SetBytes(int64(len(benchSeries) * loops)) b.RunParallel(func(pb *testing.PB) { - var matchIdxs []byte + var matchIdxs []uint32 for pb.Next() { for i := 0; i < loops; i++ { matchIdxs = a.Push(benchSeries, matchIdxs) @@ -65,13 +65,13 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { for i := range outputs { outputsQuoted[i] = stringsutil.JSONString(outputs[i]) } + config := fmt.Sprintf(` - match: http_requests_total interval: 24h by: [job] outputs: [%s] `, strings.Join(outputsQuoted, ",")) - a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias") if err != nil { 
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) @@ -94,3 +94,46 @@ func newBenchSeries(seriesCount int) []prompb.TimeSeries { const seriesCount = 10_000 var benchSeries = newBenchSeries(seriesCount) + +func BenchmarkConcurrentAggregatorsPush(b *testing.B) { + pushFunc := func(_ []prompb.TimeSeries) {} + + a := newPerOutputBenchAggregators(benchOutputs, pushFunc) + defer a.MustStop() + + const loops = 100 + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(len(benchSeries) * loops)) + b.RunParallel(func(pb *testing.PB) { + var matchIdxs []uint32 + for pb.Next() { + for i := 0; i < loops; i++ { + matchIdxs = a.Push(benchSeries, matchIdxs) + } + } + }) +} + +func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { + outputsQuoted := make([]string, len(outputs)) + var config string + for i := range outputs { + outputsQuoted[i] = stringsutil.JSONString(outputs[i]) + cfg := fmt.Sprintf(` +- match: http_requests_total + interval: 24h + by: [job] + outputs: [%s] +`, stringsutil.JSONString(outputs[i])) + config += cfg + + } + + a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias") + if err != nil { + panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) + } + return a +}