diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index 1d27d76433..2d94ca3788 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel ## tip +* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). + ## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0) Released at 2026-05-08 diff --git a/lib/streamaggr/quantiles.go b/lib/streamaggr/quantiles.go index c351b63fcb..98db910cbe 100644 --- a/lib/streamaggr/quantiles.go +++ b/lib/streamaggr/quantiles.go @@ -1,9 +1,10 @@ package streamaggr import ( + "strconv" + "github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil" "github.com/valyala/histogram" - "strconv" ) // quantilesAggrValue calculates output=quantiles, e.g. the given quantiles over the input samples. @@ -19,18 +20,19 @@ func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ str } func (av *quantilesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) { - ac := c.(*quantilesAggrConfig) - if av.h != nil { - ac.quantiles = av.h.Quantiles(ac.quantiles[:0], ac.phis) - histogram.PutFast(av.h) - av.h = nil + if av.h == nil { + return } - if len(ac.quantiles) > 0 { - for i, quantile := range ac.quantiles { - ac.b = strconv.AppendFloat(ac.b[:0], ac.phis[i], 'g', -1, 64) - phiStr := bytesutil.InternBytes(ac.b) - ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr) - } + ac := c.(*quantilesAggrConfig) + ac.quantiles = av.h.Quantiles(ac.quantiles[:0], ac.phis) + histogram.PutFast(av.h) + // reset h to avoid producing stale results on the next flush if av didn't get new pushSample() calls + av.h = nil + + for i, quantile := range ac.quantiles { + ac.b = strconv.AppendFloat(ac.b[:0], ac.phis[i], 'g', -1, 64) + phiStr := bytesutil.InternBytes(ac.b) + ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr) } } diff --git a/lib/streamaggr/streamaggr_synctest_test.go b/lib/streamaggr/streamaggr_synctest_test.go index 1cb016e3d6..509529578d 100644 --- a/lib/streamaggr/streamaggr_synctest_test.go +++ b/lib/streamaggr/streamaggr_synctest_test.go @@ -634,6 +634,19 @@ cpu_usage:1m_without_cpu_quantiles{quantile="1"} 90 outputs: ["quantiles(0, 0.5, 1)"] `, "1111111") + // no stale quantiles should be produced + f([]string{` +cpu_usage{cpu="1"} 3 +cpu_usage{cpu="2"} 3`, + `cpu_usage{cpu="2"} 4`, + }, time.Minute, `cpu_usage:1m_quantiles{cpu="1",quantile="1"} 3 +cpu_usage:1m_quantiles{cpu="2",quantile="1"} 3 +cpu_usage:1m_quantiles{cpu="2",quantile="1"} 4 +`, ` +- interval: 1m + outputs: ["quantiles(1)"] +`, "111") + // append additional label f([]string{` foo{abc="123"} 4