From e7c46a0f4c8de76664c8c724b5e6e4e0881f5cb1 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Mon, 11 May 2026 14:21:20 +0300 Subject: [PATCH] lib/streamaggr: use max samples lag for flush delay calculation (#10835) ### Describe Your Changes fixes #10402 Use the max sample lag for flush delay calculation when aggregation windows are enabled. Previously, the 95th percentile of samples lag was used, which led to dropped data. ### Checklist The following checks are **mandatory**: - [ ] My change adheres to [VictoriaMetrics contributing guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist). - [ ] My change adheres to [VictoriaMetrics development goals](https://docs.victoriametrics.com/victoriametrics/goals/). --------- Signed-off-by: hagen1778 Co-authored-by: hagen1778 --- docs/victoriametrics/changelog/CHANGELOG.md | 1 + lib/streamaggr/deduplicator.go | 27 ++++++++-------- lib/streamaggr/streamaggr.go | 34 ++++++++------------- 3 files changed, 26 insertions(+), 36 deletions(-) diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index 2d94ca3788..2139f5155f 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -27,6 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel ## tip * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). +* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. 
Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402). ## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0) diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index b94c1d86af..cbf31a41d8 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -12,7 +12,6 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb" "github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil" "github.com/VictoriaMetrics/metrics" - "github.com/valyala/histogram" ) // Deduplicator deduplicates samples per each time series. @@ -29,9 +28,8 @@ type Deduplicator struct { stopCh chan struct{} ms *metrics.Set - // time to wait after interval end before flush - flushAfter *histogram.Fast - muFlushAfter sync.Mutex + // flushAfterMsec is the max sample lag (in milliseconds) observed in the current flush interval. + flushAfterMsec atomic.Int64 } // NewDeduplicator returns new deduplicator, which deduplicates samples per each time series. 
@@ -59,7 +57,6 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati } d.cs.Store(cs) if enableWindows { - d.flushAfter = histogram.GetFast() d.minDeadline.Store(startTime.UnixMilli()) } d.cs.Store(cs) @@ -145,9 +142,15 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) { } if d.enableWindows && maxLagMsec > 0 { - d.muFlushAfter.Lock() - d.flushAfter.Update(float64(maxLagMsec)) - d.muFlushAfter.Unlock() + for { + old := d.flushAfterMsec.Load() + if maxLagMsec <= old { + break + } + if d.flushAfterMsec.CompareAndSwap(old, maxLagMsec) { + break + } + } } if len(ctx.blue) > 0 { @@ -172,7 +175,6 @@ func dropSeriesLabels(dst, src []prompb.Label, labelNames []string) []prompb.Lab func (d *Deduplicator) runFlusher(pushFunc PushFunc) { t := time.NewTicker(d.interval) - var fa *histogram.Fast defer t.Stop() for { select { @@ -180,12 +182,7 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc) { return case <-t.C: if d.enableWindows { - // Calculate delay and wait - d.muFlushAfter.Lock() - fa, d.flushAfter = d.flushAfter, histogram.GetFast() - d.muFlushAfter.Unlock() - delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond - histogram.PutFast(fa) + delay := time.Duration(d.flushAfterMsec.Swap(0)) * time.Millisecond time.Sleep(delay) } d.flush(pushFunc) diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 2e76ddd08b..129d13483a 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -24,14 +24,9 @@ import ( "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil" "github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool" "github.com/VictoriaMetrics/metrics" - "github.com/valyala/histogram" "gopkg.in/yaml.v2" ) -// defines ingested samples lag quantile to determine a time to wait before flush. -// It's not configurable at the moment. 
-const flushQuantile = 0.95 - var supportedOutputs = []string{ "avg", "count_samples", @@ -440,9 +435,9 @@ type aggregator struct { // aggrOutputs contains aggregate states for the given outputs aggrOutputs *aggrOutputs - // time to wait after interval end before flush - flushAfter *histogram.Fast - muFlushAfter sync.Mutex + // flushAfterMsec is the max sample lag (in milliseconds) observed in the current flush interval. + // It is used to properly delay the flush time while using aggregation windows. + flushAfterMsec atomic.Int64 // suffix contains a suffix, which should be added to aggregate metric names // @@ -705,9 +700,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set, minTime = minTime.Add(interval) } } - if enableWindows { - a.flushAfter = histogram.GetFast() - } a.minDeadline.Store(minTime.UnixMilli()) cs := &currentState{} if a.dedupInterval > 0 { @@ -834,16 +826,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu t := time.NewTicker(interval) defer t.Stop() - var fa *histogram.Fast for tickerWait(t) { pf := pushFunc if a.enableWindows { - // Calculate delay and wait - a.muFlushAfter.Lock() - fa, a.flushAfter = a.flushAfter, histogram.GetFast() - a.muFlushAfter.Unlock() - delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond - histogram.PutFast(fa) + delay := time.Duration(a.flushAfterMsec.Swap(0)) * time.Millisecond time.Sleep(delay) } @@ -1044,9 +1030,15 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) { } } if enableWindows && maxLagMsec > 0 { - a.muFlushAfter.Lock() - a.flushAfter.Update(float64(maxLagMsec)) - a.muFlushAfter.Unlock() + for { + old := a.flushAfterMsec.Load() + if maxLagMsec <= old { + break + } + if a.flushAfterMsec.CompareAndSwap(old, maxLagMsec) { + break + } + } } a.samplesLag.Update(float64(maxLagMsec) / 1_000)