mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 08:36:55 +03:00
### Describe Your Changes
Fix stale `quantiles(...)` stream aggregation output for series without
samples in the current aggregation interval.
Previously, `quantilesAggrConfig` reused the `quantiles` buffer across
aggregation values. If `quantilesAggrValue.flush` was called for a
series without samples after another series had already calculated
quantiles, stale quantile values left in the buffer by that earlier
series could be emitted for the empty series.
This could produce unrealistic `*_quantiles` output values and make the
same aggregated value appear across unrelated labelsets.
The PR skips `quantiles(...)` output when there is no histogram for the
current interval and adds a regression test for this case.
### Checklist
The following checks are **mandatory**:
- [x] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [x] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).
---------
Co-authored-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit 20d4314168)
58 lines
1.4 KiB
Go
58 lines
1.4 KiB
Go
package streamaggr
|
|
|
|
import (
|
|
"strconv"
|
|
|
|
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
|
|
"github.com/valyala/histogram"
|
|
)
|
|
|
|
// quantilesAggrValue calculates output=quantiles, e.g. the given quantiles over the input samples.
|
|
type quantilesAggrValue struct {
|
|
h *histogram.Fast
|
|
}
|
|
|
|
func (av *quantilesAggrValue) pushSample(_ aggrConfig, sample *pushSample, _ string, _ int64) {
|
|
if av.h == nil {
|
|
av.h = histogram.GetFast()
|
|
}
|
|
av.h.Update(sample.value)
|
|
}
|
|
|
|
func (av *quantilesAggrValue) flush(c aggrConfig, ctx *flushCtx, key string, _ bool) {
|
|
if av.h == nil {
|
|
return
|
|
}
|
|
ac := c.(*quantilesAggrConfig)
|
|
ac.quantiles = av.h.Quantiles(ac.quantiles[:0], ac.phis)
|
|
histogram.PutFast(av.h)
|
|
// reset h to avoid producing stale results on the next flush if av didn't get new pushSample() calls
|
|
av.h = nil
|
|
|
|
for i, quantile := range ac.quantiles {
|
|
ac.b = strconv.AppendFloat(ac.b[:0], ac.phis[i], 'g', -1, 64)
|
|
phiStr := bytesutil.InternBytes(ac.b)
|
|
ctx.appendSeriesWithExtraLabel(key, "quantiles", quantile, "quantile", phiStr)
|
|
}
|
|
}
|
|
|
|
func (*quantilesAggrValue) state() any {
|
|
return nil
|
|
}
|
|
|
|
func newQuantilesAggrConfig(phis []float64) aggrConfig {
|
|
return &quantilesAggrConfig{
|
|
phis: phis,
|
|
}
|
|
}
|
|
|
|
// quantilesAggrConfig is the configuration for output=quantiles.
//
// In addition to the requested phis it holds scratch buffers,
// which are reused across quantilesAggrValue.flush calls.
type quantilesAggrConfig struct {
	// phis are the quantiles to calculate.
	phis []float64

	// quantiles is a reusable buffer for the values calculated for phis
	// during flush.
	quantiles []float64

	// b is a reusable buffer for the string representation of a single phi.
	b []byte
}
|
|
|
|
func (*quantilesAggrConfig) getValue(_ any) aggrValue {
|
|
return &quantilesAggrValue{}
|
|
}
|