Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
ae4b3a081e chore(lib/streamaggr): minor optimizations
* replace O(n) slices.Contains calls in getInputOutputLabels with O(1) map lookups by converting by/without slices to sets at aggregator init time.
* preallocate Aggregators.workCh once in loadFromData instead of creating a new channel per Push call.
* add a fast path for the single-aggregator case to skip creating a goroutine entirely.
2026-05-22 22:33:03 +03:00

View File

@@ -4,7 +4,6 @@ import (
"encoding/json"
"fmt"
"math"
"slices"
"sort"
"strconv"
"strings"
@@ -257,6 +256,10 @@ type Config struct {
type Aggregators struct {
as []*aggregator
// workCh limits the number of concurrent aggregator.Push calls.
// Pre-allocated once to avoid per-Push channel allocation.
workCh chan struct{}
// configData contains marshaled configs.
// It is used in Equal() for comparing Aggregators.
configData []byte
@@ -307,6 +310,7 @@ func loadFromData(data []byte, filePath string, pushFunc PushFunc, opts *Options
metrics.RegisterSet(ms)
return &Aggregators{
as: as,
workCh: make(chan struct{}, cgroup.AvailableCPUs()),
configData: configData,
filePath: filePath,
ms: ms,
@@ -364,19 +368,21 @@ func (a *Aggregators) Push(tss []prompb.TimeSeries, matchIdxs []uint32) []uint32
return matchIdxs
}
// use all available CPU cores to copy time-series into aggregators
// See this issue https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
concurrencyChan := make(chan struct{}, cgroup.AvailableCPUs())
for _, aggr := range a.as {
concurrencyChan <- struct{}{}
wg.Go(func() {
aggr.Push(tss, matchIdxs)
<-concurrencyChan
})
if len(a.as) == 1 {
a.as[0].Push(tss, matchIdxs)
return matchIdxs
}
// Use all available CPU cores to push time-series into aggregators in parallel.
// See https://github.com/VictoriaMetrics/VictoriaMetrics/issues/9878
var wg sync.WaitGroup
for _, aggr := range a.as {
a.workCh <- struct{}{}
wg.Go(func() {
aggr.Push(tss, matchIdxs)
<-a.workCh
})
}
wg.Wait()
return matchIdxs
@@ -413,8 +419,8 @@ type aggregator struct {
ignoreOldSamples bool
enableWindows bool
by []string
without []string
by map[string]struct{}
without map[string]struct{}
aggregateOnlyByTime bool
// interval is the interval between flushes
@@ -645,8 +651,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
enableWindows: enableWindows,
stalenessInterval: stalenessInterval,
by: by,
without: without,
by: makeStringSet(by),
without: makeStringSet(without),
aggregateOnlyByTime: aggregateOnlyByTime,
interval: interval,
@@ -1115,10 +1121,10 @@ func putPushCtx(ctx *pushCtx) {
var pushCtxPool sync.Pool
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without []string) ([]prompb.Label, []prompb.Label) {
func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, without map[string]struct{}) ([]prompb.Label, []prompb.Label) {
if len(without) > 0 {
for _, label := range labels {
if slices.Contains(without, label.Name) {
if _, ok := without[label.Name]; ok {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1126,7 +1132,7 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
}
} else {
for _, label := range labels {
if !slices.Contains(by, label.Name) {
if _, ok := by[label.Name]; !ok {
dstInput = append(dstInput, label)
} else {
dstOutput = append(dstOutput, label)
@@ -1136,6 +1142,17 @@ func getInputOutputLabels(dstInput, dstOutput, labels []prompb.Label, by, withou
return dstInput, dstOutput
}
// makeStringSet builds a set from keys so that membership can be tested
// with a single map lookup.
//
// It returns nil when keys is empty; duplicate entries in keys collapse
// into a single set member.
func makeStringSet(keys []string) map[string]struct{} {
	n := len(keys)
	if n < 1 {
		return nil
	}
	// Size the map up front, since the number of entries is bounded by n.
	set := make(map[string]struct{}, n)
	for i := 0; i < n; i++ {
		set[keys[i]] = struct{}{}
	}
	return set
}
func getFlushCtx(a *aggregator, ao *aggrOutputs, pushFunc PushFunc, flushTimestamp int64, isLast bool) *flushCtx {
v := flushCtxPool.Get()
if v == nil {