mirror of
https://github.com/VictoriaMetrics/VictoriaMetrics.git
synced 2026-05-17 00:26:36 +03:00
lib/streamaggr: use max samples lag for flush delay calculation (#10835)
### Describe Your Changes
fixes #10402
Use the max sample lag for flush delay calculation when aggregation windows
are enabled. Previously, the 95th percentile of sample lag was used, which
led to dropped data.
### Checklist
The following checks are **mandatory**:
- [ ] My change adheres to [VictoriaMetrics contributing
guidelines](https://docs.victoriametrics.com/victoriametrics/contributing/#pull-request-checklist).
- [ ] My change adheres to [VictoriaMetrics development
goals](https://docs.victoriametrics.com/victoriametrics/goals/).
---------
Signed-off-by: hagen1778 <roman@victoriametrics.com>
Co-authored-by: hagen1778 <roman@victoriametrics.com>
(cherry picked from commit e7c46a0f4c)
This commit is contained in:
committed by
hagen1778
parent
29a5c914c8
commit
df1f58b017
@@ -27,6 +27,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
|
||||
## tip
|
||||
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918).
|
||||
* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): delay the flush of aggregation windows by the biggest lag among pushed samples. Previously, the delay was calculated as the 95th percentile of sample lags, which could underrepresent outliers and cause them to be rejected from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402).
|
||||
|
||||
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0)
|
||||
|
||||
|
||||
@@ -12,7 +12,6 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/prompb"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/promutil"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/histogram"
|
||||
)
|
||||
|
||||
// Deduplicator deduplicates samples per each time series.
|
||||
@@ -29,9 +28,8 @@ type Deduplicator struct {
|
||||
stopCh chan struct{}
|
||||
|
||||
ms *metrics.Set
|
||||
// time to wait after interval end before flush
|
||||
flushAfter *histogram.Fast
|
||||
muFlushAfter sync.Mutex
|
||||
// flushAfterMsec is the max sample lag (in milliseconds) observed in the current flush interval.
|
||||
flushAfterMsec atomic.Int64
|
||||
}
|
||||
|
||||
// NewDeduplicator returns new deduplicator, which deduplicates samples per each time series.
|
||||
@@ -59,7 +57,6 @@ func NewDeduplicator(pushFunc PushFunc, enableWindows bool, interval time.Durati
|
||||
}
|
||||
d.cs.Store(cs)
|
||||
if enableWindows {
|
||||
d.flushAfter = histogram.GetFast()
|
||||
d.minDeadline.Store(startTime.UnixMilli())
|
||||
}
|
||||
d.cs.Store(cs)
|
||||
@@ -145,9 +142,15 @@ func (d *Deduplicator) Push(tss []prompb.TimeSeries) {
|
||||
}
|
||||
|
||||
if d.enableWindows && maxLagMsec > 0 {
|
||||
d.muFlushAfter.Lock()
|
||||
d.flushAfter.Update(float64(maxLagMsec))
|
||||
d.muFlushAfter.Unlock()
|
||||
for {
|
||||
old := d.flushAfterMsec.Load()
|
||||
if maxLagMsec <= old {
|
||||
break
|
||||
}
|
||||
if d.flushAfterMsec.CompareAndSwap(old, maxLagMsec) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if len(ctx.blue) > 0 {
|
||||
@@ -172,7 +175,6 @@ func dropSeriesLabels(dst, src []prompb.Label, labelNames []string) []prompb.Lab
|
||||
|
||||
func (d *Deduplicator) runFlusher(pushFunc PushFunc) {
|
||||
t := time.NewTicker(d.interval)
|
||||
var fa *histogram.Fast
|
||||
defer t.Stop()
|
||||
for {
|
||||
select {
|
||||
@@ -180,12 +182,7 @@ func (d *Deduplicator) runFlusher(pushFunc PushFunc) {
|
||||
return
|
||||
case <-t.C:
|
||||
if d.enableWindows {
|
||||
// Calculate delay and wait
|
||||
d.muFlushAfter.Lock()
|
||||
fa, d.flushAfter = d.flushAfter, histogram.GetFast()
|
||||
d.muFlushAfter.Unlock()
|
||||
delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond
|
||||
histogram.PutFast(fa)
|
||||
delay := time.Duration(d.flushAfterMsec.Swap(0)) * time.Millisecond
|
||||
time.Sleep(delay)
|
||||
}
|
||||
d.flush(pushFunc)
|
||||
|
||||
@@ -24,14 +24,9 @@ import (
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
|
||||
"github.com/VictoriaMetrics/VictoriaMetrics/lib/timerpool"
|
||||
"github.com/VictoriaMetrics/metrics"
|
||||
"github.com/valyala/histogram"
|
||||
"gopkg.in/yaml.v2"
|
||||
)
|
||||
|
||||
// defines ingested samples lag quantile to determine a time to wait before flush.
|
||||
// It's not configurable at the moment.
|
||||
const flushQuantile = 0.95
|
||||
|
||||
var supportedOutputs = []string{
|
||||
"avg",
|
||||
"count_samples",
|
||||
@@ -440,9 +435,9 @@ type aggregator struct {
|
||||
// aggrOutputs contains aggregate states for the given outputs
|
||||
aggrOutputs *aggrOutputs
|
||||
|
||||
// time to wait after interval end before flush
|
||||
flushAfter *histogram.Fast
|
||||
muFlushAfter sync.Mutex
|
||||
// flushAfterMsec is the max sample lag (in milliseconds) observed in the current flush interval.
|
||||
// It is used to properly delay the flush time while using aggregation windows.
|
||||
flushAfterMsec atomic.Int64
|
||||
|
||||
// suffix contains a suffix, which should be added to aggregate metric names
|
||||
//
|
||||
@@ -705,9 +700,6 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
|
||||
minTime = minTime.Add(interval)
|
||||
}
|
||||
}
|
||||
if enableWindows {
|
||||
a.flushAfter = histogram.GetFast()
|
||||
}
|
||||
a.minDeadline.Store(minTime.UnixMilli())
|
||||
cs := &currentState{}
|
||||
if a.dedupInterval > 0 {
|
||||
@@ -834,16 +826,10 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu
|
||||
t := time.NewTicker(interval)
|
||||
defer t.Stop()
|
||||
|
||||
var fa *histogram.Fast
|
||||
for tickerWait(t) {
|
||||
pf := pushFunc
|
||||
if a.enableWindows {
|
||||
// Calculate delay and wait
|
||||
a.muFlushAfter.Lock()
|
||||
fa, a.flushAfter = a.flushAfter, histogram.GetFast()
|
||||
a.muFlushAfter.Unlock()
|
||||
delay := time.Duration(fa.Quantile(flushQuantile)) * time.Millisecond
|
||||
histogram.PutFast(fa)
|
||||
delay := time.Duration(a.flushAfterMsec.Swap(0)) * time.Millisecond
|
||||
time.Sleep(delay)
|
||||
}
|
||||
|
||||
@@ -1044,9 +1030,15 @@ func (a *aggregator) Push(tss []prompb.TimeSeries, matchIdxs []uint32) {
|
||||
}
|
||||
}
|
||||
if enableWindows && maxLagMsec > 0 {
|
||||
a.muFlushAfter.Lock()
|
||||
a.flushAfter.Update(float64(maxLagMsec))
|
||||
a.muFlushAfter.Unlock()
|
||||
for {
|
||||
old := a.flushAfterMsec.Load()
|
||||
if maxLagMsec <= old {
|
||||
break
|
||||
}
|
||||
if a.flushAfterMsec.CompareAndSwap(old, maxLagMsec) {
|
||||
break
|
||||
}
|
||||
}
|
||||
}
|
||||
a.samplesLag.Update(float64(maxLagMsec) / 1_000)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user