lib/streamaggr: concurrently push timeseries to aggregators

Previously all timeseries pushed into aggregators were added
sequentially. It could cause delays on data ingestion and it was not
possible to use all available.

 This commit adds concurrency based on available CPU cores.

Also, it adds new generic Buffer and BufferPool into slicesutil.

Fixes https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
This commit is contained in:
Nikolay
2025-10-24 10:29:48 +02:00
committed by GitHub
parent d0f8773f4b
commit 11f488d8ff
8 changed files with 108 additions and 14 deletions

View File

@@ -27,6 +27,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/ratelimiter"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timeserieslimits"
"github.com/VictoriaMetrics/metrics"
@@ -1011,9 +1012,9 @@ func (rwctx *remoteWriteCtx) TryPushTimeSeries(tss []prompb.TimeSeries, forceDro
return false
}
var matchIdxsPool bytesutil.ByteBufferPool
var matchIdxsPool slicesutil.BufferPool[uint32]
func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []byte, dropInput bool) []prompb.TimeSeries {
func dropAggregatedSeries(src []prompb.TimeSeries, matchIdxs []uint32, dropInput bool) []prompb.TimeSeries {
dst := src[:0]
if !dropInput {
for i, match := range matchIdxs {

View File

@@ -221,7 +221,7 @@ func (ctx *InsertCtx) FlushBufs() error {
}
}
func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) {
func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []uint32) {
dst := ctx.mrs[:0]
src := ctx.mrs
if !*streamAggrDropInput {
@@ -239,4 +239,4 @@ func (ctx *InsertCtx) dropAggregatedRows(matchIdxs []byte) {
ctx.mrs = dst
}
var matchIdxsPool bytesutil.ByteBufferPool
var matchIdxsPool slicesutil.BufferPool[uint32]

View File

@@ -13,6 +13,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/logger"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/procutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/storage"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/streamaggr"
"github.com/VictoriaMetrics/metrics"
@@ -189,7 +190,7 @@ func (ctx *streamAggrCtx) Reset() {
ctx.buf = ctx.buf[:0]
}
func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte {
func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []uint32) []uint32 {
mn := &ctx.mn
tss := ctx.tss
labels := ctx.labels
@@ -248,7 +249,7 @@ func (ctx *streamAggrCtx) push(mrs []storage.MetricRow, matchIdxs []byte) []byte
if sas.IsEnabled() {
matchIdxs = sas.Push(tss, matchIdxs)
} else if deduplicator != nil {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
matchIdxs = slicesutil.SetLength(matchIdxs, len(tss))
for i := range matchIdxs {
matchIdxs[i] = 1
}

View File

@@ -41,6 +41,7 @@ It disables `Discovered targets` debug IU by default.
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/), [vmalert](https://docs.victoriametrics.com/victoriametrics/vmalert/): add `-secret.flags` command-line flag to configure flags to be hidden in logs and on `/metrics`. This is useful for protecting sensitive flag values (for example `-remoteWrite.headers`) from being exposed in logs or metrics. See [#6938](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/6938). Thank you @truepele for the issue and PR
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): change default value of the flag `-promscrape.dropOriginalLabels` from `false` to `true`. This helps reducing CPU and Memory usage by dropping targets original labels during [service discovery](https://docs.victoriametrics.com/victoriametrics/sd_configs/), but disables [relabel debugging UI](https://docs.victoriametrics.com/victoriametrics/relabeling/#relabel-debugging). This change gives constant improvement on resource usage, and relabel debugging can be turned on on demand. See this issue [#9665](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9665) for details.
* FEATURE: [vmagent](https://docs.victoriametrics.com/victoriametrics/vmagent/): improve performance for the [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/) with multiple configured rules. See this issue [#9878](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878) for details.
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): on the [Cardinality Explorer](https://docs.victoriametrics.com/victoriametrics/#cardinality-explorer) page, rename `Reset` to `Reset filters` and disable the button when no filters are modified. See [#9609](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9609).
* FEATURE: [vmui](https://docs.victoriametrics.com/victoriametrics/single-server-victoriametrics/#vmui): reflect query visibility state in URL on `Query` or `Raw Query` pages. Now, when sharing URL with a query, its visibility state will be preserved. See [#9826](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/9826) for more details.

34
lib/slicesutil/buffer.go Normal file
View File

@@ -0,0 +1,34 @@
package slicesutil
import "sync"
// Buffer implements a simple buffer for T.
type Buffer[T any] struct {
// B is the underlying T slice.
B []T
}
// Reset resets b.
func (b *Buffer[T]) Reset() {
b.B = b.B[:0]
}
// BufferPool is a pool of T Buffers.
type BufferPool[T any] struct {
p sync.Pool
}
// Get obtains a Buffer from bp.
func (bp *BufferPool[T]) Get() *Buffer[T] {
bbv := bp.p.Get()
if bbv == nil {
return &Buffer[T]{}
}
return bbv.(*Buffer[T])
}
// Put puts b into bp.
func (bp *BufferPool[T]) Put(b *Buffer[T]) {
b.Reset()
bp.p.Put(b)
}

View File

@@ -21,6 +21,7 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promrelabel"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/histogram"
@@ -359,8 +360,8 @@ func (a *Aggregators) Equal(b *Aggregators) bool {
// Push returns matchIdxs with len equal to len(tss).
// It reuses the matchIdxs if it has enough capacity to hold len(tss) items.
// Otherwise it allocates new matchIdxs.
func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []byte) []byte {
matchIdxs = bytesutil.ResizeNoCopyMayOverallocate(matchIdxs, len(tss))
func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32 {
matchIdxs = slicesutil.SetLength(matchIdxs, len(tss))
for i := range matchIdxs {
matchIdxs[i] = 0
}
@@ -368,10 +369,23 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []byte) []byte {
return matchIdxs
}
// use all available CPU cores to copy time-series into aggregators
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
wg.Add(len(a.as))
for _, aggr := range a.as {
aggr.Push(tss, matchIdxs)
concurrencyChan <- struct{}{}
go func(aggr *aggregator) {
aggr.Push(tss, matchIdxs)
wg.Done()
<-concurrencyChan
}(aggr)
}
wg.Wait()
return matchIdxs
}
@@ -951,7 +965,7 @@ func (a *aggregator) MustStop() {
}
// Push pushes tss to a.
func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []byte) {
func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
ctx := getPushCtx()
defer putPushCtx(ctx)
@@ -975,7 +989,7 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []byte) {
if !a.match.Match(ts.Labels) {
continue
}
matchIdxs[idx] = 1
atomic.StoreUint32(&matchIdxs[idx], 1)
if len(dropLabels) > 0 {
labels.Labels = dropSeriesLabels(labels.Labels[:0], ts.Labels, dropLabels)

View File

@@ -17,7 +17,7 @@ func TestAggregatorsSuccess(t *testing.T) {
f := func(inputMetrics []string, interval time.Duration, outputMetricsExpected, config, matchIdxsStrExpected string) {
t.Helper()
synctest.Run(func() {
var matchIdxs []byte
var matchIdxs []uint32
var tssOutput []prompb.TimeSeries
var tssOutputLock sync.Mutex

View File

@@ -51,7 +51,7 @@ func benchmarkAggregatorsPush(b *testing.B, output string) {
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []byte
var matchIdxs []uint32
for pb.Next() {
for i := 0; i < loops; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
@@ -65,13 +65,13 @@ func newBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
for i := range outputs {
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
}
config := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
by: [job]
outputs: [%s]
`, strings.Join(outputsQuoted, ","))
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
@@ -94,3 +94,46 @@ func newBenchSeries(seriesCount int) []prompb.TimeSeries {
const seriesCount = 10_000
var benchSeries = newBenchSeries(seriesCount)
func BenchmarkConcurrentAggregatorsPush(b *testing.B) {
pushFunc := func(_ []prompb.TimeSeries) {}
a := newPerOutputBenchAggregators(benchOutputs, pushFunc)
defer a.MustStop()
const loops = 100
b.ResetTimer()
b.ReportAllocs()
b.SetBytes(int64(len(benchSeries) * loops))
b.RunParallel(func(pb *testing.PB) {
var matchIdxs []uint32
for pb.Next() {
for i := 0; i < loops; i++ {
matchIdxs = a.Push(benchSeries, matchIdxs)
}
}
})
}
func newPerOutputBenchAggregators(outputs []string, pushFunc PushFunc) *Aggregators {
outputsQuoted := make([]string, len(outputs))
var config string
for i := range outputs {
outputsQuoted[i] = stringsutil.JSONString(outputs[i])
cfg := fmt.Sprintf(`
- match: http_requests_total
interval: 24h
by: [job]
outputs: [%s]
`, stringsutil.JSONString(outputs[i]))
config += cfg
}
a, err := LoadFromData([]byte(config), pushFunc, nil, "some_alias")
if err != nil {
panic(fmt.Errorf("unexpected error when initializing aggregators: %s", err))
}
return a
}