Compare commits

...

1 Commits

Author SHA1 Message Date
Andrii Chubatiuk
b3eefe6483 lib/streamaggr: added vm_streamaggr_counter_resets_total counter 2026-04-14 17:11:45 +03:00
6 changed files with 269 additions and 14 deletions

View File

@@ -26,6 +26,8 @@ See also [LTS releases](https://docs.victoriametrics.com/victoriametrics/lts-rel
## tip
* FEATURE: [stream aggregation](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/): add `vm_streamaggr_counter_resets_total` metric for `total*`, `increase*` and `rate*` outputs that is useful for aggregation behaviour tracking.
## [v1.140.0](https://github.com/VictoriaMetrics/VictoriaMetrics/releases/tag/v1.140.0)
Released at 2026-04-10

View File

@@ -571,6 +571,7 @@ Below is an example of an `aggr.yaml` configuration that drops the `replica` and
# Troubleshooting
- [Unexpected spikes for `total` or `increase` outputs](#staleness).
- [Excessively large values for `total*`, `increase*`, and `rate*` outputs](#counter-resets).
- [Lower than expected values for `total_prometheus` and `increase_prometheus` outputs](#staleness).
- [High memory usage and CPU usage](#high-resource-usage).
- [Unexpected results in vmagent cluster mode](#cluster-mode).
@@ -601,6 +602,10 @@ the following settings:
- `enable_windows` option in [aggregation config](https://docs.victoriametrics.com/victoriametrics/stream-aggregation/#stream-aggregation-config).
It allows enabling aggregation windows for a specific aggregator.
## Counter resets
If counter-specific outputs, such as `total*`, `rate*`, and `increase*`, exhibit values that are significantly higher than anticipated, it is advisable to examine the `vm_streamaggr_counter_resets` metric. The observation of frequent counter resets may indicate potential issues with the raw data, including possible series collisions.
## Staleness
The following outputs track the last seen per-series values in order to properly calculate output values:

232
lib/streamaggr/dedup Normal file
View File

@@ -0,0 +1,232 @@
diff --git a/lib/streamaggr/dedup.go b/lib/streamaggr/dedup.go
index ee87c7786f..e1038b2706 100644
--- a/lib/streamaggr/dedup.go
+++ b/lib/streamaggr/dedup.go
@@ -1,6 +1,7 @@
package streamaggr
import (
+ "math/bits"
"sync"
"sync/atomic"
"unsafe"
@@ -11,7 +12,6 @@ import (
"github.com/VictoriaMetrics/VictoriaMetrics/lib/atomicutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/decimal"
- "github.com/VictoriaMetrics/VictoriaMetrics/lib/slicesutil"
)
const dedupAggrShardsCount = 128
@@ -30,9 +30,11 @@ type dedupAggrShard struct {
}
type dedupAggrState struct {
- m map[string]*dedupAggrSample
mu sync.Mutex
samplesBuf []dedupAggrSample
+ mask uint64
+ keys []string
+ count int
sizeBytes atomic.Uint64
itemsCount atomic.Uint64
}
@@ -43,6 +45,9 @@ type dedupAggrShardNopad struct {
}
type dedupAggrSample struct {
+ hash uint64
+ keyIdx uint32
+ _ uint32 // padding so value stays 8-byte aligned
value float64
timestamp int64
}
@@ -55,9 +60,8 @@ func newDedupAggr() *dedupAggr {
func (da *dedupAggr) sizeBytes() uint64 {
n := uint64(unsafe.Sizeof(*da))
- var shard *dedupAggrShard
for i := range da.shards {
- shard = &da.shards[i]
+ shard := &da.shards[i]
n += shard.blue.sizeBytes.Load()
n += shard.green.sizeBytes.Load()
}
@@ -66,9 +70,8 @@ func (da *dedupAggr) sizeBytes() uint64 {
func (da *dedupAggr) itemsCount() uint64 {
n := uint64(0)
- var shard *dedupAggrShard
for i := range da.shards {
- shard = &da.shards[i]
+ shard := &da.shards[i]
n += shard.blue.itemsCount.Load()
n += shard.green.itemsCount.Load()
}
@@ -169,7 +172,6 @@ var perShardSamplesPool sync.Pool
func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
var state *dedupAggrState
-
if isGreen {
state = &das.green
} else {
@@ -177,29 +179,81 @@ func (das *dedupAggrShard) pushSamples(samples []pushSample, isGreen bool) {
}
state.mu.Lock()
- defer state.mu.Unlock()
- if state.m == nil {
- state.m = make(map[string]*dedupAggrSample, len(samples))
+ if state.samplesBuf == nil {
+ n := nextPow2(max(16, len(samples)*2))
+ state.samplesBuf = make([]dedupAggrSample, n)
+ state.mask = uint64(n - 1)
}
- samplesBuf := state.samplesBuf
- for _, sample := range samples {
- s, ok := state.m[sample.key]
- if !ok {
- samplesBuf = slicesutil.SetLength(samplesBuf, len(samplesBuf)+1)
- s = &samplesBuf[len(samplesBuf)-1]
- s.value = sample.value
- s.timestamp = sample.timestamp
-
- key := bytesutil.InternString(sample.key)
- state.m[key] = s
-
- state.itemsCount.Add(1)
- state.sizeBytes.Add(uint64(len(key)) + uint64(unsafe.Sizeof(key)+unsafe.Sizeof(s)+unsafe.Sizeof(*s)))
+ for i := range samples {
+ state.push(&samples[i])
+ }
+ sz := uint64(state.count) * uint64(unsafe.Sizeof(dedupAggrSample{}))
+ cnt := uint64(state.count)
+ state.sizeBytes.Store(sz)
+ state.itemsCount.Store(cnt)
+ state.mu.Unlock()
+}
+
+// push inserts or deduplicates a single sample into the hash table.
+// Must be called with state.mu held.
+func (s *dedupAggrState) push(sample *pushSample) {
+ if s.count*4 >= len(s.samplesBuf)*3 {
+ s.grow()
+ }
+
+ key := sample.key
+ h := xxhash.Sum64(bytesutil.ToUnsafeBytes(key))
+ idx := h & s.mask
+
+ for {
+ e := &s.samplesBuf[idx]
+ if e.keyIdx == 0 {
+ s.keys = append(s.keys, key)
+ e.hash = h
+ e.keyIdx = uint32(len(s.keys)) // 1-based
+ e.value = sample.value
+ e.timestamp = sample.timestamp
+ s.count++
+ return
+ }
+ // Hash check first to skip the string comparison in the common case.
+ if e.hash == h && s.keys[e.keyIdx-1] == key {
+ e.timestamp, e.value = deduplicateSamples(e.timestamp, sample.timestamp, e.value, sample.value)
+ return
+ }
+ idx = (idx + 1) & s.mask
+ }
+}
+
+// grow doubles the hash table capacity and rehashes all existing entries.
+// Must be called with state.mu held.
+func (s *dedupAggrState) grow() {
+ newSize := len(s.samplesBuf) * 2
+ if newSize == 0 {
+ newSize = 16
+ }
+ newSamplesBuf := make([]dedupAggrSample, newSize)
+ newMask := uint64(newSize - 1)
+ for _, e := range s.samplesBuf {
+ if e.keyIdx == 0 {
continue
}
- s.timestamp, s.value = deduplicateSamples(s.timestamp, sample.timestamp, s.value, sample.value)
+ idx := e.hash & newMask
+ for newSamplesBuf[idx].keyIdx != 0 {
+ idx = (idx + 1) & newMask
+ }
+ newSamplesBuf[idx] = e
}
- state.samplesBuf = samplesBuf
+ s.samplesBuf = newSamplesBuf
+ s.mask = newMask
+}
+
+// nextPow2 returns the smallest power of two >= n.
+func nextPow2(n int) int {
+ if n <= 1 {
+ return 1
+ }
+ return 1 << bits.Len(uint(n-1))
}
// deduplicateSamples returns deduplicated timestamp and value results.
@@ -208,8 +262,6 @@ func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
if newT > oldT {
return newT, newV
}
- // if both samples have the same timestamp, choose the maximum value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/3333;
- // always prefer a non-decimal.StaleNaN value, see https://github.com/VictoriaMetrics/VictoriaMetrics/issues/10196
if newT == oldT {
if decimal.IsStaleNaN(oldV) {
return newT, newV
@@ -222,7 +274,6 @@ func deduplicateSamples(oldT, newT int64, oldV, newV float64) (int64, float64) {
}
func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
- var m map[string]*dedupAggrSample
var state *dedupAggrState
if ctx.isGreen {
state = &das.green
@@ -231,23 +282,28 @@ func (das *dedupAggrShard) flush(ctx *dedupFlushCtx, f aggrPushFunc) {
}
state.mu.Lock()
- if len(state.m) > 0 {
- m = state.m
- state.m = make(map[string]*dedupAggrSample, len(state.m))
- state.samplesBuf = make([]dedupAggrSample, 0, len(state.samplesBuf))
- state.sizeBytes.Store(0)
- state.itemsCount.Store(0)
- }
- state.mu.Unlock()
-
- if len(m) == 0 {
+ if state.count == 0 {
+ state.mu.Unlock()
return
}
+ samplesBuf := state.samplesBuf
+ keys := state.keys
+ n := len(samplesBuf)
+ state.samplesBuf = make([]dedupAggrSample, n)
+ state.mask = uint64(n - 1)
+ state.keys = make([]string, 0, len(keys))
+ state.count = 0
+ state.sizeBytes.Store(0)
+ state.itemsCount.Store(0)
+ state.mu.Unlock()
dstSamples := ctx.samples
- for key, s := range m {
+ for _, s := range samplesBuf {
+ if s.keyIdx == 0 {
+ continue
+ }
dstSamples = append(dstSamples, pushSample{
- key: key,
+ key: keys[s.keyIdx-1],
value: s.value,
timestamp: s.timestamp,
})

View File

@@ -1,8 +1,11 @@
package streamaggr
import (
"fmt"
"sync"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
)
@@ -92,7 +95,8 @@ type rateAggrValue struct {
isGreen bool
}
func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
func (av *rateAggrValue) pushSample(c aggrConfig, sample *pushSample, key string, deleteDeadline int64) {
ac := c.(*rateAggrConfig)
var state *rateAggrStateValue
sv, ok := av.shared[key]
if ok {
@@ -106,6 +110,7 @@ func (av *rateAggrValue) pushSample(_ aggrConfig, sample *pushSample, key string
} else {
// counter reset
state.increase += sample.value
ac.counterResetsTotal.Inc()
}
} else {
sv = getRateAggrSharedValue(av.isGreen)
@@ -169,14 +174,17 @@ func (av *rateAggrValue) state() any {
return av.shared
}
func newRateAggrConfig(isAvg bool) aggrConfig {
return &rateAggrConfig{
func newRateAggrConfig(ms *metrics.Set, metricLabels string, isAvg bool) aggrConfig {
cfg := rateAggrConfig{
isAvg: isAvg,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return &cfg
}
type rateAggrConfig struct {
isAvg bool
isAvg bool
counterResetsTotal *metrics.Counter
}
func (*rateAggrConfig) getValue(s any) aggrValue {

View File

@@ -613,7 +613,8 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
}
outputsSeen := make(map[string]struct{}, len(cfg.Outputs))
for i, output := range cfg.Outputs {
ac, err := newOutputConfig(output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
outputMetricLabels := fmt.Sprintf(`output=%q,name=%q,path=%q,url=%q,position="%d"`, output, name, path, alias, aggrID)
ac, err := newOutputConfig(ms, outputMetricLabels, output, outputsSeen, useSharedState, ignoreFirstSampleInterval)
if err != nil {
return nil, err
}
@@ -723,7 +724,7 @@ func newAggregator(cfg *Config, path string, pushFunc PushFunc, ms *metrics.Set,
return a, nil
}
func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
func newOutputConfig(ms *metrics.Set, metricLabels, output string, outputsSeen map[string]struct{}, useSharedState bool, ignoreFirstSampleInterval time.Duration) (aggrConfig, error) {
// check for duplicated output
if _, ok := outputsSeen[output]; ok {
return nil, fmt.Errorf("`outputs` list contains duplicate aggregation function: %s", output)
@@ -769,9 +770,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
case "histogram_bucket":
return newHistogramBucketAggrConfig(useSharedState), nil
case "increase":
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, true), nil
case "increase_prometheus":
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, true, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, true, false), nil
case "last":
return newLastAggrConfig(), nil
case "max":
@@ -779,9 +780,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
case "min":
return newMinAggrConfig(), nil
case "rate_avg":
return newRateAggrConfig(true), nil
return newRateAggrConfig(ms, metricLabels, true), nil
case "rate_sum":
return newRateAggrConfig(false), nil
return newRateAggrConfig(ms, metricLabels, false), nil
case "stddev":
return newStddevAggrConfig(), nil
case "stdvar":
@@ -789,9 +790,9 @@ func newOutputConfig(output string, outputsSeen map[string]struct{}, useSharedSt
case "sum_samples":
return newSumSamplesAggrConfig(), nil
case "total":
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, true), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, true), nil
case "total_prometheus":
return newTotalAggrConfig(ignoreFirstSampleIntervalSecs, false, false), nil
return newTotalAggrConfig(ms, metricLabels, ignoreFirstSampleIntervalSecs, false, false), nil
case "unique_samples":
return newUniqueSamplesAggrConfig(), nil
default:

View File

@@ -1,8 +1,11 @@
package streamaggr
import (
"fmt"
"math"
"github.com/VictoriaMetrics/metrics"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/bytesutil"
"github.com/VictoriaMetrics/VictoriaMetrics/lib/fasttime"
)
@@ -38,6 +41,7 @@ func (av *totalAggrValue) pushSample(c aggrConfig, sample *pushSample, key strin
} else {
// counter reset
av.total += sample.value
ac.counterResetsTotal.Inc()
}
}
lv.value = sample.value
@@ -74,13 +78,15 @@ func (av *totalAggrValue) state() any {
return av.shared
}
func newTotalAggrConfig(ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
func newTotalAggrConfig(ms *metrics.Set, metricLabels string, ignoreFirstSampleIntervalSecs uint64, resetTotalOnFlush, keepFirstSample bool) aggrConfig {
ignoreFirstSampleDeadline := fasttime.UnixTimestamp() + ignoreFirstSampleIntervalSecs
return &totalAggrConfig{
cfg := &totalAggrConfig{
keepFirstSample: keepFirstSample,
resetTotalOnFlush: resetTotalOnFlush,
ignoreFirstSampleDeadline: ignoreFirstSampleDeadline,
}
cfg.counterResetsTotal = ms.NewCounter(fmt.Sprintf(`vm_streamaggr_counter_resets_total{%s}`, metricLabels))
return cfg
}
type totalAggrConfig struct {
@@ -93,6 +99,7 @@ type totalAggrConfig struct {
// This allows avoiding an initial spike of the output values at startup when new time series
// cannot be distinguished from already existing series. This is tracked with ignoreFirstSampleDeadline.
ignoreFirstSampleDeadline uint64
counterResetsTotal *metrics.Counter
}
func (*totalAggrConfig) getValue(s any) aggrValue {