lib/streamaggr: use max samples lag for flush delay calculation (#10835)

### Describe Your Changes

fixes #10402

use max sample lag for flush delay calculation when aggregation windows
enabled. before 95th percentile of samples lag was used, which led to
dropped data

### Checklist

The following checks are **mandatory**:

- [ ] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [ ] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).

---------

Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
This commit is contained in:
Andrii Chubatiuk
2026-05-11 14:21:20 +03:00
committed by GitHub
parent 20d4314168
commit e7c46a0f4c
3 changed files with 26 additions and 36 deletions

View File

@@ -27,6 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)

View File

@@ -12,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/histogram"
)
// Deduplicator deduplicates samples per each time series.
@@ -29,9 +28,8 @@ type Deduplicator struct {
stopCh chan struct{}
ms *metrics.Set
// time to wait after interval end before flush
flushAfter *histogram.Fast
muFlushAfter sync.Mutex
// flushAfterMsec is the max sample lag (in milliseconds) observed in the current flush interval.
flushAfterMsec atomic.Int64
}
// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.
@@ -59,7 +57,6 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
}
d.cs.Store(cs)
if enableWindows {
d.flushAfter = histogram.GetFast()
d.minDeadline.Store(startTime.UnixMilli())
}
d.cs.Store(cs)
@@ -145,9 +142,15 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
}
if d.enableWindows && maxLagMsec > 0 {
d.muFlushAfter.Lock()
d.flushAfter.Update(float64(maxLagMsec))
d.muFlushAfter.Unlock()
for {
old := d.flushAfterMsec.Load()
if maxLagMsec <= old {
break
}
if d.flushAfterMsec.CompareAndSwap(old, maxLagMsec) {
break
}
}
}
if len(ctx.blue) > 0 {
@@ -172,7 +175,6 @@ func dropSeriesLabels(dst, src []prompb.Label, labelNames []string) []prompb.Lab
func (d *Deduplicator) runFlusher(pushFunc PushFunc) {
t := time.NewTicker(d.interval)
var fa *histogram.Fast
defer t.Stop()
for {
select {
@@ -180,12 +182,7 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc) {
return
case <-t.C:
if d.enableWindows {
// Calculate delay and wait
d.muFlushAfter.Lock()
fa, d.flushAfter = d.flushAfter, histogram.GetFast()
d.muFlushAfter.Unlock()
delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond
histogram.PutFast(fa)
delay := time.Duration(d.flushAfterMsec.Swap(0)) * time.Millisecond
time.Sleep(delay)
}
d.flush(pushFunc)

View File

@@ -24,14 +24,9 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
"github.com/VictoriaMetrics/metrics"
"github.com/valyala/histogram"
"gopkg.in/yaml.v2"
)
// defines ingested samples lag quantile to determine a time to wait before flush.
// It's not configurable at the moment.
const flushQuantile = 0.95
var supportedOutputs = []string{
"avg",
"count_samples",
@@ -440,9 +435,9 @@ type aggregator struct {
// aggrOutputs contains aggregate states for the given outputs
aggrOutputs *aggrOutputs
// time to wait after interval end before flush
flushAfter *histogram.Fast
muFlushAfter sync.Mutex
// flushAfterMsec is the max sample lag (in milliseconds) observed in the current flush interval.
// It is used to properly delay the flush time while using aggregation windows.
flushAfterMsec atomic.Int64
// suffix contains a suffix, which should be added to aggregate metric names
//
@@ -705,9 +700,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
minTime = minTime.Add(interval)
}
}
if enableWindows {
a.flushAfter = histogram.GetFast()
}
a.minDeadline.Store(minTime.UnixMilli())
cs := &currentState{}
if a.dedupInterval > 0 {
@@ -834,16 +826,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
t := time.NewTicker(interval)
defer t.Stop()
var fa *histogram.Fast
for tickerWait(t) {
pf := pushFunc
if a.enableWindows {
// Calculate delay and wait
a.muFlushAfter.Lock()
fa, a.flushAfter = a.flushAfter, histogram.GetFast()
a.muFlushAfter.Unlock()
delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond
histogram.PutFast(fa)
delay := time.Duration(a.flushAfterMsec.Swap(0)) * time.Millisecond
time.Sleep(delay)
}
@@ -1044,9 +1030,15 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
}
}
if enableWindows && maxLagMsec > 0 {
a.muFlushAfter.Lock()
a.flushAfter.Update(float64(maxLagMsec))
a.muFlushAfter.Unlock()
for {
old := a.flushAfterMsec.Load()
if maxLagMsec <= old {
break
}
if a.flushAfterMsec.CompareAndSwap(old, maxLagMsec) {
break
}
}
}
a.samplesLag.Update(float64(maxLagMsec) / 1_000)