From 11f488d8ff7dfd30d1d4ef9e25d4e148adacf8e6 Mon Sep 17 00:00:00 2001 From: Nikolay Date: Fri, 24 Oct 2025 10:29:48 +0200 Subject: [PATCH] lib/streamaggr: concurrently push timeseries to aggregators Previously all timeseries pushed into aggregators were added sequentially. It could cause delays on data ingestion and it was not possible to use all available CPU cores. This commit adds concurrency based on available CPU cores. Also, it adds new generic Buffer and BufferPool into slicesutil. Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878 --- app/vmagent/remotewrite/remotewrite.go | 5 ++- app/vminsert/common/insert_ctx.go | 4 +- app/vminsert/common/streamaggr.go | 5 ++- docs/victoriametrics/changelog/CHANGELOG.md | 1 + lib/slicesutil/buffer.go | 34 +++++++++++++++ lib/streamaggr/streamaggr.go | 24 ++++++++--- lib/streamaggr/streamaggr_synctest_test.go | 2 +- lib/streamaggr/streamaggr_timing_test.go | 47 ++++++++++++++++++++- 8 files changed, 108 insertions(+), 14 deletions(-) create mode 100644 lib/slicesutil/buffer.go diff --git a/app/vmagent/remotewrite/remotewrite.go b/app/vmagent/remotewrite/remotewrite.go index a3ac58675d..92d3552754 100644 --- a/app/vmagent/remotewrite/remotewrite.go +++ b/app/vmagent/remotewrite/remotewrite.go @@ -27,6 +27,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits" "github.com/VictoriaMetrics/metrics" @@ -1011,9 +1012,9 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro return false } -var matchIdxsPool bytesutil.ByteBufferPool +var matchIdxsPool slicesutil.BufferPool[uint32] -func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []byte, dropInput bool) 
[]prompb.TimeSeries { +func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []uint32, dropInput bool) []prompb.TimeSeries { dst := src[:0] if !dropInput { for i, match := range matchIdxs { diff --git a/app/vminsert/common/insert_ctx.go b/app/vminsert/common/insert_ctx.go index 239f3600ba..a7367ffb5e 100644 --- a/app/vminsert/common/insert_ctx.go +++ b/app/vminsert/common/insert_ctx.go @@ -221,7 +221,7 @@ func (ctx *InsertCtx) FlushBufs() error { } } -func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) { +func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []uint32) { dst := ctx.mrs[:0] src := ctx.mrs if !*streamAggrDropInput { @@ -239,4 +239,4 @@ func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) { ctx.mrs = dst } -var matchIdxsPool bytesutil.ByteBufferPool +var matchIdxsPool slicesutil.BufferPool[uint32] diff --git a/app/vminsert/common/streamaggr.go b/app/vminsert/common/streamaggr.go index 5c837ab717..62bc4bed96 100644 --- a/app/vminsert/common/streamaggr.go +++ b/app/vminsert/common/streamaggr.go @@ -13,6 +13,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/logger" "github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/storage" "github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr" "github.com/VictoriaMetrics/metrics" @@ -189,7 +190,7 @@ func (ctx *streamAggrCtx) Reset() { ctx.buf = ctx.buf[:0] } -func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte { +func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []uint32) []uint32 { mn := &ctx.mn tss := ctx.tss labels := ctx.labels @@ -248,7 +249,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte if sas.IsEnabled() { matchIdxs = sas.Push(tss, matchIdxs) } else if deduplicator != nil { - matchIdxs = 
bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) + matchIdxs = slicesutil.SetLength(matchIdxs, len(tss)) for i := range matchIdxs { matchIdxs[i] = 1 } diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index b6bdae1ec0..3a7b45cd61 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -41,6 +41,7 @@ It disables `Discovered targets` debug IU by default. * FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `-secret.flags` command-line flag to configure flags to be hidden in logs and on `/metrics`. This is useful for protecting sensitive flag values (for example `-remoteWrite.headers`) from being exposed in logs or metrics. See [#6938](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6938). Thank you @truepele for the issue and PR * FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): change default value of the flag `-promscrape.dropOriginalLabels` from `false` to `true`. This helps reducing CPU and Memory usage by dropping targets original labels during [service discovery](https://docs.victoriametrics.com/victoriametrics/sd_configs/), but disables [relabel debugging UI](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging). This change gives constant improvement on resource usage, and relabel debugging can be turned on on demand. See this issue [#9665](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9665) for details. +* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): improve performance for the [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/) with multiple configured rules. See this issue [#9878](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878) for details. 
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): on the [Cardinality Explorer](https://docs.victoriametrics.com/victoriametrics/#cardinality-explorer) page, rename `Reset` to `Reset filters` and disable the button when no filters are modified. See [#9609](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9609). * FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): reflect query visibility state in URL on `Query` or `Raw Query` pages. Now, when sharing URL with a query, its visibility state will be preserved. See [#9826](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9826) for more details. diff --git a/lib/slicesutil/buffer.go b/lib/slicesutil/buffer.go new file mode 100644 index 0000000000..d13c8210f7 --- /dev/null +++ b/lib/slicesutil/buffer.go @@ -0,0 +1,34 @@ +package slicesutil + +import "sync" + +// Buffer implements a simple buffer for T. +type Buffer[T any] struct { + // B is the underlying T slice. + B []T +} + +// Reset resets b. +func (b *Buffer[T]) Reset() { + b.B = b.B[:0] +} + +// BufferPool is a pool of T Buffers. +type BufferPool[T any] struct { + p sync.Pool +} + +// Get obtains a Buffer from bp. +func (bp *BufferPool[T]) Get() *Buffer[T] { + bbv := bp.p.Get() + if bbv == nil { + return &Buffer[T]{} + } + return bbv.(*Buffer[T]) +} + +// Put puts b into bp. 
+func (bp *BufferPool[T]) Put(b *Buffer[T]) { + b.Reset() + bp.p.Put(b) +} diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 41aaf71d6c..8dd129f3d8 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -21,6 +21,7 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" "github.com/valyala/histogram" @@ -359,8 +360,8 @@ func (a *Aggregators) Equal(b *Aggregators) bool { // Push returns matchIdxs with len equal to len(tss). // It reuses the matchIdxs if it has enough capacity to hold len(tss) items. // Otherwise it allocates new matchIdxs. -func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []byte) []byte { - matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss)) +func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32 { + matchIdxs = slicesutil.SetLength(matchIdxs, len(tss)) for i := range matchIdxs { matchIdxs[i] = 0 } @@ -368,10 +369,23 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []byte) []byte { return matchIdxs } + // use all available CPU cores to copy time-series into aggregators + // See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878 + var wg sync.WaitGroup + concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs()) + + wg.Add(len(a.as)) for _, aggr := range a.as { - aggr.Push(tss, matchIdxs) + concurrencyChan <- struct{}{} + go func(aggr *aggregator) { + aggr.Push(tss, matchIdxs) + wg.Done() + <-concurrencyChan + }(aggr) } + wg.Wait() + return matchIdxs } @@ -951,7 +965,7 @@ func (a *aggregator) MustStop() { } // Push pushes tss to a. 
-func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []byte) { +func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) { ctx := getPushCtx() defer putPushCtx(ctx) @@ -975,7 +989,7 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []byte) { if !a.match.Match(ts.Labels) { continue } - matchIdxs[idx] = 1 + atomic.StoreUint32(&matchIdxs[idx], 1) if len(dropLabels) > 0 { labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels) diff --git a/lib/streamaggr/streamaggr_synctest_test.go b/lib/streamaggr/streamaggr_synctest_test.go index ac94e78b70..d04622dabf 100644 --- a/lib/streamaggr/streamaggr_synctest_test.go +++ b/lib/streamaggr/streamaggr_synctest_test.go @@ -17,7 +17,7 @@ func TestAggregatorsSuccess(t *testing.T) { f := func(inputMetrics []string, interval time.Duration, outputMetricsExpected, config, matchIdxsStrExpected string) { t.Helper() synctest.Run(func() { - var matchIdxs []byte + var matchIdxs []uint32 var tssOutput []prompb.TimeSeries var tssOutputLock sync.Mutex diff --git a/lib/streamaggr/streamaggr_timing_test.go b/lib/streamaggr/streamaggr_timing_test.go index 01a4f54233..c4f4340a11 100644 --- a/lib/streamaggr/streamaggr_timing_test.go +++ b/lib/streamaggr/streamaggr_timing_test.go @@ -51,7 +51,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) { b.ReportAllocs() b.SetBytes(int64(len(benchSeries) * loops)) b.RunParallel(func(pb *testing.PB) { - var matchIdxs []byte + var matchIdxs []uint32 for pb.Next() { for i := 0; i < loops; i++ { matchIdxs = a.Push(benchSeries, matchIdxs) @@ -65,13 +65,13 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { for i := range outputs { outputsQuoted[i] = stringsutil.JSONString(outputs[i]) } + config := fmt.Sprintf(` - match: http_requests_total interval: 24h by: [job] outputs: [%s] `, strings.Join(outputsQuoted, ",")) - a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias") if err != nil { 
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) @@ -94,3 +94,46 @@ func newBenchSeries(seriesCount int) []prompb.TimeSeries { const seriesCount = 10_000 var benchSeries = newBenchSeries(seriesCount) + +func BenchmarkConcurrentAggregatorsPush(b *testing.B) { + pushFunc := func(_ []prompb.TimeSeries) {} + + a := newPerOutputBenchAggregators(benchOutputs, pushFunc) + defer a.MustStop() + + const loops = 100 + + b.ResetTimer() + b.ReportAllocs() + b.SetBytes(int64(len(benchSeries) * loops)) + b.RunParallel(func(pb *testing.PB) { + var matchIdxs []uint32 + for pb.Next() { + for i := 0; i < loops; i++ { + matchIdxs = a.Push(benchSeries, matchIdxs) + } + } + }) +} + +func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators { + outputsQuoted := make([]string, len(outputs)) + var config string + for i := range outputs { + outputsQuoted[i] = stringsutil.JSONString(outputs[i]) + cfg := fmt.Sprintf(` +- match: http_requests_total + interval: 24h + by: [job] + outputs: [%s] +`, stringsutil.JSONString(outputs[i])) + config += cfg + + } + + a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias") + if err != nil { + panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err)) + } + return a +}