From 4470cd8d67406308228fa40266050cdd545d2173 Mon Sep 17 00:00:00 2001 From: Andrii Chubatiuk Date: Tue, 14 Apr 2026 17:22:47 +0300 Subject: [PATCH] lib/streamaggr: fix issue, when multiple flushes may occur with the same timestamp --- docs/victoriametrics/changelog/CHANGELOG.md | 1 + lib/streamaggr/deduplicator.go | 1 + lib/streamaggr/streamaggr.go | 1 + lib/streamaggr/streamaggr_synctest_test.go | 28 ++++++++++++--------- 4 files changed, 19 insertions(+), 12 deletions(-) diff --git a/docs/victoriametrics/changelog/CHANGELOG.md b/docs/victoriametrics/changelog/CHANGELOG.md index 2139f5155f..250986f1e2 100644 --- a/docs/victoriametrics/changelog/CHANGELOG.md +++ b/docs/victoriametrics/changelog/CHANGELOG.md @@ -28,6 +28,7 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): stop emitting stale values for `quantiles(...)` outputs when a time series has no samples during the current aggregation interval. Thanks to @alexei38 for the [pull request](https://github.com/VictoriaMetrics/VictoriaMetrics/pull/10918). * BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): extend delay on aggregation windows flush by the biggest lag among pushed samples. Before, the delay was calculated as 95th percentile across samples, which could underrepresent outliers and reject them from aggregation as "too old". See [#10402](https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10402). +* BUGFIX: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): fix a bug where consecutive flushes could occur with the same timestamp. The next flush time is now always advanced by at least one interval after each flush. 
## [v1.143.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.143.0) diff --git a/lib/streamaggr/deduplicator.go b/lib/streamaggr/deduplicator.go index cbf31a41d8..206516fb5e 100644 --- a/lib/streamaggr/deduplicator.go +++ b/lib/streamaggr/deduplicator.go @@ -231,6 +231,7 @@ func (d *Deduplicator) flush(pushFunc PushFunc) { logger.Warnf("deduplication couldn't be finished in the configured dedupInterval=%s; it took %.03fs; "+ "possible solutions: increase dedupInterval; reduce samples' ingestion rate", d.interval, duration.Seconds()) } + deadlineTime = deadlineTime.Add(d.interval) for time.Now().After(deadlineTime) { deadlineTime = deadlineTime.Add(d.interval) } diff --git a/lib/streamaggr/streamaggr.go b/lib/streamaggr/streamaggr.go index 129d13483a..25c29374af 100644 --- a/lib/streamaggr/streamaggr.go +++ b/lib/streamaggr/streamaggr.go @@ -845,6 +845,7 @@ func (a *aggregator) runFlusher(pushFunc PushFunc, alignFlushToInterval, skipFlu } else { a.flush(pf, flushTime, cs, false) } + flushTime = flushTime.Add(a.interval) for time.Now().After(flushTime) { flushTime = flushTime.Add(a.interval) } diff --git a/lib/streamaggr/streamaggr_synctest_test.go b/lib/streamaggr/streamaggr_synctest_test.go index 509529578d..0a62bf3b37 100644 --- a/lib/streamaggr/streamaggr_synctest_test.go +++ b/lib/streamaggr/streamaggr_synctest_test.go @@ -1,5 +1,3 @@ -//go:build synctest - package streamaggr import ( @@ -485,10 +483,8 @@ foo 3.3 `, ``, ``, ``, ``}, time.Minute, `foo:1m_count_series 1 foo:1m_count_series{bar="baz"} 1 foo:1m_sum_samples 0 -foo:1m_sum_samples 0 foo:1m_sum_samples 4.3 foo:1m_sum_samples{bar="baz"} 0 -foo:1m_sum_samples{bar="baz"} 0 foo:1m_sum_samples{bar="baz"} 2 foo:5m_by_bar_sum_samples 4.3 foo:5m_by_bar_sum_samples{bar="baz"} 2 @@ -694,21 +690,29 @@ foo:1m_by_cde_rate_sum{cde="1"} 0.125 // test rate_sum and rate_avg, when two aggregation intervals are empty f([]string{` -foo{abc="123", cde="1"} 2 -foo{abc="456", cde="1"} 8 -foo{abc="777", 
cde="1"} 9 -10 +foo{abc="123", cde="1"} 1 +foo{abc="123", cde="1"} 2 1 +foo{abc="456", cde="1"} 7 +foo{abc="456", cde="1"} 8 1 +foo{abc="777", cde="1"} 8 +foo{abc="777", cde="1"} 9 1 `, ``, ``, ` -foo{abc="123", cde="1"} 20 +foo{abc="123", cde="1"} 19 +foo{abc="123", cde="1"} 20 1 foo{abc="456", cde="1"} 26 -foo{abc="777", cde="1"} 27 -10 -`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 0.1 -foo:1m_by_cde_rate_sum{cde="1"} 0.2 +foo{abc="456", cde="1"} 27 1 +foo{abc="777", cde="1"} 27 +foo{abc="777", cde="1"} 28 1 +`}, time.Minute, `foo:1m_by_cde_rate_avg{cde="1"} 1 +foo:1m_by_cde_rate_avg{cde="1"} 1 +foo:1m_by_cde_rate_sum{cde="1"} 3 +foo:1m_by_cde_rate_sum{cde="1"} 3 `, ` - interval: 1m by: [cde] outputs: [rate_sum, rate_avg] enable_windows: true -`, "111111") +`, "111111111111") // rate_sum and rate_avg with duplicated events f([]string{`